From d48eb31edc856743e0f5739382bdbdbcf2164e7e Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Sun, 17 Aug 2025 16:01:29 +0300 Subject: [PATCH] =?UTF-8?q?v0.0.18.4=20=D0=A4=D0=B8=D0=BA=D1=81=20=D0=BF?= =?UTF-8?q?=D0=B5=D1=80=D0=B5=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BA=20rabbit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/http/graph/messages_resolvers.go | 17 +++++++++++++++-- internal/http/server.go | 2 ++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index 8fb0a51..b655919 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "math" "tailly_back_v2/internal/domain" "tailly_back_v2/proto" "time" @@ -126,16 +127,25 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< go func() { defer close(messageChan) + retryDelay := time.Second + for { - stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ + // Создаем новый контекст с таймаутом + streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + + stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{ UserId: int32(userID), }) + if err != nil { + cancel() log.Printf("Failed to stream messages: %v", err) + select { case <-ctx.Done(): return - case <-time.After(5 * time.Second): // Задержка перед повторной попыткой + case <-time.After(retryDelay): + retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(30*time.Second))) continue } } @@ -143,11 +153,14 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< for { msg, err := stream.Recv() if err != nil { + cancel() log.Printf("Stream receive error: %v", err) break } + select { case <-ctx.Done(): + cancel() return case messageChan <- protoMessageToDomain(msg.Message): } diff --git a/internal/http/server.go b/internal/http/server.go index 943548e..e360098 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -17,6 +17,7 @@ import ( "tailly_back_v2/internal/http/middleware" "tailly_back_v2/internal/service" "tailly_back_v2/pkg/auth" + "time" ) type Server struct { @@ -84,6 +85,7 @@ func (s *Server) configureRouter() { return false }, }, + KeepAlivePingInterval: 10 * time.Second, } srv.AddTransport(&wsTransport)