From 7f48fb767f5c66d28b997274e53169a5f49e4b39 Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Sun, 17 Aug 2025 15:49:08 +0300 Subject: [PATCH] =?UTF-8?q?v0.0.18.3=20=D0=A4=D0=B8=D0=BA=D1=81=20=D0=BF?= =?UTF-8?q?=D0=B5=D1=80=D0=B5=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BA=20rabbit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/http/graph/messages_resolvers.go | 38 +++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index dbedac2..8fb0a51 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -3,6 +3,7 @@ package graph import ( "context" "fmt" + "log" "tailly_back_v2/internal/domain" "tailly_back_v2/proto" "time" @@ -120,25 +121,36 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) - if err != nil { - return nil, fmt.Errorf("failed to stream messages: %w", err) - } - messageChan := make(chan *domain.Message) + go func() { defer close(messageChan) + for { - msg, err := stream.Recv() + stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) if err != nil { - return + log.Printf("Failed to stream messages: %v", err) + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): // Задержка перед повторной попыткой + continue + } } - select { - case <-ctx.Done(): - return - case messageChan <- protoMessageToDomain(msg.Message): + + for { + msg, err := stream.Recv() + if err != nil { + log.Printf("Stream receive error: %v", err) + break + } + select { + case <-ctx.Done(): + return + case messageChan <- protoMessageToDomain(msg.Message): + } } } }()