v.0.0.3.1 Фикс активного соединения
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-16 21:37:18 +03:00
parent 3e4e1bf376
commit b87e0151e9

View File

@ -343,55 +343,89 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
}
defer ch.Close()
// Устанавливаем QoS для предварительной выборки
err = ch.Qos(1, 0, false)
if err != nil {
return fmt.Errorf("failed to set QoS: %v", err)
}
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
// Объявляем очередь
// Объявляем очередь с аргументами для heartbeat
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
queueName,
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-message-ttl": int32(3600000), // 1 hour TTL
"x-expires": int32(86400000), // 24 hours queue expiration
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
// Подписываемся на очередь
// Подписываемся на очередь с ручным подтверждением
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
queueName,
"", // consumer
false, // auto-ack (false для ручного подтверждения)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to register consumer: %v", err)
}
// Канал для heartbeat
heartbeatTicker := time.NewTicker(15 * time.Second)
defer heartbeatTicker.Stop()
for {
select {
case <-stream.Context().Done():
log.Printf("Client disconnected, user %d", req.UserId)
return nil
case <-heartbeatTicker.C:
// Отправляем heartbeat сообщение
if err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{
Content: "__heartbeat__",
},
}); err != nil {
log.Printf("Failed to send heartbeat: %v", err)
return err
}
case d, ok := <-msgs:
if !ok {
log.Printf("RabbitMQ channel closed for user %d", req.UserId)
return nil
}
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
d.Nack(false, true) // Повторная попытка доставки
continue
}
// Пропускаем heartbeat сообщения
if msg.Content == "__heartbeat__" {
d.Ack(false)
continue
}
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
d.Nack(false, true) // Сообщение не обработано, возвращаем в очередь
return err
}
d.Ack(false) // Подтверждаем обработку сообщения
}
}
}