From 2a3ae72db96c4e23f349113fa3f1fe73f41ee2ed Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Sun, 17 Aug 2025 09:16:35 +0300 Subject: [PATCH] =?UTF-8?q?v0.0.18.5=20=D0=B7=D0=B0=D0=BC=D0=B5=D0=BD?= =?UTF-8?q?=D0=B0=20ws=20=D0=BD=D0=B0=20sse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/http/graph/messages_resolvers.go | 63 ++++++----------------- internal/http/server.go | 40 +++++++------- 2 files changed, 38 insertions(+), 65 deletions(-) diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index f354a2d..10ff393 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -121,62 +121,36 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { - messageChan := make(chan *domain.Message, 10) // Буферизованный канал + // Создаем канал для GraphQL подписки + messageChan := make(chan *domain.Message) + + // Вызываем gRPC стриминг + stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ + UserId: int32(userID), + }) + if err != nil { + return nil, fmt.Errorf("failed to stream messages: %w", err) + } go func() { defer close(messageChan) - retryCount := 0 - maxRetries := 5 for { - if retryCount >= maxRetries { - log.Printf("Max retries (%d) reached for user %d", maxRetries, userID) + msg, err := stream.Recv() + if err != nil { + log.Printf("gRPC stream error: %v", err) return } - // Создаем новое соединение - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ - UserId: int32(userID), - }) - if err != nil { - log.Printf("Stream connection error for user %d: %v (retry %d)", userID, err, retryCount) - retryCount++ - select { - case <-ctx.Done(): - return - case <-time.After(time.Duration(retryCount) * time.Second): - continue - } + // Пропускаем heartbeat сообщения + if msg.Message.Content == "__heartbeat__" { + continue } - retryCount = 0 // Сброс счетчика при успешном подключении - - for { - msg, err := stream.Recv() - if err != nil { - log.Printf("Stream receive error for user %d: %v", userID, err) - break - } - - // Пропускаем heartbeat сообщения - if msg.Message.GetContent() == "__heartbeat__" { - continue - } - - select { - case <-ctx.Done(): - return - case messageChan <- protoMessageToDomain(msg.Message): - // Сообщение успешно отправлено в канал - } - } - - // Пауза перед повторной попыткой select { case <-ctx.Done(): return - case <-time.After(1 * time.Second): - retryCount++ + case messageChan <- protoMessageToDomain(msg.Message): } } }() @@ -186,9 +160,6 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { - if msg == nil { - return nil - } return &domain.Message{ ID: int(msg.Id), ChatID: int(msg.ChatId), diff --git a/internal/http/server.go b/internal/http/server.go index 943548e..fbf19b0 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -5,10 +5,9 @@ import ( "database/sql" "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler/transport" - "github.com/99designs/gqlgen/graphql/playground" "github.com/go-chi/chi/v5" - "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/vektah/gqlparser/v2/ast" "log" "net/http" "os" @@ -72,26 +71,29 @@ func (s *Server) configureRouter() { Resolvers: resolver, })) - wsTransport := transport.Websocket{ - Upgrader: websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - origin := r.Header.Get("Origin") - for _, allowed := range allowedOrigins { - if origin == allowed { - return true - } - } - return false - }, - }, - } + srv.AddTransport(&transport.POST{}) + srv.AddTransport(&transport.GET{}) + srv.AddTransport(&transport.Options{}) - srv.AddTransport(&wsTransport) + // Настройка SSE транспорта (правильный способ) + sseTransport := &transport.SSE{} + srv.AddTransport(sseTransport) - s.router.Handle("/", playground.Handler("GraphQL playground", "/query")) - s.router.Handle("/query", srv) - s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads")))) + // Простейший кеш запросов (альтернатива MemoryCache) + srv.SetQueryCache(&NoopCache{}) +}) + +s.router.Handle("/", playground.Handler("GraphQL playground", "/query")) +s.router.Handle("/query", srv) +s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads")))) } +type NoopCache struct{} + +func (n *NoopCache) Get(ctx context.Context, key string) (*ast.QueryDocument, bool) { + return nil, false +} + +func (n *NoopCache) Add(ctx context.Context, key string, value *ast.QueryDocument) {} func (s *Server) configureMetrics() { metricsRouter := chi.NewRouter()