v.0.0.3.1 Фикс соединения rabbit
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
75025c0c75
commit
488bb3fc5f
155
server.go
155
server.go
@ -8,7 +8,7 @@ import (
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"log"
|
||||
"net"
|
||||
@ -171,6 +171,24 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
||||
return nil, fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr)
|
||||
}
|
||||
|
||||
func mustMarshal(msg protobuf.Message) []byte {
|
||||
data, err := protobuf.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to marshal message: %v", err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
func protoMessageToMap(msg *proto.Message) map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"id": msg.Id,
|
||||
"chatId": msg.ChatId,
|
||||
"senderId": msg.SenderId,
|
||||
"receiverId": msg.ReceiverId,
|
||||
"content": msg.Content,
|
||||
"status": msg.Status,
|
||||
"createdAt": msg.CreatedAt.AsTime().Format(time.RFC3339Nano),
|
||||
}
|
||||
}
|
||||
func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) {
|
||||
var chat proto.Chat
|
||||
var createdAt, updatedAt time.Time
|
||||
@ -322,61 +340,25 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
|
||||
}
|
||||
|
||||
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
const maxRetries = 5
|
||||
retryDelay := time.Second
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
err := s.runStream(req, stream)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
}
|
||||
|
||||
return fmt.Errorf("max retries (%d) exceeded", maxRetries)
|
||||
}
|
||||
|
||||
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
ctx := stream.Context()
|
||||
|
||||
// Создаем новый контекст с таймаутом для серверной части
|
||||
serverCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// Мониторим клиентский контекст
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Println("Client context canceled, cleaning up...")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
// Бесконечный цикл для переподключения
|
||||
for {
|
||||
select {
|
||||
case <-serverCtx.Done():
|
||||
return nil
|
||||
default:
|
||||
// Правильный вызов runStream с нужными параметрами
|
||||
err := s.runStream(serverCtx, req, stream)
|
||||
if err != nil {
|
||||
log.Printf("Stream error: %v, reconnecting...", err)
|
||||
time.Sleep(2 * time.Second)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) ensureRabbitConnection() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.rabbitConn == nil || s.rabbitConn.IsClosed() {
|
||||
var err error
|
||||
for i := 0; i < 5; i++ {
|
||||
s.rabbitConn, err = amqp.DialConfig("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/", amqp.Config{
|
||||
Heartbeat: 10 * time.Second,
|
||||
Locale: "en_US",
|
||||
})
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second * time.Duration(i+1))
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reconnect to RabbitMQ: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||
|
||||
ch, err := s.rabbitConn.Channel()
|
||||
@ -385,30 +367,7 @@ func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
for { // Бесконечный цикл для переподключения
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
err := s.processMessages(ctx, queueName, stream)
|
||||
if err != nil {
|
||||
log.Printf("Stream error, reconnecting: %v", err)
|
||||
time.Sleep(time.Second * 2) // Задержка перед повторной попыткой
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) processMessages(ctx context.Context, queueName string, stream proto.MessageService_StreamMessagesServer) error {
|
||||
ch, err := s.rabbitConn.Channel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open channel: %v", err)
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
// Убедимся, что очередь существует
|
||||
// Объявляем очередь с persistence
|
||||
_, err = ch.QueueDeclare(
|
||||
queueName,
|
||||
true, // durable
|
||||
@ -416,17 +375,27 @@ func (s *server) processMessages(ctx context.Context, queueName string, stream p
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
amqp.Table{
|
||||
"x-message-ttl": int32(86400000),
|
||||
"x-message-ttl": int32(86400000), // 24 часа TTL
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to declare queue: %v", err)
|
||||
}
|
||||
|
||||
// QoS для контроля скорости обработки
|
||||
err = ch.Qos(
|
||||
1, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set QoS: %v", err)
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
queueName,
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // auto-ack (false для ручного подтверждения)
|
||||
false, // exclusive
|
||||
false, // noLocal
|
||||
false, // noWait
|
||||
@ -436,8 +405,8 @@ func (s *server) processMessages(ctx context.Context, queueName string, stream p
|
||||
return fmt.Errorf("failed to consume: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Starting message stream for user")
|
||||
defer log.Printf("Stopping message stream for user")
|
||||
log.Printf("Starting message stream for user %d", req.UserId)
|
||||
defer log.Printf("Stopping message stream for user %d", req.UserId)
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -445,21 +414,23 @@ func (s *server) processMessages(ctx context.Context, queueName string, stream p
|
||||
return nil
|
||||
case d, ok := <-msgs:
|
||||
if !ok {
|
||||
return fmt.Errorf("message channel closed, will reconnect")
|
||||
return fmt.Errorf("message channel closed")
|
||||
}
|
||||
|
||||
var msg proto.Message
|
||||
if err := json.Unmarshal(d.Body, &msg); err != nil {
|
||||
log.Printf("Failed to unmarshal message: %v", err)
|
||||
d.Nack(false, false)
|
||||
d.Nack(false, false) // Отбрасываем некорректное сообщение
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
||||
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
||||
d.Nack(false, true)
|
||||
d.Nack(false, true) // Возвращаем в очередь при ошибке отправки
|
||||
return err
|
||||
}
|
||||
d.Ack(false)
|
||||
|
||||
d.Ack(false) // Подтверждаем обработку
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -518,15 +489,7 @@ func main() {
|
||||
}(&rabbitConn, &sync.Mutex{})
|
||||
|
||||
// Создаем gRPC сервер
|
||||
grpcServer := grpc.NewServer(
|
||||
grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
MaxConnectionIdle: 15 * time.Minute,
|
||||
MaxConnectionAge: 30 * time.Minute,
|
||||
MaxConnectionAgeGrace: 5 * time.Minute,
|
||||
Time: 2 * time.Minute,
|
||||
Timeout: 20 * time.Second,
|
||||
}),
|
||||
)
|
||||
grpcServer := grpc.NewServer()
|
||||
proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn))
|
||||
|
||||
// Запускаем сервер
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user