v.0.0.3.4 добавлен x-single-active-consumer
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
ae91ab0dac
commit
3e4644f3fa
38
server.go
38
server.go
@ -9,8 +9,8 @@ import (
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/status"
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"log"
|
||||
"net"
|
||||
@ -173,6 +173,24 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
||||
return nil, fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr)
|
||||
}
|
||||
|
||||
func mustMarshal(msg protobuf.Message) []byte {
|
||||
data, err := protobuf.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to marshal message: %v", err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
func protoMessageToMap(msg *proto.Message) map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"id": msg.Id,
|
||||
"chatId": msg.ChatId,
|
||||
"senderId": msg.SenderId,
|
||||
"receiverId": msg.ReceiverId,
|
||||
"content": msg.Content,
|
||||
"status": msg.Status,
|
||||
"createdAt": msg.CreatedAt.AsTime().Format(time.RFC3339Nano),
|
||||
}
|
||||
}
|
||||
func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) {
|
||||
var chat proto.Chat
|
||||
var createdAt, updatedAt time.Time
|
||||
@ -348,12 +366,9 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
|
||||
|
||||
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
ctx := stream.Context()
|
||||
log.Printf("ctx: %v", ctx)
|
||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||
log.Printf("queue: %v", queueName)
|
||||
|
||||
ch, err := s.rabbitConn.Channel()
|
||||
log.Printf("ch: %v", ch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open channel: %v", err)
|
||||
}
|
||||
@ -367,7 +382,8 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
amqp.Table{
|
||||
"x-message-ttl": int32(86400000), // 24 часа TTL
|
||||
"x-message-ttl": int32(86400000),
|
||||
"x-single-active-consumer": false,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
@ -396,7 +412,6 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to consume: %v", err)
|
||||
}
|
||||
log.Printf("msgs: %v", msgs)
|
||||
|
||||
log.Printf("Starting message stream for user %d", req.UserId)
|
||||
defer log.Printf("Stopping message stream for user %d", req.UserId)
|
||||
@ -419,7 +434,6 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
|
||||
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
||||
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
||||
log.Printf("Failed to send message to user %d: %v", req.UserId, err)
|
||||
d.Nack(false, true) // Возвращаем в очередь при ошибке отправки
|
||||
return err
|
||||
}
|
||||
@ -483,15 +497,7 @@ func main() {
|
||||
}(&rabbitConn, &sync.Mutex{})
|
||||
|
||||
// Создаем gRPC сервер
|
||||
grpcServer := grpc.NewServer(
|
||||
grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
MaxConnectionIdle: 5 * time.Minute, // Макс время бездействия соединения
|
||||
MaxConnectionAge: 30 * time.Minute, // Макс время жизни соединения
|
||||
MaxConnectionAgeGrace: 5 * time.Second, // Время на завершение операций
|
||||
Time: 30 * time.Second, // Отправлять keepalive каждые 30 сек
|
||||
Timeout: 5 * time.Second, // Ждать ответа 5 сек
|
||||
}),
|
||||
)
|
||||
grpcServer := grpc.NewServer()
|
||||
proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn))
|
||||
|
||||
// Запускаем сервер
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user