This commit is contained in:
parent
ef5e8b1172
commit
b0dd54aa28
7
.idea/sqldialects.xml
generated
7
.idea/sqldialects.xml
generated
@ -1,7 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project version="4">
|
|
||||||
<component name="SqlDialectMappings">
|
|
||||||
<file url="file://$PROJECT_DIR$/migrations/0001_initial_schema.down.sql" dialect="GenericSQL" />
|
|
||||||
<file url="file://$PROJECT_DIR$/migrations/0001_initial_schema.up.sql" dialect="GenericSQL" />
|
|
||||||
</component>
|
|
||||||
</project>
|
|
||||||
177
server.go
177
server.go
@ -12,6 +12,7 @@ import (
|
|||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"tailly_messages/proto"
|
"tailly_messages/proto"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -20,6 +21,7 @@ type server struct {
|
|||||||
proto.UnimplementedMessageServiceServer
|
proto.UnimplementedMessageServiceServer
|
||||||
db *pgxpool.Pool
|
db *pgxpool.Pool
|
||||||
rabbitConn *amqp.Connection
|
rabbitConn *amqp.Connection
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(db *pgxpool.Pool, rabbitConn *amqp.Connection) *server {
|
func NewServer(db *pgxpool.Pool, rabbitConn *amqp.Connection) *server {
|
||||||
@ -137,33 +139,36 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Публикуем в RabbitMQ
|
var lastErr error
|
||||||
ch, err := s.rabbitConn.Channel()
|
for i := 0; i < 3; i++ {
|
||||||
if err != nil {
|
ch, err := s.rabbitConn.Channel()
|
||||||
return nil, fmt.Errorf("failed to open RabbitMQ channel: %v", err)
|
if err != nil {
|
||||||
}
|
lastErr = fmt.Errorf("failed to open channel (attempt %d): %v", i+1, err)
|
||||||
defer ch.Close()
|
time.Sleep(time.Second * time.Duration(i+1))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
queueName := fmt.Sprintf("user_%d_messages", receiverId)
|
queueName := fmt.Sprintf("user_%d_messages", receiverId)
|
||||||
msgBytes, err := json.Marshal(message)
|
msgBytes, _ := json.Marshal(message)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to marshal message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ch.PublishWithContext(ctx,
|
err = ch.PublishWithContext(ctx,
|
||||||
"", // exchange
|
"", // exchange
|
||||||
queueName, // routing key
|
queueName, // routing key
|
||||||
false, // mandatory
|
false, // mandatory
|
||||||
false, // immediate
|
false, // immediate
|
||||||
amqp.Publishing{
|
amqp.Publishing{
|
||||||
ContentType: "application/json",
|
ContentType: "application/json",
|
||||||
Body: msgBytes,
|
Body: msgBytes,
|
||||||
})
|
DeliveryMode: amqp.Persistent, // Сохраняем сообщения на диск
|
||||||
if err != nil {
|
})
|
||||||
return nil, fmt.Errorf("failed to publish message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &proto.MessageResponse{Message: &message}, nil
|
ch.Close()
|
||||||
|
if err == nil {
|
||||||
|
return &proto.MessageResponse{Message: &message}, nil
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustMarshal(msg protobuf.Message) []byte {
|
func mustMarshal(msg protobuf.Message) []byte {
|
||||||
@ -335,63 +340,97 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||||
log.Printf("Starting message stream for user %d", req.UserId)
|
const maxRetries = 5
|
||||||
|
retryDelay := time.Second
|
||||||
|
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
err := s.runStream(req, stream)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
retryDelay *= 2
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("max retries (%d) exceeded", maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||||
|
ctx := stream.Context()
|
||||||
|
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||||
|
|
||||||
ch, err := s.rabbitConn.Channel()
|
ch, err := s.rabbitConn.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open RabbitMQ channel: %v", err)
|
return fmt.Errorf("failed to open channel: %v", err)
|
||||||
}
|
}
|
||||||
defer ch.Close()
|
defer ch.Close()
|
||||||
|
|
||||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
// Объявляем очередь с persistence
|
||||||
|
|
||||||
// Объявляем очередь
|
|
||||||
_, err = ch.QueueDeclare(
|
_, err = ch.QueueDeclare(
|
||||||
queueName, // name
|
queueName,
|
||||||
true, // durable
|
true, // durable
|
||||||
false, // delete when unused
|
false, // autoDelete
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // noWait
|
||||||
nil, // arguments
|
amqp.Table{
|
||||||
|
"x-message-ttl": int32(86400000), // 24 часа TTL
|
||||||
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to declare queue: %v", err)
|
return fmt.Errorf("failed to declare queue: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Подписываемся на очередь
|
// QoS для контроля скорости обработки
|
||||||
msgs, err := ch.Consume(
|
err = ch.Qos(
|
||||||
queueName, // queue
|
1, // prefetch count
|
||||||
"", // consumer
|
0, // prefetch size
|
||||||
true, // auto-ack
|
false, // global
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to register consumer: %v", err)
|
return fmt.Errorf("failed to set QoS: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(
|
||||||
|
queueName,
|
||||||
|
"", // consumer
|
||||||
|
false, // auto-ack (false для ручного подтверждения)
|
||||||
|
false, // exclusive
|
||||||
|
false, // noLocal
|
||||||
|
false, // noWait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to consume: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Starting message stream for user %d", req.UserId)
|
||||||
|
defer log.Printf("Stopping message stream for user %d", req.UserId)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stream.Context().Done():
|
case <-ctx.Done():
|
||||||
log.Printf("Client disconnected, user %d", req.UserId)
|
|
||||||
return nil
|
return nil
|
||||||
case d, ok := <-msgs:
|
case d, ok := <-msgs:
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return fmt.Errorf("message channel closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
var msg proto.Message
|
var msg proto.Message
|
||||||
if err := json.Unmarshal(d.Body, &msg); err != nil {
|
if err := json.Unmarshal(d.Body, &msg); err != nil {
|
||||||
log.Printf("Failed to unmarshal message: %v", err)
|
log.Printf("Failed to unmarshal message: %v", err)
|
||||||
|
d.Nack(false, false) // Отбрасываем некорректное сообщение
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
||||||
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
||||||
|
d.Nack(false, true) // Возвращаем в очередь при ошибке отправки
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.Ack(false) // Подтверждаем обработку
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,12 +444,50 @@ func main() {
|
|||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
// Инициализация подключения к RabbitMQ
|
// Инициализация подключения к RabbitMQ
|
||||||
rabbitConn, err := amqp.Dial("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/")
|
var rabbitConn *amqp.Connection
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
rabbitConn, err = amqp.DialConfig("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/", amqp.Config{
|
||||||
|
Heartbeat: 10 * time.Second,
|
||||||
|
Locale: "en_US",
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Printf("Failed to connect to RabbitMQ (attempt %d): %v", i+1, err)
|
||||||
|
time.Sleep(time.Second * time.Duration(i+1))
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
|
log.Fatalf("Failed to connect to RabbitMQ after 5 attempts: %v", err)
|
||||||
}
|
}
|
||||||
defer rabbitConn.Close()
|
defer rabbitConn.Close()
|
||||||
|
|
||||||
|
// Обработка событий соединения
|
||||||
|
go func(conn **amqp.Connection, mu *sync.Mutex) {
|
||||||
|
for {
|
||||||
|
reason, ok := <-(*conn).NotifyClose(make(chan *amqp.Error))
|
||||||
|
if !ok {
|
||||||
|
log.Println("RabbitMQ connection closed")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Printf("RabbitMQ connection closed: %v", reason)
|
||||||
|
|
||||||
|
// Попытка переподключения
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
time.Sleep(time.Second * time.Duration(i+1))
|
||||||
|
newConn, err := amqp.Dial("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/")
|
||||||
|
if err == nil {
|
||||||
|
mu.Lock()
|
||||||
|
*conn = newConn
|
||||||
|
mu.Unlock()
|
||||||
|
log.Println("Successfully reconnected to RabbitMQ")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Printf("Failed to reconnect to RabbitMQ (attempt %d): %v", i+1, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(&rabbitConn, &sync.Mutex{})
|
||||||
|
|
||||||
// Создаем gRPC сервер
|
// Создаем gRPC сервер
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn))
|
proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user