v0.0.19 исправлен timeout в messageStream
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
199e891eec
commit
200d5b09f9
@ -121,37 +121,33 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
|
||||
|
||||
// MessageStream реализация подписки на новые сообщения
|
||||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||||
if userID == 0 {
|
||||
return nil, fmt.Errorf("user not authenticated")
|
||||
}
|
||||
messageChan := make(chan *domain.Message, 100) // Увеличиваем буфер
|
||||
|
||||
messageChan := make(chan *domain.Message, 10) // Буферизированный канал
|
||||
go func() {
|
||||
defer func() {
|
||||
close(messageChan)
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Recovered from panic in MessageStream: %v", r)
|
||||
}
|
||||
}()
|
||||
defer close(messageChan)
|
||||
|
||||
// Создаем новый контекст с таймаутом для переподключения
|
||||
retryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
retryDelay := time.Second
|
||||
const maxRetries = 5
|
||||
|
||||
for {
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
stream, err := r.MessageClient.StreamMessages(retryCtx, &proto.StreamMessagesRequest{
|
||||
// Создаем новый контекст БЕЗ таймаута
|
||||
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Failed to create stream: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
log.Printf("Stream connection error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
continue
|
||||
}
|
||||
|
||||
// Сброс задержки при успешном подключении
|
||||
retryDelay = time.Second
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
@ -159,21 +155,14 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
|
||||
break // Выходим из внутреннего цикла для переподключения
|
||||
}
|
||||
|
||||
// Полная проверка сообщения
|
||||
if msg == nil || msg.Message == nil || msg.Message.Id == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
domainMsg := protoMessageToDomain(msg.Message)
|
||||
if domainMsg.ID == 0 || domainMsg.Content == "" {
|
||||
continue
|
||||
if msg == nil || msg.Message == nil {
|
||||
continue // Пропускаем heartbeat/пустые сообщения
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case messageChan <- domainMsg:
|
||||
// Успешно отправили сообщение
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user