v0.0.18.4 Фикс активного соединения
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
903c8bb2ff
commit
f9e9347389
@ -3,6 +3,7 @@ package graph
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"tailly_back_v2/internal/domain"
|
||||
"tailly_back_v2/proto"
|
||||
"time"
|
||||
@ -120,25 +121,62 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
|
||||
|
||||
// MessageStream реализация подписки на новые сообщения
|
||||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||||
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stream messages: %w", err)
|
||||
}
|
||||
messageChan := make(chan *domain.Message, 10) // Буферизованный канал
|
||||
|
||||
messageChan := make(chan *domain.Message)
|
||||
go func() {
|
||||
defer close(messageChan)
|
||||
retryCount := 0
|
||||
maxRetries := 5
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
if retryCount >= maxRetries {
|
||||
log.Printf("Max retries (%d) reached for user %d", maxRetries, userID)
|
||||
return
|
||||
}
|
||||
|
||||
// Создаем новое соединение
|
||||
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Stream connection error for user %d: %v (retry %d)", userID, err, retryCount)
|
||||
retryCount++
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Duration(retryCount) * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
retryCount = 0 // Сброс счетчика при успешном подключении
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
log.Printf("Stream receive error for user %d: %v", userID, err)
|
||||
break
|
||||
}
|
||||
|
||||
// Пропускаем heartbeat сообщения
|
||||
if msg.Message.GetContent() == "__heartbeat__" {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
// Сообщение успешно отправлено в канал
|
||||
}
|
||||
}
|
||||
|
||||
// Пауза перед повторной попыткой
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
case <-time.After(1 * time.Second):
|
||||
retryCount++
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -148,6 +186,9 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
|
||||
|
||||
// Преобразование proto-сообщения в domain-модель
|
||||
func protoMessageToDomain(msg *proto.Message) *domain.Message {
|
||||
if msg == nil {
|
||||
return nil
|
||||
}
|
||||
return &domain.Message{
|
||||
ID: int(msg.Id),
|
||||
ChatID: int(msg.ChatId),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user