From a2caa4a3a13ec7851a8ba71e591d2543bb29464d Mon Sep 17 00:00:00 2001 From: admin Date: Wed, 4 Mar 2026 11:30:19 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20server.go?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server.go | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/server.go b/server.go index aeaaf2d..a5b93b9 100644 --- a/server.go +++ b/server.go @@ -84,7 +84,7 @@ func (s *server) CreateChat(ctx context.Context, req *proto.CreateChatRequest) ( errMsg := fmt.Sprintf("One or both users don't exist: user1=%d (%t), user2=%d (%t)", user1, user1Exists, user2, user2Exists) s.logger.Println(errMsg) - return nil, fmt.Errorf("%w", errors.New(errMsg)) // Оборачиваем ошибку + return nil, fmt.Errorf("%w", errors.New(errMsg)) } var chat proto.Chat @@ -182,8 +182,7 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) message.CreatedAt = timestamppb.New(createdAt) - // Для RabbitMQ отправляем расшифрованное сообщение (или зашифрованное, в зависимости от требований) - // Здесь отправляем расшифрованное для совместимости с существующими клиентами + decryptedContent, err := s.crypto.DecryptMessage(encryptedContent, encryptedKey, nonce) if err != nil { s.logger.Printf("Failed to decrypt for RabbitMQ: %v", err) @@ -290,7 +289,6 @@ func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto ) if err != nil { s.logger.Printf("Failed to decrypt last message: %v", err) - // Можно вернуть ошибку или пустое сообщение decryptedContent = "[не удалось расшифровать]" } @@ -497,7 +495,6 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M ctx, cancel := context.WithTimeout(stream.Context(), 24*time.Hour) defer cancel() - // Механизм переподключения с экспоненциальной задержкой retryDelay := time.Second const maxRetries = 5 @@ -508,12 +505,11 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M default: err := s.runStream(ctx, req, stream) if err == nil { - return nil // Успешное завершение + return nil } s.logger.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err) - // Экспоненциальная задержка перед повторной попыткой time.Sleep(retryDelay) retryDelay *= 2 } @@ -545,8 +541,8 @@ func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest false, // exclusive false, // noWait amqp.Table{ - "x-message-ttl": int32(86400000), // 24 hours - "x-expires": int32(86400000), // 24 hours + "x-message-ttl": int32(86400000), + "x-expires": int32(86400000), "x-single-active-consumer": false, }, ) @@ -568,7 +564,7 @@ func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest msgs, err := ch.Consume( queueName, "", // consumer - false, // auto-ack (false для ручного подтверждения) + false, // auto-ack false, // exclusive false, // noLocal false, // noWait @@ -588,7 +584,6 @@ func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest for { select { case <-keepaliveTicker.C: - // Отправляем пустое сообщение как keepalive if err := stream.Send(&proto.MessageResponse{}); err != nil { return err } @@ -597,7 +592,6 @@ func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest return nil case <-heartbeat.C: s.logger.Printf("Sending heartbeat for user %d", req.UserId) - // Send empty message as heartbeat if err := stream.Send(&proto.MessageResponse{}); err != nil { s.logger.Printf("Failed to send heartbeat: %v", err) return err @@ -612,24 +606,24 @@ func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest var msg proto.Message if err := json.Unmarshal(d.Body, &msg); err != nil { s.logger.Printf("Failed to unmarshal message: %v", err) - d.Nack(false, false) // Отбрасываем некорректное сообщение + d.Nack(false, false) continue } if msg.Id == 0 || msg.Content == "" { - d.Ack(false) // Подтверждаем, но не отправляем в стрим + d.Ack(false) continue } s.logger.Printf("Sending message to stream for user %d: %+v", req.UserId, msg) if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil { s.logger.Printf("Failed to send message to stream: %v", err) - d.Nack(false, true) // Возвращаем в очередь при ошибке отправки + d.Nack(false, true) return err } s.logger.Printf("Acknowledging message for user %d", req.UserId) - d.Ack(false) // Подтверждаем обработку + d.Ack(false) } } }