v0.0.20 правки в messageStream
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
200d5b09f9
commit
9eabb910cd
@ -121,50 +121,23 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
|
|||||||
|
|
||||||
// MessageStream реализация подписки на новые сообщения
|
// MessageStream реализация подписки на новые сообщения
|
||||||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||||||
messageChan := make(chan *domain.Message, 100) // Увеличиваем буфер
|
messageChan := make(chan *domain.Message, 100)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(messageChan)
|
defer close(messageChan)
|
||||||
|
defer log.Println("MessageStream: goroutine stopped")
|
||||||
|
|
||||||
retryDelay := time.Second
|
for {
|
||||||
const maxRetries = 5
|
|
||||||
|
|
||||||
for i := 0; i < maxRetries; i++ {
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err())
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
// Создаем новый контекст БЕЗ таймаута
|
err := r.runMessageStream(ctx, userID, messageChan)
|
||||||
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
|
||||||
UserId: int32(userID),
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Stream connection error (attempt %d/%d): %v", i+1, maxRetries, err)
|
log.Printf("MessageStream error: %v, reconnecting...", err)
|
||||||
time.Sleep(retryDelay)
|
|
||||||
retryDelay *= 2
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Сброс задержки при успешном подключении
|
|
||||||
retryDelay = time.Second
|
|
||||||
|
|
||||||
for {
|
|
||||||
msg, err := stream.Recv()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Stream receive error: %v", err)
|
|
||||||
break // Выходим из внутреннего цикла для переподключения
|
|
||||||
}
|
|
||||||
|
|
||||||
if msg == nil || msg.Message == nil {
|
|
||||||
continue // Пропускаем heartbeat/пустые сообщения
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case messageChan <- protoMessageToDomain(msg.Message):
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
time.Sleep(2 * time.Second) // Задержка перед переподключением
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -172,6 +145,46 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
|
|||||||
return messageChan, nil
|
return messageChan, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error {
|
||||||
|
log.Printf("Starting new stream for user %d", userID)
|
||||||
|
|
||||||
|
_, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{
|
||||||
|
MessageId: 0, // 0 = все сообщения для пользователя
|
||||||
|
Status: "DELIVERED",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to mark messages as delivered: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||||
|
UserId: int32(userID),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stream creation failed: %w", err)
|
||||||
|
}
|
||||||
|
defer log.Println("Stream closed")
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stream receive failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg == nil || msg.Message == nil {
|
||||||
|
continue // Пропускаем heartbeat
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Received message: %+v", msg.Message)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case messageChan <- protoMessageToDomain(msg.Message):
|
||||||
|
log.Printf("Message forwarded to channel: %d", msg.Message.Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Преобразование proto-сообщения в domain-модель
|
// Преобразование proto-сообщения в domain-модель
|
||||||
func protoMessageToDomain(msg *proto.Message) *domain.Message {
|
func protoMessageToDomain(msg *proto.Message) *domain.Message {
|
||||||
return &domain.Message{
|
return &domain.Message{
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package middleware
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"tailly_back_v2/pkg/auth"
|
"tailly_back_v2/pkg/auth"
|
||||||
)
|
)
|
||||||
@ -10,21 +11,22 @@ import (
|
|||||||
func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler {
|
func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler {
|
||||||
return func(next http.Handler) http.Handler {
|
return func(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// Проверяем, что это WebSocket запрос
|
|
||||||
if r.Header.Get("Upgrade") == "websocket" {
|
if r.Header.Get("Upgrade") == "websocket" {
|
||||||
// Извлекаем токен из query параметров или заголовков
|
log.Printf("New WS connection from %s", r.RemoteAddr)
|
||||||
token := extractTokenFromRequest(r)
|
token := extractTokenFromRequest(r)
|
||||||
|
log.Printf("Extracted token: %s", token[:10]+"...") // Логируем часть токена
|
||||||
|
|
||||||
if token != "" {
|
if token != "" {
|
||||||
userID, err := tokenAuth.ValidateAccessToken(token)
|
userID, err := tokenAuth.ValidateAccessToken(token)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
// Добавляем userID в контекст
|
log.Printf("WS auth failed: %v", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("Authenticated user ID: %d", userID)
|
||||||
ctx := context.WithValue(r.Context(), userIDKey, userID)
|
ctx := context.WithValue(r.Context(), userIDKey, userID)
|
||||||
r = r.WithContext(ctx)
|
r = r.WithContext(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
next.ServeHTTP(w, r)
|
next.ServeHTTP(w, r)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -82,11 +82,13 @@ func (s *Server) configureRouter() {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
},
|
},
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 1024,
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.AddTransport(&transport.Websocket{
|
srv.AddTransport(&transport.Websocket{
|
||||||
Upgrader: wsUpgrader,
|
Upgrader: wsUpgrader,
|
||||||
KeepAlivePingInterval: 15 * time.Second,
|
KeepAlivePingInterval: 10 * time.Second,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Раздельные эндпоинты:
|
// Раздельные эндпоинты:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user