v0.0.18.3 Фикс переподключения к rabbit
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 15:49:08 +03:00
parent 15ae17189d
commit 7f48fb767f

View File

@ -3,6 +3,7 @@ package graph
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"tailly_back_v2/internal/domain" "tailly_back_v2/internal/domain"
"tailly_back_v2/proto" "tailly_back_v2/proto"
"time" "time"
@ -120,20 +121,30 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
// MessageStream реализация подписки на новые сообщения // MessageStream реализация подписки на новые сообщения
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
messageChan := make(chan *domain.Message)
go func() {
defer close(messageChan)
for {
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
UserId: int32(userID), UserId: int32(userID),
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to stream messages: %w", err) log.Printf("Failed to stream messages: %v", err)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second): // Задержка перед повторной попыткой
continue
}
} }
messageChan := make(chan *domain.Message)
go func() {
defer close(messageChan)
for { for {
msg, err := stream.Recv() msg, err := stream.Recv()
if err != nil { if err != nil {
return log.Printf("Stream receive error: %v", err)
break
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -141,6 +152,7 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
case messageChan <- protoMessageToDomain(msg.Message): case messageChan <- protoMessageToDomain(msg.Message):
} }
} }
}
}() }()
return messageChan, nil return messageChan, nil