v.0.0.3.8 добавлен keepaliveTicker в runstream
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
6c83f96add
commit
e41d0de62a
45
server.go
45
server.go
@ -10,6 +10,7 @@ import (
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"log"
|
||||
@ -432,19 +433,30 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
|
||||
return err
|
||||
}
|
||||
|
||||
const maxRetries = 5
|
||||
// Создаем контекст с увеличенным таймаутом
|
||||
ctx, cancel := context.WithTimeout(stream.Context(), 24*time.Hour)
|
||||
defer cancel()
|
||||
|
||||
// Механизм переподключения с экспоненциальной задержкой
|
||||
retryDelay := time.Second
|
||||
const maxRetries = 5
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
s.logger.Printf("Starting stream attempt %d/%d for user %d", i+1, maxRetries, req.UserId)
|
||||
err := s.runStream(req, stream)
|
||||
if err == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
err := s.runStream(ctx, req, stream)
|
||||
if err == nil {
|
||||
return nil // Успешное завершение
|
||||
}
|
||||
|
||||
s.logger.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
s.logger.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
|
||||
// Экспоненциальная задержка перед повторной попыткой
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
}
|
||||
}
|
||||
|
||||
err := fmt.Errorf("max retries (%d) exceeded", maxRetries)
|
||||
@ -452,8 +464,7 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
ctx := stream.Context()
|
||||
func (s *server) runStream(ctx context.Context, req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||
|
||||
s.logger.Printf("Opening RabbitMQ channel for queue %s", queueName)
|
||||
@ -512,9 +523,15 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag
|
||||
|
||||
heartbeat := time.NewTicker(30 * time.Second)
|
||||
defer heartbeat.Stop()
|
||||
|
||||
keepaliveTicker := time.NewTicker(15 * time.Second)
|
||||
defer keepaliveTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-keepaliveTicker.C:
|
||||
// Отправляем пустое сообщение как keepalive
|
||||
if err := stream.Send(&proto.MessageResponse{}); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
s.logger.Printf("Context canceled for user %d: %v", req.UserId, ctx.Err())
|
||||
return nil
|
||||
@ -630,6 +647,12 @@ func main() {
|
||||
|
||||
// Создаем gRPC сервер
|
||||
grpcServer := grpc.NewServer(
|
||||
grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
MaxConnectionAge: 24 * time.Hour,
|
||||
MaxConnectionAgeGrace: 5 * time.Minute,
|
||||
Time: 30 * time.Second,
|
||||
Timeout: 10 * time.Second,
|
||||
}),
|
||||
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
logger.Printf("Unary call: %s, request: %+v", info.FullMethod, req)
|
||||
start := time.Now()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user