diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index 278e9f9..4795245 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -121,50 +121,23 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { - messageChan := make(chan *domain.Message, 100) // Увеличиваем буфер + messageChan := make(chan *domain.Message, 100) go func() { defer close(messageChan) + defer log.Println("MessageStream: goroutine stopped") - retryDelay := time.Second - const maxRetries = 5 - - for i := 0; i < maxRetries; i++ { + for { select { case <-ctx.Done(): + log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err()) return default: - // Создаем новый контекст БЕЗ таймаута - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) + err := r.runMessageStream(ctx, userID, messageChan) if err != nil { - log.Printf("Stream connection error (attempt %d/%d): %v", i+1, maxRetries, err) - time.Sleep(retryDelay) - retryDelay *= 2 - continue - } - - // Сброс задержки при успешном подключении - retryDelay = time.Second - - for { - msg, err := stream.Recv() - if err != nil { - log.Printf("Stream receive error: %v", err) - break // Выходим из внутреннего цикла для переподключения - } - - if msg == nil || msg.Message == nil { - continue // Пропускаем heartbeat/пустые сообщения - } - - select { - case <-ctx.Done(): - return - case messageChan <- protoMessageToDomain(msg.Message): - } + log.Printf("MessageStream error: %v, reconnecting...", err) } + time.Sleep(2 * time.Second) // Задержка перед переподключением } } }() @@ -172,6 +145,58 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< return messageChan, nil } +func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error { + log.Printf("Starting new stream for user %d", userID) + + _, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{ + MessageId: 0, // 0 = все сообщения для пользователя + Status: "DELIVERED", + UserId: int32(userID), + }) + if err != nil { + log.Printf("Failed to mark messages as delivered: %v", err) + } + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) + if err != nil { + return fmt.Errorf("failed to create stream: %w", err) + } + + heartbeat := time.NewTicker(25 * time.Second) + defer heartbeat.Stop() + + for { + select { + case <-heartbeat.C: + // Отправляем ping для поддержания соединения + if err := stream.Context().Err(); err != nil { + return fmt.Errorf("connection lost: %w", err) + } + case <-ctx.Done(): + return nil + default: + msg, err := stream.Recv() + if err != nil { + return fmt.Errorf("receive error: %w", err) + } + + if msg.GetMessage() != nil { + select { + case messageChan <- protoMessageToDomain(msg.Message): + log.Printf("Delivered message %d to user %d", msg.Message.Id, userID) + case <-ctx.Done(): + return nil + } + } + } + } +} + // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { return &domain.Message{ diff --git a/internal/http/middleware/ws_auth.go b/internal/http/middleware/ws_auth.go index 4926a82..667da89 100644 --- a/internal/http/middleware/ws_auth.go +++ b/internal/http/middleware/ws_auth.go @@ -4,27 +4,25 @@ import ( "context" "net/http" "tailly_back_v2/pkg/auth" + "time" ) // WSAuthMiddleware проверяет JWT токен для WebSocket соединений func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Проверяем, что это WebSocket запрос if r.Header.Get("Upgrade") == "websocket" { - // Извлекаем токен из query параметров или заголовков - token := extractTokenFromRequest(r) + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Minute) + defer cancel() + token := extractTokenFromRequest(r) if token != "" { - userID, err := tokenAuth.ValidateAccessToken(token) - if err == nil { - // Добавляем userID в контекст - ctx := context.WithValue(r.Context(), userIDKey, userID) - r = r.WithContext(ctx) + if userID, err := tokenAuth.ValidateAccessToken(token); err == nil { + ctx = context.WithValue(ctx, userIDKey, userID) } } + r = r.WithContext(ctx) } - next.ServeHTTP(w, r) }) } diff --git a/internal/http/server.go b/internal/http/server.go index c2b1b9b..9f16cfc 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -82,11 +82,13 @@ func (s *Server) configureRouter() { } return false }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, } srv.AddTransport(&transport.Websocket{ Upgrader: wsUpgrader, - KeepAlivePingInterval: 15 * time.Second, + KeepAlivePingInterval: 30 * time.Second, }) // Раздельные эндпоинты: diff --git a/proto/messages.pb.go b/proto/messages.pb.go index 9042144..abddc03 100644 --- a/proto/messages.pb.go +++ b/proto/messages.pb.go @@ -294,6 +294,7 @@ type UpdateMessageStatusRequest struct { state protoimpl.MessageState `protogen:"open.v1"` MessageId int32 `protobuf:"varint,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + UserId int32 `protobuf:"varint,3,opt,name=userId,proto3" json:"userId,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -342,6 +343,13 @@ func (x *UpdateMessageStatusRequest) GetStatus() string { return "" } +func (x *UpdateMessageStatusRequest) GetUserId() int32 { + if x != nil { + return x.UserId + } + return 0 +} + type StreamMessagesRequest struct { state protoimpl.MessageState `protogen:"open.v1"` UserId int32 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` @@ -758,11 +766,12 @@ const file_messages_proto_rawDesc = "" + "\x05limit\x18\x02 \x01(\x05R\x05limit\x12\x16\n" + "\x06offset\x18\x03 \x01(\x05R\x06offset\".\n" + "\x13GetUserChatsRequest\x12\x17\n" + - "\auser_id\x18\x01 \x01(\x05R\x06userId\"S\n" + + "\auser_id\x18\x01 \x01(\x05R\x06userId\"k\n" + "\x1aUpdateMessageStatusRequest\x12\x1d\n" + "\n" + "message_id\x18\x01 \x01(\x05R\tmessageId\x12\x16\n" + - "\x06status\x18\x02 \x01(\tR\x06status\"0\n" + + "\x06status\x18\x02 \x01(\tR\x06status\x12\x16\n" + + "\x06userId\x18\x03 \x01(\x05R\x06userId\"0\n" + "\x15StreamMessagesRequest\x12\x17\n" + "\auser_id\x18\x01 \x01(\x05R\x06userId\"\xdd\x01\n" + "\aMessage\x12\x0e\n" + diff --git a/proto/messages.proto b/proto/messages.proto index 0a824d0..7ddf475 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -45,6 +45,7 @@ message GetUserChatsRequest { message UpdateMessageStatusRequest { int32 message_id = 1; string status = 2; + int32 userId = 3; } message StreamMessagesRequest {