diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index 278e9f9..a609e00 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -121,50 +121,23 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { - messageChan := make(chan *domain.Message, 100) // Увеличиваем буфер + messageChan := make(chan *domain.Message, 100) go func() { defer close(messageChan) + defer log.Println("MessageStream: goroutine stopped") - retryDelay := time.Second - const maxRetries = 5 - - for i := 0; i < maxRetries; i++ { + for { select { case <-ctx.Done(): + log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err()) return default: - // Создаем новый контекст БЕЗ таймаута - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) + err := r.runMessageStream(ctx, userID, messageChan) if err != nil { - log.Printf("Stream connection error (attempt %d/%d): %v", i+1, maxRetries, err) - time.Sleep(retryDelay) - retryDelay *= 2 - continue - } - - // Сброс задержки при успешном подключении - retryDelay = time.Second - - for { - msg, err := stream.Recv() - if err != nil { - log.Printf("Stream receive error: %v", err) - break // Выходим из внутреннего цикла для переподключения - } - - if msg == nil || msg.Message == nil { - continue // Пропускаем heartbeat/пустые сообщения - } - - select { - case <-ctx.Done(): - return - case messageChan <- protoMessageToDomain(msg.Message): - } + log.Printf("MessageStream error: %v, reconnecting...", err) } + time.Sleep(2 * time.Second) // Задержка перед переподключением } } }() @@ -172,6 +145,46 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< return messageChan, nil } +func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error { + log.Printf("Starting new stream for user %d", userID) + + _, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{ + MessageId: 0, // 0 = все сообщения для пользователя + Status: "DELIVERED", + }) + if err != nil { + log.Printf("Failed to mark messages as delivered: %v", err) + } + + stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) + if err != nil { + return fmt.Errorf("stream creation failed: %w", err) + } + defer log.Println("Stream closed") + + for { + msg, err := stream.Recv() + if err != nil { + return fmt.Errorf("stream receive failed: %w", err) + } + + if msg == nil || msg.Message == nil { + continue // Пропускаем heartbeat + } + + log.Printf("Received message: %+v", msg.Message) + + select { + case <-ctx.Done(): + return nil + case messageChan <- protoMessageToDomain(msg.Message): + log.Printf("Message forwarded to channel: %d", msg.Message.Id) + } + } +} + // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { return &domain.Message{ diff --git a/internal/http/middleware/ws_auth.go b/internal/http/middleware/ws_auth.go index 4926a82..497ee2f 100644 --- a/internal/http/middleware/ws_auth.go +++ b/internal/http/middleware/ws_auth.go @@ -2,6 +2,7 @@ package middleware import ( "context" + "log" "net/http" "tailly_back_v2/pkg/auth" ) @@ -10,21 +11,22 @@ import ( func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Проверяем, что это WebSocket запрос if r.Header.Get("Upgrade") == "websocket" { - // Извлекаем токен из query параметров или заголовков + log.Printf("New WS connection from %s", r.RemoteAddr) token := extractTokenFromRequest(r) + log.Printf("Extracted token: %s", token[:10]+"...") // Логируем часть токена if token != "" { userID, err := tokenAuth.ValidateAccessToken(token) - if err == nil { - // Добавляем userID в контекст + if err != nil { + log.Printf("WS auth failed: %v", err) + } else { + log.Printf("Authenticated user ID: %d", userID) ctx := context.WithValue(r.Context(), userIDKey, userID) r = r.WithContext(ctx) } } } - next.ServeHTTP(w, r) }) } diff --git a/internal/http/server.go b/internal/http/server.go index c2b1b9b..6422176 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -82,11 +82,13 @@ func (s *Server) configureRouter() { } return false }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, } srv.AddTransport(&transport.Websocket{ Upgrader: wsUpgrader, - KeepAlivePingInterval: 15 * time.Second, + KeepAlivePingInterval: 10 * time.Second, }) // Раздельные эндпоинты: