diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index da66d51..c56e47e 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -125,14 +125,7 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< return nil, fmt.Errorf("user not authenticated") } - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) - if err != nil { - return nil, fmt.Errorf("failed to stream messages: %w", err) - } - - messageChan := make(chan *domain.Message) + messageChan := make(chan *domain.Message, 10) // Буферизированный канал go func() { defer func() { close(messageChan) @@ -141,26 +134,47 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< } }() + // Создаем новый контекст с таймаутом для переподключения + retryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + for { select { case <-ctx.Done(): return default: - msg, err := stream.Recv() + stream, err := r.MessageClient.StreamMessages(retryCtx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) if err != nil { - log.Printf("Stream receive error: %v", err) - return - } - - // Добавляем проверку на nil сообщение - if msg == nil || msg.Message == nil { + log.Printf("Failed to create stream: %v", err) + time.Sleep(1 * time.Second) continue } - select { - case <-ctx.Done(): - return - case messageChan <- protoMessageToDomain(msg.Message): + for { + msg, err := stream.Recv() + if err != nil { + log.Printf("Stream receive error: %v", err) + break // Выходим из внутреннего цикла для переподключения + } + + // Полная проверка сообщения + if msg == nil || msg.Message == nil || msg.Message.Id == 0 { + continue + } + + domainMsg := protoMessageToDomain(msg.Message) + if domainMsg.ID == 0 || domainMsg.Content == "" { + continue + } + + select { + case <-ctx.Done(): + return + case messageChan <- domainMsg: + // Успешно отправили сообщение + } } } } @@ -172,7 +186,12 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { if msg == nil { - return &domain.Message{} + return &domain.Message{} // Возвращаем пустую структуру вместо nil + } + + createdAt := time.Now() + if msg.CreatedAt != nil { + createdAt = msg.CreatedAt.AsTime() } return &domain.Message{ @@ -182,7 +201,7 @@ func protoMessageToDomain(msg *proto.Message) *domain.Message { ReceiverID: int(msg.ReceiverId), Content: msg.Content, Status: msg.Status, - CreatedAt: msg.CreatedAt.AsTime(), + CreatedAt: createdAt, } }