This commit is contained in:
parent
aad2fb366f
commit
62aea84fcb
@ -121,23 +121,50 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
|
|||||||
|
|
||||||
// MessageStream реализация подписки на новые сообщения
|
// MessageStream реализация подписки на новые сообщения
|
||||||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||||||
messageChan := make(chan *domain.Message, 100)
|
messageChan := make(chan *domain.Message, 100) // Увеличиваем буфер
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(messageChan)
|
defer close(messageChan)
|
||||||
defer log.Println("MessageStream: goroutine stopped")
|
|
||||||
|
|
||||||
for {
|
retryDelay := time.Second
|
||||||
|
const maxRetries = 5
|
||||||
|
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err())
|
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
err := r.runMessageStream(ctx, userID, messageChan)
|
// Создаем новый контекст БЕЗ таймаута
|
||||||
|
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||||
|
UserId: int32(userID),
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("MessageStream error: %v, reconnecting...", err)
|
log.Printf("Stream connection error (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
retryDelay *= 2
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Сброс задержки при успешном подключении
|
||||||
|
retryDelay = time.Second
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Stream receive error: %v", err)
|
||||||
|
break // Выходим из внутреннего цикла для переподключения
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg == nil || msg.Message == nil {
|
||||||
|
continue // Пропускаем heartbeat/пустые сообщения
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case messageChan <- protoMessageToDomain(msg.Message):
|
||||||
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(2 * time.Second) // Задержка перед переподключением
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -145,58 +172,6 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
|
|||||||
return messageChan, nil
|
return messageChan, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error {
|
|
||||||
log.Printf("Starting new stream for user %d", userID)
|
|
||||||
|
|
||||||
// Создаем отдельный контекст для gRPC стрима
|
|
||||||
grpcCtx := context.Background()
|
|
||||||
|
|
||||||
_, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{
|
|
||||||
MessageId: 0, // 0 = все сообщения для пользователя
|
|
||||||
Status: "DELIVERED",
|
|
||||||
UserId: int32(userID),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to mark messages as delivered: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
stream, err := r.MessageClient.StreamMessages(grpcCtx, &proto.StreamMessagesRequest{
|
|
||||||
UserId: int32(userID),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create stream: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
heartbeat := time.NewTicker(25 * time.Second)
|
|
||||||
defer heartbeat.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-heartbeat.C:
|
|
||||||
// Отправляем ping для поддержания соединения
|
|
||||||
if err := stream.Context().Err(); err != nil {
|
|
||||||
return fmt.Errorf("connection lost: %w", err)
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
msg, err := stream.Recv()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("receive error: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if msg.GetMessage() != nil {
|
|
||||||
select {
|
|
||||||
case messageChan <- protoMessageToDomain(msg.Message):
|
|
||||||
log.Printf("Delivered message %d to user %d", msg.Message.Id, userID)
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Преобразование proto-сообщения в domain-модель
|
// Преобразование proto-сообщения в domain-модель
|
||||||
func protoMessageToDomain(msg *proto.Message) *domain.Message {
|
func protoMessageToDomain(msg *proto.Message) *domain.Message {
|
||||||
return &domain.Message{
|
return &domain.Message{
|
||||||
|
|||||||
@ -3,7 +3,6 @@ package middleware
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
"tailly_back_v2/pkg/auth"
|
"tailly_back_v2/pkg/auth"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -11,39 +10,31 @@ import (
|
|||||||
func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler {
|
func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler {
|
||||||
return func(next http.Handler) http.Handler {
|
return func(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Проверяем, что это WebSocket запрос
|
||||||
if r.Header.Get("Upgrade") == "websocket" {
|
if r.Header.Get("Upgrade") == "websocket" {
|
||||||
|
// Извлекаем токен из query параметров или заголовков
|
||||||
token := extractTokenFromRequest(r)
|
token := extractTokenFromRequest(r)
|
||||||
|
|
||||||
if token != "" {
|
if token != "" {
|
||||||
if userID, err := tokenAuth.ValidateAccessToken(token); err == nil {
|
userID, err := tokenAuth.ValidateAccessToken(token)
|
||||||
// Создаем контекст без таймаута для WebSocket
|
if err == nil {
|
||||||
|
// Добавляем userID в контекст
|
||||||
ctx := context.WithValue(r.Context(), userIDKey, userID)
|
ctx := context.WithValue(r.Context(), userIDKey, userID)
|
||||||
r = r.WithContext(ctx)
|
r = r.WithContext(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
next.ServeHTTP(w, r)
|
next.ServeHTTP(w, r)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractTokenFromRequest(r *http.Request) string {
|
func extractTokenFromRequest(r *http.Request) string {
|
||||||
// Проверяем куки
|
// Только проверка кук (как в вашем коде)
|
||||||
cookie, err := r.Cookie("accessToken")
|
cookie, err := r.Cookie("accessToken")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return cookie.Value
|
return cookie.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Проверяем заголовок Authorization
|
|
||||||
authHeader := r.Header.Get("Authorization")
|
|
||||||
if authHeader != "" && strings.HasPrefix(authHeader, "Bearer ") {
|
|
||||||
return strings.TrimPrefix(authHeader, "Bearer ")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Проверяем query параметры
|
|
||||||
token := r.URL.Query().Get("token")
|
|
||||||
if token != "" {
|
|
||||||
return token
|
|
||||||
}
|
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|||||||
@ -65,9 +65,10 @@ func (s *Server) configureRouter() {
|
|||||||
s.router.Use(middleware.MetricsMiddleware)
|
s.router.Use(middleware.MetricsMiddleware)
|
||||||
s.router.Use(middleware.CORS(allowedOrigins))
|
s.router.Use(middleware.CORS(allowedOrigins))
|
||||||
|
|
||||||
// Создаем отдельный обработчик для WebSocket
|
// Основной GraphQL обработчик
|
||||||
wsHandler := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{
|
resolver := graph.NewResolver(s.services, s.db, s.services.Messages)
|
||||||
Resolvers: graph.NewResolver(s.services, s.db, s.services.Messages),
|
srv := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{
|
||||||
|
Resolvers: resolver,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
// Настройка WebSocket транспорта
|
// Настройка WebSocket транспорта
|
||||||
@ -81,34 +82,20 @@ func (s *Server) configureRouter() {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
},
|
},
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wsHandler.AddTransport(&transport.Websocket{
|
srv.AddTransport(&transport.Websocket{
|
||||||
Upgrader: wsUpgrader,
|
Upgrader: wsUpgrader,
|
||||||
KeepAlivePingInterval: 30 * time.Second,
|
KeepAlivePingInterval: 15 * time.Second,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Создаем обычный HTTP обработчик
|
|
||||||
httpHandler := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{
|
|
||||||
Resolvers: graph.NewResolver(s.services, s.db, s.services.Messages),
|
|
||||||
}))
|
|
||||||
|
|
||||||
// Раздельные эндпоинты:
|
// Раздельные эндпоинты:
|
||||||
|
|
||||||
// 1. Обычные HTTP GraphQL запросы (только POST)
|
// 1. Обычные HTTP GraphQL запросы (только POST)
|
||||||
s.router.With(middleware.AuthMiddleware(s.tokenAuth)).Post("/query", httpHandler.ServeHTTP)
|
s.router.With(middleware.AuthMiddleware(s.tokenAuth)).Post("/query", srv.ServeHTTP)
|
||||||
|
|
||||||
// 2. WebSocket подписки (отдельный эндпоинт с WS middleware)
|
// 2. WebSocket подписки (отдельный эндпоинт)
|
||||||
s.router.With(middleware.WSAuthMiddleware(s.tokenAuth)).Get("/ws", func(w http.ResponseWriter, r *http.Request) {
|
s.router.With(middleware.WSAuthMiddleware(s.tokenAuth)).Get("/ws", srv.ServeHTTP)
|
||||||
// Проверяем, что это WebSocket запрос
|
|
||||||
if websocket.IsWebSocketUpgrade(r) {
|
|
||||||
wsHandler.ServeHTTP(w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
http.Error(w, "WebSocket upgrade required", http.StatusBadRequest)
|
|
||||||
})
|
|
||||||
|
|
||||||
// 3. Playground и другие обработчики
|
// 3. Playground и другие обработчики
|
||||||
s.router.Handle("/", playground.Handler("GraphQL playground", "/query"))
|
s.router.Handle("/", playground.Handler("GraphQL playground", "/query"))
|
||||||
|
|||||||
@ -294,7 +294,6 @@ type UpdateMessageStatusRequest struct {
|
|||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
MessageId int32 `protobuf:"varint,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
|
MessageId int32 `protobuf:"varint,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
|
||||||
Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
|
Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
|
||||||
UserId int32 `protobuf:"varint,3,opt,name=userId,proto3" json:"userId,omitempty"`
|
|
||||||
unknownFields protoimpl.UnknownFields
|
unknownFields protoimpl.UnknownFields
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
}
|
}
|
||||||
@ -343,13 +342,6 @@ func (x *UpdateMessageStatusRequest) GetStatus() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *UpdateMessageStatusRequest) GetUserId() int32 {
|
|
||||||
if x != nil {
|
|
||||||
return x.UserId
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
type StreamMessagesRequest struct {
|
type StreamMessagesRequest struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
UserId int32 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
|
UserId int32 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
|
||||||
@ -766,12 +758,11 @@ const file_messages_proto_rawDesc = "" +
|
|||||||
"\x05limit\x18\x02 \x01(\x05R\x05limit\x12\x16\n" +
|
"\x05limit\x18\x02 \x01(\x05R\x05limit\x12\x16\n" +
|
||||||
"\x06offset\x18\x03 \x01(\x05R\x06offset\".\n" +
|
"\x06offset\x18\x03 \x01(\x05R\x06offset\".\n" +
|
||||||
"\x13GetUserChatsRequest\x12\x17\n" +
|
"\x13GetUserChatsRequest\x12\x17\n" +
|
||||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"k\n" +
|
"\auser_id\x18\x01 \x01(\x05R\x06userId\"S\n" +
|
||||||
"\x1aUpdateMessageStatusRequest\x12\x1d\n" +
|
"\x1aUpdateMessageStatusRequest\x12\x1d\n" +
|
||||||
"\n" +
|
"\n" +
|
||||||
"message_id\x18\x01 \x01(\x05R\tmessageId\x12\x16\n" +
|
"message_id\x18\x01 \x01(\x05R\tmessageId\x12\x16\n" +
|
||||||
"\x06status\x18\x02 \x01(\tR\x06status\x12\x16\n" +
|
"\x06status\x18\x02 \x01(\tR\x06status\"0\n" +
|
||||||
"\x06userId\x18\x03 \x01(\x05R\x06userId\"0\n" +
|
|
||||||
"\x15StreamMessagesRequest\x12\x17\n" +
|
"\x15StreamMessagesRequest\x12\x17\n" +
|
||||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"\xdd\x01\n" +
|
"\auser_id\x18\x01 \x01(\x05R\x06userId\"\xdd\x01\n" +
|
||||||
"\aMessage\x12\x0e\n" +
|
"\aMessage\x12\x0e\n" +
|
||||||
|
|||||||
@ -45,7 +45,6 @@ message GetUserChatsRequest {
|
|||||||
message UpdateMessageStatusRequest {
|
message UpdateMessageStatusRequest {
|
||||||
int32 message_id = 1;
|
int32 message_id = 1;
|
||||||
string status = 2;
|
string status = 2;
|
||||||
int32 userId = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message StreamMessagesRequest {
|
message StreamMessagesRequest {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user