v0.0.18.5 замена ws на sse
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-17 09:16:35 +03:00
parent f9e9347389
commit 2a3ae72db9
2 changed files with 38 additions and 65 deletions

View File

@ -121,62 +121,36 @@ func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain
// MessageStream реализация подписки на новые сообщения
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
messageChan := make(chan *domain.Message, 10) // Буферизованный канал
// Создаем канал для GraphQL подписки
messageChan := make(chan *domain.Message)
// Вызываем gRPC стриминг
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
UserId: int32(userID),
})
if err != nil {
return nil, fmt.Errorf("failed to stream messages: %w", err)
}
go func() {
defer close(messageChan)
retryCount := 0
maxRetries := 5
for {
if retryCount >= maxRetries {
log.Printf("Max retries (%d) reached for user %d", maxRetries, userID)
msg, err := stream.Recv()
if err != nil {
log.Printf("gRPC stream error: %v", err)
return
}
// Создаем новое соединение
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
UserId: int32(userID),
})
if err != nil {
log.Printf("Stream connection error for user %d: %v (retry %d)", userID, err, retryCount)
retryCount++
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(retryCount) * time.Second):
continue
}
// Пропускаем heartbeat сообщения
if msg.Message.Content == "__heartbeat__" {
continue
}
retryCount = 0 // Сброс счетчика при успешном подключении
for {
msg, err := stream.Recv()
if err != nil {
log.Printf("Stream receive error for user %d: %v", userID, err)
break
}
// Пропускаем heartbeat сообщения
if msg.Message.GetContent() == "__heartbeat__" {
continue
}
select {
case <-ctx.Done():
return
case messageChan <- protoMessageToDomain(msg.Message):
// Сообщение успешно отправлено в канал
}
}
// Пауза перед повторной попыткой
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
retryCount++
case messageChan <- protoMessageToDomain(msg.Message):
}
}
}()
@ -186,9 +160,6 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
// Преобразование proto-сообщения в domain-модель
func protoMessageToDomain(msg *proto.Message) *domain.Message {
if msg == nil {
return nil
}
return &domain.Message{
ID: int(msg.Id),
ChatID: int(msg.ChatId),

View File

@ -5,10 +5,9 @@ import (
"database/sql"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/handler/transport"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/vektah/gqlparser/v2/ast"
"log"
"net/http"
"os"
@ -72,26 +71,29 @@ func (s *Server) configureRouter() {
Resolvers: resolver,
}))
wsTransport := transport.Websocket{
Upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
origin := r.Header.Get("Origin")
for _, allowed := range allowedOrigins {
if origin == allowed {
return true
}
}
return false
},
},
}
srv.AddTransport(&transport.POST{})
srv.AddTransport(&transport.GET{})
srv.AddTransport(&transport.Options{})
srv.AddTransport(&wsTransport)
// Настройка SSE транспорта (правильный способ)
sseTransport := &transport.SSE{}
srv.AddTransport(sseTransport)
s.router.Handle("/", playground.Handler("GraphQL playground", "/query"))
s.router.Handle("/query", srv)
s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads"))))
// Простейший кеш запросов (альтернатива MemoryCache)
srv.SetQueryCache(&NoopCache{})
})
s.router.Handle("/", playground.Handler("GraphQL playground", "/query"))
s.router.Handle("/query", srv)
s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads"))))
}
type NoopCache struct{}
func (n *NoopCache) Get(ctx context.Context, key string) (*ast.QueryDocument, bool) {
return nil, false
}
func (n *NoopCache) Add(ctx context.Context, key string, value *ast.QueryDocument) {}
func (s *Server) configureMetrics() {
metricsRouter := chi.NewRouter()