From b0dd54aa2801809dc24b523d07ba94fab917ffd8 Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Sun, 17 Aug 2025 10:29:32 +0300 Subject: [PATCH] v.0.0.3.1 --- .idea/sqldialects.xml | 7 -- server.go | 177 ++++++++++++++++++++++++++++++------------ 2 files changed, 127 insertions(+), 57 deletions(-) delete mode 100644 .idea/sqldialects.xml diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml deleted file mode 100644 index c746b56..0000000 --- a/.idea/sqldialects.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/server.go b/server.go index b095b8d..b918343 100644 --- a/server.go +++ b/server.go @@ -12,6 +12,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "log" "net" + "sync" "tailly_messages/proto" "time" ) @@ -20,6 +21,7 @@ type server struct { proto.UnimplementedMessageServiceServer db *pgxpool.Pool rabbitConn *amqp.Connection + mu sync.Mutex } func NewServer(db *pgxpool.Pool, rabbitConn *amqp.Connection) *server { @@ -137,33 +139,36 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) return nil, err } - // Публикуем в RabbitMQ - ch, err := s.rabbitConn.Channel() - if err != nil { - return nil, fmt.Errorf("failed to open RabbitMQ channel: %v", err) - } - defer ch.Close() + var lastErr error + for i := 0; i < 3; i++ { + ch, err := s.rabbitConn.Channel() + if err != nil { + lastErr = fmt.Errorf("failed to open channel (attempt %d): %v", i+1, err) + time.Sleep(time.Second * time.Duration(i+1)) + continue + } - queueName := fmt.Sprintf("user_%d_messages", receiverId) - msgBytes, err := json.Marshal(message) - if err != nil { - return nil, fmt.Errorf("failed to marshal message: %v", err) - } + queueName := fmt.Sprintf("user_%d_messages", receiverId) + msgBytes, _ := json.Marshal(message) - err = ch.PublishWithContext(ctx, - "", // exchange - queueName, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: msgBytes, - }) - if err != nil { - return nil, fmt.Errorf("failed to publish message: %v", err) - } + err = ch.PublishWithContext(ctx, + "", // exchange + queueName, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: msgBytes, + DeliveryMode: amqp.Persistent, // Сохраняем сообщения на диск + }) - return &proto.MessageResponse{Message: &message}, nil + ch.Close() + if err == nil { + return &proto.MessageResponse{Message: &message}, nil + } + lastErr = err + } + return nil, fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr) } func mustMarshal(msg protobuf.Message) []byte { @@ -335,63 +340,97 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa } func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error { - log.Printf("Starting message stream for user %d", req.UserId) + const maxRetries = 5 + retryDelay := time.Second + + for i := 0; i < maxRetries; i++ { + err := s.runStream(req, stream) + if err == nil { + return nil + } + + log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err) + time.Sleep(retryDelay) + retryDelay *= 2 + } + + return fmt.Errorf("max retries (%d) exceeded", maxRetries) +} + +func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error { + ctx := stream.Context() + queueName := fmt.Sprintf("user_%d_messages", req.UserId) ch, err := s.rabbitConn.Channel() if err != nil { - return fmt.Errorf("failed to open RabbitMQ channel: %v", err) + return fmt.Errorf("failed to open channel: %v", err) } defer ch.Close() - queueName := fmt.Sprintf("user_%d_messages", req.UserId) - - // Объявляем очередь + // Объявляем очередь с persistence _, err = ch.QueueDeclare( - queueName, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + queueName, + true, // durable + false, // autoDelete + false, // exclusive + false, // noWait + amqp.Table{ + "x-message-ttl": int32(86400000), // 24 часа TTL + }, ) if err != nil { return fmt.Errorf("failed to declare queue: %v", err) } - // Подписываемся на очередь - msgs, err := ch.Consume( - queueName, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args + // QoS для контроля скорости обработки + err = ch.Qos( + 1, // prefetch count + 0, // prefetch size + false, // global ) if err != nil { - return fmt.Errorf("failed to register consumer: %v", err) + return fmt.Errorf("failed to set QoS: %v", err) } + msgs, err := ch.Consume( + queueName, + "", // consumer + false, // auto-ack (false для ручного подтверждения) + false, // exclusive + false, // noLocal + false, // noWait + nil, // args + ) + if err != nil { + return fmt.Errorf("failed to consume: %v", err) + } + + log.Printf("Starting message stream for user %d", req.UserId) + defer log.Printf("Stopping message stream for user %d", req.UserId) + for { select { - case <-stream.Context().Done(): - log.Printf("Client disconnected, user %d", req.UserId) + case <-ctx.Done(): return nil case d, ok := <-msgs: if !ok { - return nil + return fmt.Errorf("message channel closed") } var msg proto.Message if err := json.Unmarshal(d.Body, &msg); err != nil { log.Printf("Failed to unmarshal message: %v", err) + d.Nack(false, false) // Отбрасываем некорректное сообщение continue } log.Printf("Sending message to user %d: %+v", req.UserId, msg) if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil { + d.Nack(false, true) // Возвращаем в очередь при ошибке отправки return err } + + d.Ack(false) // Подтверждаем обработку } } } @@ -405,12 +444,50 @@ func main() { defer pool.Close() // Инициализация подключения к RabbitMQ - rabbitConn, err := amqp.Dial("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/") + var rabbitConn *amqp.Connection + + for i := 0; i < 5; i++ { + rabbitConn, err = amqp.DialConfig("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/", amqp.Config{ + Heartbeat: 10 * time.Second, + Locale: "en_US", + }) + if err == nil { + break + } + log.Printf("Failed to connect to RabbitMQ (attempt %d): %v", i+1, err) + time.Sleep(time.Second * time.Duration(i+1)) + } if err != nil { - log.Fatalf("Failed to connect to RabbitMQ: %v", err) + log.Fatalf("Failed to connect to RabbitMQ after 5 attempts: %v", err) } defer rabbitConn.Close() + // Обработка событий соединения + go func(conn **amqp.Connection, mu *sync.Mutex) { + for { + reason, ok := <-(*conn).NotifyClose(make(chan *amqp.Error)) + if !ok { + log.Println("RabbitMQ connection closed") + break + } + log.Printf("RabbitMQ connection closed: %v", reason) + + // Попытка переподключения + for i := 0; i < 5; i++ { + time.Sleep(time.Second * time.Duration(i+1)) + newConn, err := amqp.Dial("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/") + if err == nil { + mu.Lock() + *conn = newConn + mu.Unlock() + log.Println("Successfully reconnected to RabbitMQ") + break + } + log.Printf("Failed to reconnect to RabbitMQ (attempt %d): %v", i+1, err) + } + } + }(&rabbitConn, &sync.Mutex{}) + // Создаем gRPC сервер grpcServer := grpc.NewServer() proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn))