v0.0.18.7 проверка сообщения на nil в messageStream
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-18 14:37:32 +03:00
parent 2f822df519
commit c5bc174b44

View File

@ -125,14 +125,7 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
return nil, fmt.Errorf("user not authenticated") return nil, fmt.Errorf("user not authenticated")
} }
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{ messageChan := make(chan *domain.Message, 10) // Буферизированный канал
UserId: int32(userID),
})
if err != nil {
return nil, fmt.Errorf("failed to stream messages: %w", err)
}
messageChan := make(chan *domain.Message)
go func() { go func() {
defer func() { defer func() {
close(messageChan) close(messageChan)
@ -141,26 +134,47 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
} }
}() }()
// Создаем новый контекст с таймаутом для переподключения
retryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
stream, err := r.MessageClient.StreamMessages(retryCtx, &proto.StreamMessagesRequest{
UserId: int32(userID),
})
if err != nil {
log.Printf("Failed to create stream: %v", err)
time.Sleep(1 * time.Second)
continue
}
for {
msg, err := stream.Recv() msg, err := stream.Recv()
if err != nil { if err != nil {
log.Printf("Stream receive error: %v", err) log.Printf("Stream receive error: %v", err)
return break // Выходим из внутреннего цикла для переподключения
} }
// Добавляем проверку на nil сообщение // Полная проверка сообщения
if msg == nil || msg.Message == nil { if msg == nil || msg.Message == nil || msg.Message.Id == 0 {
continue
}
domainMsg := protoMessageToDomain(msg.Message)
if domainMsg.ID == 0 || domainMsg.Content == "" {
continue continue
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case messageChan <- protoMessageToDomain(msg.Message): case messageChan <- domainMsg:
// Успешно отправили сообщение
}
} }
} }
} }
@ -172,7 +186,12 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
// Преобразование proto-сообщения в domain-модель // Преобразование proto-сообщения в domain-модель
func protoMessageToDomain(msg *proto.Message) *domain.Message { func protoMessageToDomain(msg *proto.Message) *domain.Message {
if msg == nil { if msg == nil {
return &domain.Message{} return &domain.Message{} // Возвращаем пустую структуру вместо nil
}
createdAt := time.Now()
if msg.CreatedAt != nil {
createdAt = msg.CreatedAt.AsTime()
} }
return &domain.Message{ return &domain.Message{
@ -182,7 +201,7 @@ func protoMessageToDomain(msg *proto.Message) *domain.Message {
ReceiverID: int(msg.ReceiverId), ReceiverID: int(msg.ReceiverId),
Content: msg.Content, Content: msg.Content,
Status: msg.Status, Status: msg.Status,
CreatedAt: msg.CreatedAt.AsTime(), CreatedAt: createdAt,
} }
} }