v0.0.22 правки в messageStream
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
aa839737b6
commit
b423683af7
@ -157,42 +157,41 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int,
|
||||
log.Printf("Failed to mark messages as delivered: %v", err)
|
||||
}
|
||||
|
||||
streamCtx, cancel := context.WithTimeout(context.Background(), time.Hour)
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream creation failed: %w", err)
|
||||
return fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
|
||||
// Heartbeat для поддержания соединения
|
||||
heartbeat := time.NewTicker(15 * time.Second)
|
||||
heartbeat := time.NewTicker(25 * time.Second)
|
||||
defer heartbeat.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-heartbeat.C:
|
||||
// Пустой heartbeat для поддержания соединения
|
||||
continue
|
||||
// Отправляем ping для поддержания соединения
|
||||
if err := stream.Context().Err(); err != nil {
|
||||
return fmt.Errorf("connection lost: %w", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream receive failed: %w", err)
|
||||
return fmt.Errorf("receive error: %w", err)
|
||||
}
|
||||
|
||||
if msg.GetMessage() == nil {
|
||||
continue // Пропускаем heartbeat
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
log.Printf("Message delivered to channel: %d", msg.Message.Id)
|
||||
if msg.GetMessage() != nil {
|
||||
select {
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
log.Printf("Delivered message %d to user %d", msg.Message.Id, userID)
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,9 +2,9 @@ package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"tailly_back_v2/pkg/auth"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WSAuthMiddleware проверяет JWT токен для WebSocket соединений
|
||||
@ -12,20 +12,16 @@ func WSAuthMiddleware(tokenAuth *auth.TokenAuth) func(http.Handler) http.Handler
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("Upgrade") == "websocket" {
|
||||
log.Printf("New WS connection from %s", r.RemoteAddr)
|
||||
token := extractTokenFromRequest(r)
|
||||
log.Printf("Extracted token: %s", token[:10]+"...") // Логируем часть токена
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
token := extractTokenFromRequest(r)
|
||||
if token != "" {
|
||||
userID, err := tokenAuth.ValidateAccessToken(token)
|
||||
if err != nil {
|
||||
log.Printf("WS auth failed: %v", err)
|
||||
} else {
|
||||
log.Printf("Authenticated user ID: %d", userID)
|
||||
ctx := context.WithValue(r.Context(), userIDKey, userID)
|
||||
r = r.WithContext(ctx)
|
||||
if userID, err := tokenAuth.ValidateAccessToken(token); err == nil {
|
||||
ctx = context.WithValue(ctx, userIDKey, userID)
|
||||
}
|
||||
}
|
||||
r = r.WithContext(ctx)
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user