v.0.0.3.3 Замена ws на SSE
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 08:55:54 +03:00
parent 42156dcb5b
commit bfd785ed5c

View File

@ -337,22 +337,15 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
log.Printf("Starting message stream for user %d", req.UserId)
// Создаем канал с подтверждением сообщений
ch, err := s.rabbitConn.Channel()
if err != nil {
return fmt.Errorf("failed to open RabbitMQ channel: %v", err)
}
defer ch.Close()
// Настройка QoS
err = ch.Qos(1, 0, false)
if err != nil {
return fmt.Errorf("failed to set QoS: %v", err)
}
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
// Объявляем очередь с параметрами
// Объявляем очередь
_, err = ch.QueueDeclare(
queueName,
true, // durable
@ -360,21 +353,19 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
false, // exclusive
false, // no-wait
amqp.Table{
"x-message-ttl": int32(3600000), // 1 hour
"x-expires": int32(86400000), // 24 hours
"x-dead-letter-exchange": "amq.direct",
"x-dead-letter-routing-key": queueName,
"x-message-ttl": int32(3600000),
"x-expires": int32(86400000),
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
// Подписываемся с ручным подтверждением
// Подписываемся на очередь
msgs, err := ch.Consume(
queueName,
"", // consumer
false, // auto-ack
false, // auto-ack (false для ручного подтверждения)
false, // exclusive
false, // no-local
false, // no-wait
@ -388,49 +379,36 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
heartbeatTicker := time.NewTicker(15 * time.Second)
defer heartbeatTicker.Stop()
// Основной цикл обработки
for {
select {
case <-stream.Context().Done():
log.Printf("Client %d disconnected gracefully", req.UserId)
log.Printf("Client disconnected, user %d", req.UserId)
return nil
case <-heartbeatTicker.C:
// Отправляем heartbeat
if err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{
Content: "__heartbeat__",
},
Message: &proto.Message{Content: "__heartbeat__"},
}); err != nil {
log.Printf("Heartbeat failed for user %d: %v", req.UserId, err)
return err
}
case d, ok := <-msgs:
if !ok {
log.Printf("RabbitMQ channel closed for user %d", req.UserId)
return nil
}
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
d.Nack(false, true) // Повторная попытка
d.Nack(false, true) // Возвращаем в очередь при ошибке
continue
}
// Игнорируем heartbeat
if msg.Content == "__heartbeat__" {
d.Ack(false)
continue
}
log.Printf("Processing message for user %d: %+v", req.UserId, msg)
// Отправляем сообщение клиенту
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
log.Printf("Failed to send to client: %v", err)
d.Nack(false, true) // Возвращаем в очередь
d.Nack(false, true) // Возвращаем в очередь при ошибке
return err
}
@ -438,7 +416,6 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
}
}
}
func main() {
// Инициализация подключения к БД
pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2")