v.0.0.3
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 10:24:00 +03:00
parent bfd785ed5c
commit ef5e8b1172

View File

@ -347,15 +347,12 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
// Объявляем очередь
_, err = ch.QueueDeclare(
queueName,
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-message-ttl": int32(3600000),
"x-expires": int32(86400000),
},
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
@ -363,34 +360,23 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
// Подписываемся на очередь
msgs, err := ch.Consume(
queueName,
"", // consumer
false, // auto-ack (false для ручного подтверждения)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to register consumer: %v", err)
}
// Таймер для heartbeat
heartbeatTicker := time.NewTicker(15 * time.Second)
defer heartbeatTicker.Stop()
for {
select {
case <-stream.Context().Done():
log.Printf("Client disconnected, user %d", req.UserId)
return nil
case <-heartbeatTicker.C:
// Отправляем heartbeat
if err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{Content: "__heartbeat__"},
}); err != nil {
return err
}
case d, ok := <-msgs:
if !ok {
return nil
@ -398,24 +384,18 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
d.Nack(false, true) // Возвращаем в очередь при ошибке
continue
}
if msg.Content == "__heartbeat__" {
d.Ack(false)
log.Printf("Failed to unmarshal message: %v", err)
continue
}
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
d.Nack(false, true) // Возвращаем в очередь при ошибке
return err
}
d.Ack(false) // Подтверждаем обработку
}
}
}
func main() {
// Инициализация подключения к БД
pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2")