v.0.0.3.2 Фикс соединения rabbit
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 15:54:59 +03:00
parent b0dd54aa28
commit 043f73f7f8

109
server.go
View File

@ -8,9 +8,9 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"google.golang.org/grpc"
protobuf "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"math"
"net"
"sync"
"tailly_messages/proto"
@ -171,24 +171,6 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
return nil, fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr)
}
func mustMarshal(msg protobuf.Message) []byte {
data, err := protobuf.Marshal(msg)
if err != nil {
log.Fatalf("failed to marshal message: %v", err)
}
return data
}
func protoMessageToMap(msg *proto.Message) map[string]interface{} {
return map[string]interface{}{
"id": msg.Id,
"chatId": msg.ChatId,
"senderId": msg.SenderId,
"receiverId": msg.ReceiverId,
"content": msg.Content,
"status": msg.Status,
"createdAt": msg.CreatedAt.AsTime().Format(time.RFC3339Nano),
}
}
func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) {
var chat proto.Chat
var createdAt, updatedAt time.Time
@ -343,31 +325,78 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
const maxRetries = 5
retryDelay := time.Second
for i := 0; i < maxRetries; i++ {
for {
err := s.runStream(req, stream)
if err == nil {
return nil
}
log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
time.Sleep(retryDelay)
retryDelay *= 2
}
log.Printf("Stream error: %v", err)
return fmt.Errorf("max retries (%d) exceeded", maxRetries)
// Проверяем, не закрыт ли контекст
select {
case <-stream.Context().Done():
return nil
default:
}
// Экспоненциальная задержка с максимумом
time.Sleep(retryDelay)
retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(30*time.Second)))
}
}
func (s *server) ensureRabbitConnection() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.rabbitConn == nil || s.rabbitConn.IsClosed() {
var err error
for i := 0; i < 5; i++ {
s.rabbitConn, err = amqp.DialConfig("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/", amqp.Config{
Heartbeat: 10 * time.Second,
Locale: "en_US",
})
if err == nil {
break
}
time.Sleep(time.Second * time.Duration(i+1))
}
if err != nil {
return fmt.Errorf("failed to reconnect to RabbitMQ: %v", err)
}
}
return nil
}
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
ctx := stream.Context()
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
for { // Бесконечный цикл для переподключения
select {
case <-ctx.Done():
return nil
default:
err := s.processMessages(ctx, queueName, stream)
if err != nil {
log.Printf("Stream error, reconnecting: %v", err)
time.Sleep(time.Second * 2) // Задержка перед повторной попыткой
continue
}
return nil
}
}
}
func (s *server) processMessages(ctx context.Context, queueName string, stream proto.MessageService_StreamMessagesServer) error {
ch, err := s.rabbitConn.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: %v", err)
}
defer ch.Close()
// Объявляем очередь с persistence
// Убедимся, что очередь существует
_, err = ch.QueueDeclare(
queueName,
true, // durable
@ -375,27 +404,17 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
false, // exclusive
false, // noWait
amqp.Table{
"x-message-ttl": int32(86400000), // 24 часа TTL
"x-message-ttl": int32(86400000),
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}
// QoS для контроля скорости обработки
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
return fmt.Errorf("failed to set QoS: %v", err)
}
msgs, err := ch.Consume(
queueName,
"", // consumer
false, // auto-ack (false для ручного подтверждения)
false, // auto-ack
false, // exclusive
false, // noLocal
false, // noWait
@ -405,8 +424,8 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
return fmt.Errorf("failed to consume: %v", err)
}
log.Printf("Starting message stream for user %d", req.UserId)
defer log.Printf("Stopping message stream for user %d", req.UserId)
log.Printf("Starting message stream for user")
defer log.Printf("Stopping message stream for user")
for {
select {
@ -414,23 +433,21 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
return nil
case d, ok := <-msgs:
if !ok {
return fmt.Errorf("message channel closed")
return fmt.Errorf("message channel closed, will reconnect")
}
var msg proto.Message
if err := json.Unmarshal(d.Body, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
d.Nack(false, false) // Отбрасываем некорректное сообщение
d.Nack(false, false)
continue
}
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
d.Nack(false, true) // Возвращаем в очередь при ошибке отправки
d.Nack(false, true)
return err
}
d.Ack(false) // Подтверждаем обработку
d.Ack(false)
}
}
}