This commit is contained in:
parent
d48eb31edc
commit
cadf06a932
@ -3,8 +3,6 @@ package graph
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"tailly_back_v2/internal/domain"
|
"tailly_back_v2/internal/domain"
|
||||||
"tailly_back_v2/proto"
|
"tailly_back_v2/proto"
|
||||||
"time"
|
"time"
|
||||||
@ -122,50 +120,27 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
|
|||||||
|
|
||||||
// MessageStream реализация подписки на новые сообщения
|
// MessageStream реализация подписки на новые сообщения
|
||||||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||||||
messageChan := make(chan *domain.Message)
|
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(messageChan)
|
|
||||||
|
|
||||||
retryDelay := time.Second
|
|
||||||
|
|
||||||
for {
|
|
||||||
// Создаем новый контекст с таймаутом
|
|
||||||
streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
||||||
|
|
||||||
stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{
|
|
||||||
UserId: int32(userID),
|
UserId: int32(userID),
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
return nil, fmt.Errorf("failed to stream messages: %w", err)
|
||||||
log.Printf("Failed to stream messages: %v", err)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-time.After(retryDelay):
|
|
||||||
retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(30*time.Second)))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
messageChan := make(chan *domain.Message)
|
||||||
|
go func() {
|
||||||
|
defer close(messageChan)
|
||||||
for {
|
for {
|
||||||
msg, err := stream.Recv()
|
msg, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
return
|
||||||
log.Printf("Stream receive error: %v", err)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
cancel()
|
|
||||||
return
|
return
|
||||||
case messageChan <- protoMessageToDomain(msg.Message):
|
case messageChan <- protoMessageToDomain(msg.Message):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return messageChan, nil
|
return messageChan, nil
|
||||||
|
|||||||
@ -17,7 +17,6 @@ import (
|
|||||||
"tailly_back_v2/internal/http/middleware"
|
"tailly_back_v2/internal/http/middleware"
|
||||||
"tailly_back_v2/internal/service"
|
"tailly_back_v2/internal/service"
|
||||||
"tailly_back_v2/pkg/auth"
|
"tailly_back_v2/pkg/auth"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
@ -85,7 +84,6 @@ func (s *Server) configureRouter() {
|
|||||||
return false
|
return false
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
KeepAlivePingInterval: 10 * time.Second,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.AddTransport(&wsTransport)
|
srv.AddTransport(&wsTransport)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user