v.0.0.3.2 Фикс rabbit, добавлен Heartbeat и цикл прослушки канала
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-16 20:10:25 +03:00
parent 4766732c82
commit 1924e6fd37

View File

@ -343,18 +343,21 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
const maxRetries = 5
retryDelay := time.Second
for i := 0; i < maxRetries; i++ {
for {
err := s.runStream(req, stream)
if err == nil {
return nil
}
log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
log.Printf("Stream error for user %d: %v", req.UserId, err)
if retryDelay > maxRetries {
return fmt.Errorf("max retries exceeded for user %d", req.UserId)
}
time.Sleep(retryDelay)
retryDelay *= 2
}
return fmt.Errorf("max retries (%d) exceeded", maxRetries)
}
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
@ -367,35 +370,37 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
}
defer ch.Close()
// Объявляем очередь с persistence
_, err = ch.QueueDeclare(
// Пассивная проверка существования очереди
_, err = ch.QueueDeclarePassive(
queueName,
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
amqp.Table{
"x-message-ttl": int32(86400000), // 24 часа TTL
},
nil, // args
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
// QoS для контроля скорости обработки
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
// Если очередь не существует, создаем с TTL
if err != nil {
return fmt.Errorf("failed to set QoS: %v", err)
_, err = ch.QueueDeclare(
queueName,
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
amqp.Table{
"x-message-ttl": int32(86400000),
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
}
msgs, err := ch.Consume(
queueName,
"", // consumer
false, // auto-ack (false для ручного подтверждения)
false, // auto-ack
false, // exclusive
false, // noLocal
false, // noWait
@ -408,6 +413,32 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
log.Printf("Starting message stream for user %d", req.UserId)
defer log.Printf("Stopping message stream for user %d", req.UserId)
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.mu.Lock()
err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{
Content: "__heartbeat__",
Status: "system",
},
})
s.mu.Unlock()
if err != nil {
log.Printf("Heartbeat failed for user %d: %v", req.UserId, err)
return
}
case <-ctx.Done():
return
}
}
}()
for {
select {
case <-ctx.Done():
@ -419,18 +450,18 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
d.Nack(false, false) // Отбрасываем некорректное сообщение
log.Printf("Failed to unmarshal message for user %d: %v", req.UserId, err)
d.Nack(false, false)
continue
}
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
d.Nack(false, true) // Возвращаем в очередь при ошибке отправки
d.Nack(false, true)
return err
}
d.Ack(false) // Подтверждаем обработку
d.Ack(false)
}
}
}