From f9e9347389b453e4130b7de8d24d6d99a8270ca3 Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Sun, 17 Aug 2025 08:32:32 +0300 Subject: [PATCH] =?UTF-8?q?v0.0.18.4=20=D0=A4=D0=B8=D0=BA=D1=81=20=D0=B0?= =?UTF-8?q?=D0=BA=D1=82=D0=B8=D0=B2=D0=BD=D0=BE=D0=B3=D0=BE=20=D1=81=D0=BE?= =?UTF-8?q?=D0=B5=D0=B4=D0=B8=D0=BD=D0=B5=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/http/graph/messages_resolvers.go | 61 +++++++++++++++++++---- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index dbedac2..f354a2d 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -3,6 +3,7 @@ package graph import ( "context" "fmt" + "log" "tailly_back_v2/internal/domain" "tailly_back_v2/proto" "time" @@ -120,25 +121,62 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) - if err != nil { - return nil, fmt.Errorf("failed to stream messages: %w", err) - } + messageChan := make(chan *domain.Message, 10) // Буферизованный канал - messageChan := make(chan *domain.Message) go func() { defer close(messageChan) + retryCount := 0 + maxRetries := 5 + for { - msg, err := stream.Recv() - if err != nil { + if retryCount >= maxRetries { + log.Printf("Max retries (%d) reached for user %d", maxRetries, userID) return } + + // Создаем новое соединение + stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) + if err != nil { + log.Printf("Stream connection error for user %d: %v (retry %d)", userID, err, retryCount) + retryCount++ + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(retryCount) * time.Second): + continue + } + } + + retryCount = 0 // Сброс счетчика при успешном подключении + + for { + msg, err := stream.Recv() + if err != nil { + log.Printf("Stream receive error for user %d: %v", userID, err) + break + } + + // Пропускаем heartbeat сообщения + if msg.Message.GetContent() == "__heartbeat__" { + continue + } + + select { + case <-ctx.Done(): + return + case messageChan <- protoMessageToDomain(msg.Message): + // Сообщение успешно отправлено в канал + } + } + + // Пауза перед повторной попыткой select { case <-ctx.Done(): return - case messageChan <- protoMessageToDomain(msg.Message): + case <-time.After(1 * time.Second): + retryCount++ } } }() @@ -148,6 +186,9 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { + if msg == nil { + return nil + } return &domain.Message{ ID: int(msg.Id), ChatID: int(msg.ChatId),