From 62aea84fcbda91ef4ee8a9b18e46b9ba50b66ab6 Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Tue, 19 Aug 2025 14:12:16 +0300 Subject: [PATCH] reset to v0.0.19 --- internal/http/graph/messages_resolvers.go | 93 +++++++++-------------- internal/http/middleware/ws_auth.go | 25 ++---- internal/http/server.go | 31 +++----- proto/messages.pb.go | 13 +--- proto/messages.proto | 1 - 5 files changed, 53 insertions(+), 110 deletions(-) diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index 1bb2763..278e9f9 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -121,23 +121,50 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { - messageChan := make(chan *domain.Message, 100) + messageChan := make(chan *domain.Message, 100) // Увеличиваем буфер go func() { defer close(messageChan) - defer log.Println("MessageStream: goroutine stopped") - for { + retryDelay := time.Second + const maxRetries = 5 + + for i := 0; i < maxRetries; i++ { select { case <-ctx.Done(): - log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err()) return default: - err := r.runMessageStream(ctx, userID, messageChan) + // Создаем новый контекст БЕЗ таймаута + stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) if err != nil { - log.Printf("MessageStream error: %v, reconnecting...", err) + log.Printf("Stream connection error (attempt %d/%d): %v", i+1, maxRetries, err) + time.Sleep(retryDelay) + retryDelay *= 2 + continue + } + + // Сброс задержки при успешном подключении + retryDelay = time.Second + + for { + msg, err := stream.Recv() + if err != nil { + log.Printf("Stream receive error: %v", err) + break // Выходим из внутреннего цикла для переподключения + } + + if msg == nil || msg.Message == nil { + continue // Пропускаем heartbeat/пустые сообщения + } + + select { + case <-ctx.Done(): + return + case messageChan <- protoMessageToDomain(msg.Message): + } } - time.Sleep(2 * time.Second) // Задержка перед переподключением } } }() @@ -145,58 +172,6 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< return messageChan, nil } -func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error { - log.Printf("Starting new stream for user %d", userID) - - // Создаем отдельный контекст для gRPC стрима - grpcCtx := context.Background() - - _, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{ - MessageId: 0, // 0 = все сообщения для пользователя - Status: "DELIVERED", - UserId: int32(userID), - }) - if err != nil { - log.Printf("Failed to mark messages as delivered: %v", err) - } - - stream, err := r.MessageClient.StreamMessages(grpcCtx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) - if err != nil { - return fmt.Errorf("failed to create stream: %w", err) - } - - heartbeat := time.NewTicker(25 * time.Second) - defer heartbeat.Stop() - - for { - select { - case <-heartbeat.C: - // Отправляем ping для поддержания соединения - if err := stream.Context().Err(); err != nil { - return fmt.Errorf("connection lost: %w", err) - } - case <-ctx.Done(): - return nil - default: - msg, err := stream.Recv() - if err != nil { - return fmt.Errorf("receive error: %w", err) - } - - if msg.GetMessage() != nil { - select { - case messageChan <- protoMessageToDomain(msg.Message): - log.Printf("Delivered message %d to user %d", msg.Message.Id, userID) - case <-ctx.Done(): - return nil - } - } - } - } -} - // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { return &domain.Message{ diff --git a/internal/http/middleware/ws_auth.go b/internal/http/middleware/ws_auth.go index ad3add5..4926a82 100644 --- a/internal/http/middleware/ws_auth.go +++ b/internal/http/middleware/ws_auth.go @@ -3,7 +3,6 @@ package middleware import ( "context" "net/http" - "strings" "tailly_back_v2/pkg/auth" ) @@ -11,39 +10,31 @@ import ( func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Проверяем, что это WebSocket запрос if r.Header.Get("Upgrade") == "websocket" { + // Извлекаем токен из query параметров или заголовков token := extractTokenFromRequest(r) + if token != "" { - if userID, err := tokenAuth.ValidateAccessToken(token); err == nil { - // Создаем контекст без таймаута для WebSocket + userID, err := tokenAuth.ValidateAccessToken(token) + if err == nil { + // Добавляем userID в контекст ctx := context.WithValue(r.Context(), userIDKey, userID) r = r.WithContext(ctx) } } } + next.ServeHTTP(w, r) }) } } func extractTokenFromRequest(r *http.Request) string { - // Проверяем куки + // Только проверка кук (как в вашем коде) cookie, err := r.Cookie("accessToken") if err == nil { return cookie.Value } - - // Проверяем заголовок Authorization - authHeader := r.Header.Get("Authorization") - if authHeader != "" && strings.HasPrefix(authHeader, "Bearer ") { - return strings.TrimPrefix(authHeader, "Bearer ") - } - - // Проверяем query параметры - token := r.URL.Query().Get("token") - if token != "" { - return token - } - return "" } diff --git a/internal/http/server.go b/internal/http/server.go index 013f4ee..c2b1b9b 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -65,9 +65,10 @@ func (s *Server) configureRouter() { s.router.Use(middleware.MetricsMiddleware) s.router.Use(middleware.CORS(allowedOrigins)) - // Создаем отдельный обработчик для WebSocket - wsHandler := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{ - Resolvers: graph.NewResolver(s.services, s.db, s.services.Messages), + // Основной GraphQL обработчик + resolver := graph.NewResolver(s.services, s.db, s.services.Messages) + srv := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{ + Resolvers: resolver, })) // Настройка WebSocket транспорта @@ -81,34 +82,20 @@ func (s *Server) configureRouter() { } return false }, - ReadBufferSize: 1024, - WriteBufferSize: 1024, } - wsHandler.AddTransport(&transport.Websocket{ + srv.AddTransport(&transport.Websocket{ Upgrader: wsUpgrader, - KeepAlivePingInterval: 30 * time.Second, + KeepAlivePingInterval: 15 * time.Second, }) - // Создаем обычный HTTP обработчик - httpHandler := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{ - Resolvers: graph.NewResolver(s.services, s.db, s.services.Messages), - })) - // Раздельные эндпоинты: // 1. Обычные HTTP GraphQL запросы (только POST) - s.router.With(middleware.AuthMiddleware(s.tokenAuth)).Post("/query", httpHandler.ServeHTTP) + s.router.With(middleware.AuthMiddleware(s.tokenAuth)).Post("/query", srv.ServeHTTP) - // 2. WebSocket подписки (отдельный эндпоинт с WS middleware) - s.router.With(middleware.WSAuthMiddleware(s.tokenAuth)).Get("/ws", func(w http.ResponseWriter, r *http.Request) { - // Проверяем, что это WebSocket запрос - if websocket.IsWebSocketUpgrade(r) { - wsHandler.ServeHTTP(w, r) - return - } - http.Error(w, "WebSocket upgrade required", http.StatusBadRequest) - }) + // 2. WebSocket подписки (отдельный эндпоинт) + s.router.With(middleware.WSAuthMiddleware(s.tokenAuth)).Get("/ws", srv.ServeHTTP) // 3. Playground и другие обработчики s.router.Handle("/", playground.Handler("GraphQL playground", "/query")) diff --git a/proto/messages.pb.go b/proto/messages.pb.go index abddc03..9042144 100644 --- a/proto/messages.pb.go +++ b/proto/messages.pb.go @@ -294,7 +294,6 @@ type UpdateMessageStatusRequest struct { state protoimpl.MessageState `protogen:"open.v1"` MessageId int32 `protobuf:"varint,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` - UserId int32 `protobuf:"varint,3,opt,name=userId,proto3" json:"userId,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -343,13 +342,6 @@ func (x *UpdateMessageStatusRequest) GetStatus() string { return "" } -func (x *UpdateMessageStatusRequest) GetUserId() int32 { - if x != nil { - return x.UserId - } - return 0 -} - type StreamMessagesRequest struct { state protoimpl.MessageState `protogen:"open.v1"` UserId int32 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` @@ -766,12 +758,11 @@ const file_messages_proto_rawDesc = "" + "\x05limit\x18\x02 \x01(\x05R\x05limit\x12\x16\n" + "\x06offset\x18\x03 \x01(\x05R\x06offset\".\n" + "\x13GetUserChatsRequest\x12\x17\n" + - "\auser_id\x18\x01 \x01(\x05R\x06userId\"k\n" + + "\auser_id\x18\x01 \x01(\x05R\x06userId\"S\n" + "\x1aUpdateMessageStatusRequest\x12\x1d\n" + "\n" + "message_id\x18\x01 \x01(\x05R\tmessageId\x12\x16\n" + - "\x06status\x18\x02 \x01(\tR\x06status\x12\x16\n" + - "\x06userId\x18\x03 \x01(\x05R\x06userId\"0\n" + + "\x06status\x18\x02 \x01(\tR\x06status\"0\n" + "\x15StreamMessagesRequest\x12\x17\n" + "\auser_id\x18\x01 \x01(\x05R\x06userId\"\xdd\x01\n" + "\aMessage\x12\x0e\n" + diff --git a/proto/messages.proto b/proto/messages.proto index 7ddf475..0a824d0 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -45,7 +45,6 @@ message GetUserChatsRequest { message UpdateMessageStatusRequest { int32 message_id = 1; string status = 2; - int32 userId = 3; } message StreamMessagesRequest {