v.0.0.3.4 Fix rabbit
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-16 20:21:37 +03:00
parent 1776942388
commit ef5b55bd4e

106
server.go
View File

@ -151,6 +151,7 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
queueName := fmt.Sprintf("user_%d_messages", receiverId)
msgBytes, _ := json.Marshal(message)
log.Printf("[DEBUG] Publishing to queue: user_%d", receiverId)
err = ch.PublishWithContext(ctx,
"", // exchange
queueName, // routing key
@ -159,9 +160,15 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
amqp.Publishing{
ContentType: "application/json",
Body: msgBytes,
DeliveryMode: amqp.Persistent, // Persist messages to disk
DeliveryMode: amqp.Persistent,
})
if err != nil {
log.Printf("[ERROR] Publish failed: %v", err)
return nil, err
}
log.Printf("[DEBUG] Message published successfully")
ch.Close()
if err == nil {
return &proto.MessageResponse{Message: &message}, nil
@ -343,56 +350,57 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
ctx := stream.Context()
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
ch, err := s.rabbitConn.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: %v", err)
// 1. Create the channel, retrying on failure
var ch *amqp.Channel
for i := 0; i < 3; i++ {
var err error
s.mu.Lock()
ch, err = s.rabbitConn.Channel()
s.mu.Unlock()
if err == nil {
break
}
log.Printf("Failed to open channel (attempt %d): %v", i+1, err)
time.Sleep(time.Second * time.Duration(i+1))
}
if ch == nil {
return fmt.Errorf("failed to open channel after 3 attempts")
}
defer ch.Close()
// Passive check that the queue exists
_, err = ch.QueueDeclarePassive(
// 2. Declare the queue (also verifies it exists)
_, err := ch.QueueDeclare(
queueName,
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
amqp.Table{
"x-message-ttl": int32(86400000),
},
)
// If the queue does not exist, create it with a TTL
if err != nil {
_, err = ch.QueueDeclare(
queueName,
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
amqp.Table{
"x-message-ttl": int32(86400000),
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
return fmt.Errorf("failed to declare queue: %v", err)
}
msgs, err := ch.Consume(
// 3. Start consuming messages
deliveries, err := ch.Consume(
queueName,
"", // consumer
"", // consumer tag
false, // auto-ack
false, // exclusive
false, // noLocal
false, // noWait
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to consume: %v", err)
}
log.Printf("Starting message stream for user %d", req.UserId)
defer log.Printf("Stopping message stream for user %d", req.UserId)
log.Printf("[DEBUG] Started stream for user %d, queue: %s", req.UserId, queueName)
// Run the heartbeat in a separate goroutine
// 4. Heartbeat goroutine
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
@ -400,17 +408,10 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
for {
select {
case <-ticker.C:
s.mu.Lock()
err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{
Content: "__heartbeat__",
Status: "system",
},
})
s.mu.Unlock()
if err != nil {
log.Printf("Heartbeat failed for user %d: %v", req.UserId, err)
if err := stream.Send(&proto.MessageResponse{
Message: &proto.Message{Content: "HEARTBEAT", Status: "system"},
}); err != nil {
log.Printf("[ERROR] Heartbeat failed for user %d: %v", req.UserId, err)
return
}
case <-ctx.Done():
@ -419,36 +420,39 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
}
}()
// Main message-processing loop
// 5. Main message-processing loop
for {
select {
case <-ctx.Done():
log.Printf("[DEBUG] Context done for user %d", req.UserId)
return nil
case d, ok := <-msgs:
case d, ok := <-deliveries:
if !ok {
log.Printf("Message channel closed for user %d", req.UserId)
return nil
log.Printf("[ERROR] Delivery channel closed for user %d", req.UserId)
return fmt.Errorf("delivery channel closed")
}
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
log.Printf("Failed to unmarshal message for user %d: %v", req.UserId, err)
d.Nack(false, false)
log.Printf("[ERROR] Unmarshal error for user %d: %v", req.UserId, err)
_ = d.Nack(false, false)
continue
}
log.Printf("Processing message for user %d: %+v", req.UserId, msg)
log.Printf("[DEBUG] Sending message to user %d: %+v", req.UserId, msg)
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
log.Printf("Failed to send message to user %d: %v", req.UserId, err)
d.Nack(false, true)
log.Printf("[ERROR] Stream send error for user %d: %v", req.UserId, err)
_ = d.Nack(false, true)
return err
}
d.Ack(false)
if err := d.Ack(false); err != nil {
log.Printf("[ERROR] Ack failed for user %d: %v", req.UserId, err)
}
}
}
}
func main() {
// Initialize the database connection
pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2")