v.0.0.3.2 Фикс активного соединения
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 08:32:07 +03:00
parent b87e0151e9
commit 42156dcb5b

View File

@ -337,13 +337,14 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
log.Printf("Starting message stream for user %d", req.UserId)
// Создаем канал с подтверждением сообщений
ch, err := s.rabbitConn.Channel()
if err != nil {
return fmt.Errorf("failed to open RabbitMQ channel: %v", err)
}
defer ch.Close()
// Устанавливаем QoS для предварительной выборки
// Настройка QoS
err = ch.Qos(1, 0, false)
if err != nil {
return fmt.Errorf("failed to set QoS: %v", err)
@ -351,7 +352,7 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
// Объявляем очередь с аргументами для heartbeat
// Объявляем очередь с параметрами
_, err = ch.QueueDeclare(
queueName,
true, // durable
@ -359,19 +360,21 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
false, // exclusive
false, // no-wait
amqp.Table{
"x-message-ttl": int32(3600000), // 1 hour TTL
"x-expires": int32(86400000), // 24 hours queue expiration
"x-message-ttl": int32(3600000), // 1 hour
"x-expires": int32(86400000), // 24 hours
"x-dead-letter-exchange": "amq.direct",
"x-dead-letter-routing-key": queueName,
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
// Подписываемся на очередь с ручным подтверждением
// Подписываемся с ручным подтверждением
msgs, err := ch.Consume(
queueName,
"", // consumer
false, // auto-ack (false для ручного подтверждения)
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
@ -381,25 +384,28 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
return fmt.Errorf("failed to register consumer: %v", err)
}
// Канал для heartbeat
// Таймер для heartbeat
heartbeatTicker := time.NewTicker(15 * time.Second)
defer heartbeatTicker.Stop()
// Основной цикл обработки
for {
select {
case <-stream.Context().Done():
log.Printf("Client disconnected, user %d", req.UserId)
log.Printf("Client %d disconnected gracefully", req.UserId)
return nil
case <-heartbeatTicker.C:
// Отправляем heartbeat сообщение
// Отправляем heartbeat
if err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{
Content: "__heartbeat__",
},
}); err != nil {
log.Printf("Failed to send heartbeat: %v", err)
log.Printf("Heartbeat failed for user %d: %v", req.UserId, err)
return err
}
case d, ok := <-msgs:
if !ok {
log.Printf("RabbitMQ channel closed for user %d", req.UserId)
@ -409,23 +415,26 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
d.Nack(false, true) // Повторная попытка доставки
d.Nack(false, true) // Повторная попытка
continue
}
// Пропускаем heartbeat сообщения
// Игнорируем heartbeat
if msg.Content == "__heartbeat__" {
d.Ack(false)
continue
}
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
log.Printf("Processing message for user %d: %+v", req.UserId, msg)
// Отправляем сообщение клиенту
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
d.Nack(false, true) // Сообщение не обработано, возвращаем в очередь
log.Printf("Failed to send to client: %v", err)
d.Nack(false, true) // Возвращаем в очередь
return err
}
d.Ack(false) // Подтверждаем обработку сообщения
d.Ack(false) // Подтверждаем обработку
}
}
}