v.0.0.3.3 Фикс соединения rabbit
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 16:08:52 +03:00
parent 043f73f7f8
commit 75025c0c75

View File

@ -8,9 +8,9 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"math"
"net"
"sync"
"tailly_messages/proto"
@ -322,27 +322,34 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
}
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
const maxRetries = 5
retryDelay := time.Second
ctx := stream.Context()
// Создаем новый контекст с таймаутом для серверной части
serverCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
// Мониторим клиентский контекст
go func() {
<-ctx.Done()
log.Println("Client context canceled, cleaning up...")
cancel()
}()
// Бесконечный цикл для переподключения
for {
err := s.runStream(req, stream)
if err == nil {
return nil
}
log.Printf("Stream error: %v", err)
// Проверяем, не закрыт ли контекст
select {
case <-stream.Context().Done():
case <-serverCtx.Done():
return nil
default:
// Правильный вызов runStream с нужными параметрами
err := s.runStream(serverCtx, req, stream)
if err != nil {
log.Printf("Stream error: %v, reconnecting...", err)
time.Sleep(2 * time.Second)
continue
}
return nil
}
// Экспоненциальная задержка с максимумом
time.Sleep(retryDelay)
retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(30*time.Second)))
}
}
@ -369,10 +376,15 @@ func (s *server) ensureRabbitConnection() error {
return nil
}
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
ctx := stream.Context()
func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
ch, err := s.rabbitConn.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: %v", err)
}
defer ch.Close()
for { // Бесконечный цикл для переподключения
select {
case <-ctx.Done():
@ -506,7 +518,15 @@ func main() {
}(&rabbitConn, &sync.Mutex{})
// Создаем gRPC сервер
grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Minute,
MaxConnectionAge: 30 * time.Minute,
MaxConnectionAgeGrace: 5 * time.Minute,
Time: 2 * time.Minute,
Timeout: 20 * time.Second,
}),
)
proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn))
// Запускаем сервер