This commit is contained in:
parent
ef5b55bd4e
commit
070712fe59
118
server.go
118
server.go
@ -151,7 +151,6 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
||||
queueName := fmt.Sprintf("user_%d_messages", receiverId)
|
||||
msgBytes, _ := json.Marshal(message)
|
||||
|
||||
log.Printf("[DEBUG] Publishing to queue: user_%d", receiverId)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
queueName, // routing key
|
||||
@ -160,15 +159,9 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: msgBytes,
|
||||
DeliveryMode: amqp.Persistent,
|
||||
DeliveryMode: amqp.Persistent, // Сохраняем сообщения на диск
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] Publish failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("[DEBUG] Message published successfully")
|
||||
|
||||
ch.Close()
|
||||
if err == nil {
|
||||
return &proto.MessageResponse{Message: &message}, nil
|
||||
@ -346,113 +339,102 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
|
||||
return &proto.MessageResponse{Message: &message}, nil
|
||||
}
|
||||
|
||||
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
const maxRetries = 5
|
||||
retryDelay := time.Second
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
err := s.runStream(req, stream)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
}
|
||||
|
||||
return fmt.Errorf("max retries (%d) exceeded", maxRetries)
|
||||
}
|
||||
|
||||
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
ctx := stream.Context()
|
||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||
|
||||
// 1. Создаем канал с повторными попытками
|
||||
var ch *amqp.Channel
|
||||
for i := 0; i < 3; i++ {
|
||||
var err error
|
||||
s.mu.Lock()
|
||||
ch, err = s.rabbitConn.Channel()
|
||||
s.mu.Unlock()
|
||||
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Printf("Failed to open channel (attempt %d): %v", i+1, err)
|
||||
time.Sleep(time.Second * time.Duration(i+1))
|
||||
}
|
||||
if ch == nil {
|
||||
return fmt.Errorf("failed to open channel after 3 attempts")
|
||||
ch, err := s.rabbitConn.Channel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open channel: %v", err)
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
// 2. Объявляем очередь с проверкой существования
|
||||
_, err := ch.QueueDeclare(
|
||||
// Объявляем очередь с persistence
|
||||
_, err = ch.QueueDeclare(
|
||||
queueName,
|
||||
true, // durable
|
||||
false, // autoDelete
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
amqp.Table{
|
||||
"x-message-ttl": int32(86400000),
|
||||
"x-message-ttl": int32(86400000), // 24 часа TTL
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to declare queue: %v", err)
|
||||
}
|
||||
|
||||
// 3. Начинаем потребление сообщений
|
||||
deliveries, err := ch.Consume(
|
||||
// QoS для контроля скорости обработки
|
||||
err = ch.Qos(
|
||||
1, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set QoS: %v", err)
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
queueName,
|
||||
"", // consumer tag
|
||||
false, // auto-ack
|
||||
"", // consumer
|
||||
false, // auto-ack (false для ручного подтверждения)
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
false, // noLocal
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to consume: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Started stream for user %d, queue: %s", req.UserId, queueName)
|
||||
log.Printf("Starting message stream for user %d", req.UserId)
|
||||
defer log.Printf("Stopping message stream for user %d", req.UserId)
|
||||
|
||||
// 4. Heartbeat горутина
|
||||
go func() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := stream.Send(&proto.MessageResponse{
|
||||
Message: &proto.Message{Content: "HEARTBEAT", Status: "system"},
|
||||
}); err != nil {
|
||||
log.Printf("[ERROR] Heartbeat failed for user %d: %v", req.UserId, err)
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// 5. Основной цикл обработки сообщений
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("[DEBUG] Context done for user %d", req.UserId)
|
||||
return nil
|
||||
|
||||
case d, ok := <-deliveries:
|
||||
case d, ok := <-msgs:
|
||||
if !ok {
|
||||
log.Printf("[ERROR] Delivery channel closed for user %d", req.UserId)
|
||||
return fmt.Errorf("delivery channel closed")
|
||||
return fmt.Errorf("message channel closed")
|
||||
}
|
||||
|
||||
var msg proto.Message
|
||||
if err := json.Unmarshal(d.Body, &msg); err != nil {
|
||||
log.Printf("[ERROR] Unmarshal error for user %d: %v", req.UserId, err)
|
||||
_ = d.Nack(false, false)
|
||||
log.Printf("Failed to unmarshal message: %v", err)
|
||||
d.Nack(false, false) // Отбрасываем некорректное сообщение
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Sending message to user %d: %+v", req.UserId, msg)
|
||||
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
||||
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
||||
log.Printf("[ERROR] Stream send error for user %d: %v", req.UserId, err)
|
||||
_ = d.Nack(false, true)
|
||||
d.Nack(false, true) // Возвращаем в очередь при ошибке отправки
|
||||
return err
|
||||
}
|
||||
|
||||
if err := d.Ack(false); err != nil {
|
||||
log.Printf("[ERROR] Ack failed for user %d: %v", req.UserId, err)
|
||||
}
|
||||
d.Ack(false) // Подтверждаем обработку
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Инициализация подключения к БД
|
||||
pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user