v.0.0.3.7 Не отправляем пустое сообщение в stream
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
33952dd83b
commit
3efeae50ac
85
server.go
85
server.go
@ -9,8 +9,6 @@ import (
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"log"
|
||||
"net"
|
||||
@ -421,35 +419,68 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
|
||||
}
|
||||
|
||||
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
s.logRequest("StreamMessages", req)
|
||||
defer func(start time.Time) {
|
||||
s.logger.Printf("StreamMessages execution time: %v", time.Since(start))
|
||||
}(time.Now())
|
||||
ctx := stream.Context()
|
||||
queueName := fmt.Sprintf("user_%d_messages", req.UserId)
|
||||
|
||||
if req.UserId == 0 {
|
||||
err := status.Error(codes.InvalidArgument, "userID cannot be 0")
|
||||
s.logResponse("StreamMessages", nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
const maxRetries = 5
|
||||
retryDelay := time.Second
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
s.logger.Printf("Starting stream attempt %d/%d for user %d", i+1, maxRetries, req.UserId)
|
||||
err := s.runStream(req, stream)
|
||||
if err == nil {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
ch, err := s.rabbitConn.Channel()
|
||||
if err != nil {
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
queueName,
|
||||
"",
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ch.Close()
|
||||
return nil
|
||||
case d, ok := <-msgs:
|
||||
if !ok {
|
||||
ch.Close()
|
||||
break // Переподключимся
|
||||
}
|
||||
|
||||
var msg proto.Message
|
||||
if err := json.Unmarshal(d.Body, &msg); err != nil {
|
||||
d.Nack(false, false)
|
||||
continue
|
||||
}
|
||||
|
||||
// Проверяем валидность сообщения перед отправкой
|
||||
if msg.Id == 0 || msg.Content == "" {
|
||||
d.Ack(false)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil {
|
||||
d.Nack(false, true)
|
||||
ch.Close()
|
||||
break
|
||||
}
|
||||
d.Ack(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.logger.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2
|
||||
}
|
||||
|
||||
err := fmt.Errorf("max retries (%d) exceeded", maxRetries)
|
||||
s.logResponse("StreamMessages", nil, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user