This commit is contained in:
parent
1924e6fd37
commit
1776942388
29
server.go
29
server.go
@ -339,27 +339,6 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
|
||||
return &proto.MessageResponse{Message: &message}, nil
|
||||
}
|
||||
|
||||
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
const maxRetries = 5
|
||||
retryDelay := time.Second
|
||||
|
||||
for {
|
||||
err := s.runStream(req, stream)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("Stream error for user %d: %v", req.UserId, err)
|
||||
|
||||
if retryDelay > maxRetries {
|
||||
return fmt.Errorf("max retries exceeded for user %d", req.UserId)
|
||||
}
|
||||
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
ctx := stream.Context()
|
||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||
@ -413,6 +392,7 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
log.Printf("Starting message stream for user %d", req.UserId)
|
||||
defer log.Printf("Stopping message stream for user %d", req.UserId)
|
||||
|
||||
// Запускаем heartbeat в отдельной горутине
|
||||
go func() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
@ -439,13 +419,15 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
}
|
||||
}()
|
||||
|
||||
// Основной цикл обработки сообщений
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case d, ok := <-msgs:
|
||||
if !ok {
|
||||
return fmt.Errorf("message channel closed")
|
||||
log.Printf("Message channel closed for user %d", req.UserId)
|
||||
return nil
|
||||
}
|
||||
|
||||
var msg proto.Message
|
||||
@ -455,8 +437,9 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
||||
log.Printf("Processing message for user %d: %+v", req.UserId, msg)
|
||||
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
||||
log.Printf("Failed to send message to user %d: %v", req.UserId, err)
|
||||
d.Nack(false, true)
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user