From e58bbb71bec1cadad04c5da649b484eab1e1414e Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Fri, 22 Aug 2025 23:01:49 +0300 Subject: [PATCH] =?UTF-8?q?v0.0.24=20=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D1=8B=20=D0=BC=D0=B5=D1=82=D1=80=D0=B8=D0=BA=D0=B8?= =?UTF-8?q?=20=D0=B2=20messages=5Fresolvers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 - go.sum | 1 - internal/http/graph/messages_resolvers.go | 97 ++++++++++ internal/http/middleware/metrics.go | 213 ++++++++++++++++++++-- 4 files changed, 296 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 73e5171..8a99f96 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,6 @@ require ( github.com/prometheus/common v0.65.0 // indirect github.com/prometheus/procfs v0.17.0 // indirect github.com/sosodev/duration v1.3.1 // indirect - github.com/stretchr/objx v0.5.2 // indirect golang.org/x/net v0.42.0 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/text v0.27.0 // indirect diff --git a/go.sum b/go.sum index ec5ccb1..1b0701b 100644 --- a/go.sum +++ b/go.sum @@ -79,7 +79,6 @@ github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NF github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4= github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/vektah/gqlparser/v2 v2.5.30 h1:EqLwGAFLIzt1wpx1IPpY67DwUujF1OfzgEyDsLrN6kE= diff --git a/internal/http/graph/messages_resolvers.go b/internal/http/graph/messages_resolvers.go index 4795245..46aeeb0 100644 --- a/internal/http/graph/messages_resolvers.go +++ b/internal/http/graph/messages_resolvers.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "tailly_back_v2/internal/domain" + "tailly_back_v2/internal/http/middleware" "tailly_back_v2/proto" "time" ) @@ -18,6 +19,17 @@ type subscriptionResolver struct{ *Resolver } // CreateChat is the resolver for the createChat field. func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("mutation", "createChat", success, duration) + if !success { + middleware.IncError("gql_operation", "createChat", "error") + } + }() + // Просто вызываем gRPC метод, вся логика уже там res, err := r.MessageClient.CreateChat(ctx, &proto.CreateChatRequest{ User1Id: int32(user1Id), @@ -27,12 +39,24 @@ func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id return nil, fmt.Errorf("failed to create chat: %w", err) } + success = true // Преобразуем proto-чат в domain-модель return protoChatToDomain(res.Chat), nil } // SendMessage реализация мутации для отправки сообщения func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content string) (*domain.Message, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("mutation", "sendMessage", success, duration) + if !success { + middleware.IncError("gql_operation", "sendMessage", "error") + } + }() + // Получаем senderID из контекста senderID, err := getUserIDFromContext(ctx) if err != nil { @@ -47,11 +71,25 @@ func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content if err != nil { return nil, fmt.Errorf("failed to send message: %w", err) } + + success = true + middleware.IncWebSocketMessage("outgoing", "chat_message") return protoMessageToDomain(res.Message), nil } // UpdateMessageStatus реализация мутации для обновления статуса сообщения func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID int, status MessageStatus) (*domain.Message, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("mutation", "updateMessageStatus", success, duration) + if !success { + middleware.IncError("gql_operation", "updateMessageStatus", "error") + } + }() + var statusStr string switch status { case MessageStatusSent: @@ -72,11 +110,23 @@ func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID in return nil, fmt.Errorf("failed to update message status: %w", err) } + success = true return protoMessageToDomain(res.Message), nil } // GetChat реализация запроса для получения чата func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("query", "getChat", success, duration) + if !success { + middleware.IncError("gql_operation", "getChat", "error") + } + }() + res, err := r.MessageClient.GetChat(ctx, &proto.GetChatRequest{ User1Id: int32(user1Id), User2Id: int32(user2Id), @@ -84,11 +134,24 @@ func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) ( if err != nil { return nil, fmt.Errorf("failed to get chat: %w", err) } + + success = true return protoChatToDomain(res.Chat), nil } // GetChatMessages реализация запроса для получения сообщений чата func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit int, offset int) ([]*domain.Message, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("query", "getChatMessages", success, duration) + if !success { + middleware.IncError("gql_operation", "getChatMessages", "error") + } + }() + res, err := r.MessageClient.GetChatMessages(ctx, &proto.GetChatMessagesRequest{ ChatId: int32(chatID), Limit: int32(limit), @@ -97,30 +160,58 @@ func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit i if err != nil { return nil, fmt.Errorf("failed to get chat messages: %w", err) } + var messages []*domain.Message for _, msg := range res.Messages { messages = append(messages, protoMessageToDomain(msg)) } + + success = true return messages, nil } // GetUserChats реализация запроса для получения чатов пользователя func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain.Chat, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("query", "getUserChats", success, duration) + if !success { + middleware.IncError("gql_operation", "getUserChats", "error") + } + }() + res, err := r.MessageClient.GetUserChats(ctx, &proto.GetUserChatsRequest{ UserId: int32(userID), }) if err != nil { return nil, fmt.Errorf("failed to get user chats: %w", err) } + var chats []*domain.Chat for _, chat := range res.Chats { chats = append(chats, protoChatToDomain(chat)) } + + success = true return chats, nil } // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { + start := time.Now() + success := false + + defer func() { + duration := time.Since(start) + middleware.IncGQLOperation("subscription", "messageStream", success, duration) + if !success { + middleware.IncError("gql_operation", "messageStream", "error") + } + }() + messageChan := make(chan *domain.Message, 100) go func() { @@ -136,12 +227,14 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (< err := r.runMessageStream(ctx, userID, messageChan) if err != nil { log.Printf("MessageStream error: %v, reconnecting...", err) + middleware.IncError("websocket_stream", "messageStream", "warning") } time.Sleep(2 * time.Second) // Задержка перед переподключением } } }() + success = true return messageChan, nil } @@ -155,6 +248,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, }) if err != nil { log.Printf("Failed to mark messages as delivered: %v", err) + middleware.IncError("message_status", "updateStatus", "warning") } streamCtx, cancel := context.WithCancel(ctx) @@ -175,6 +269,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, case <-heartbeat.C: // Отправляем ping для поддержания соединения if err := stream.Context().Err(); err != nil { + middleware.IncError("websocket_heartbeat", "messageStream", "error") return fmt.Errorf("connection lost: %w", err) } case <-ctx.Done(): @@ -182,6 +277,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, default: msg, err := stream.Recv() if err != nil { + middleware.IncError("websocket_receive", "messageStream", "error") return fmt.Errorf("receive error: %w", err) } @@ -189,6 +285,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, select { case messageChan <- protoMessageToDomain(msg.Message): log.Printf("Delivered message %d to user %d", msg.Message.Id, userID) + middleware.IncWebSocketMessage("incoming", "chat_message") case <-ctx.Done(): return nil } diff --git a/internal/http/middleware/metrics.go b/internal/http/middleware/metrics.go index 4eca91d..9fb0cd2 100644 --- a/internal/http/middleware/metrics.go +++ b/internal/http/middleware/metrics.go @@ -10,29 +10,128 @@ import ( ) var ( + // HTTP метрики httpRequestsTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "http_requests_total", Help: "Total number of HTTP requests", }, - []string{"method", "path", "status"}, + []string{"method", "path", "status", "handler"}, ) httpRequestDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "http_request_duration_seconds", Help: "Duration of HTTP requests", - Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10}, + Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2.5, 5, 10}, }, - []string{"method", "path"}, + []string{"method", "path", "handler"}, ) - httpResponseSize = promauto.NewSummaryVec( - prometheus.SummaryOpts{ - Name: "http_response_size_bytes", - Help: "Size of HTTP responses", + // GraphQL специфичные метрики + gqlOperationsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "graphql_operations_total", + Help: "Total number of GraphQL operations", }, - []string{"method", "path"}, + []string{"operation", "type", "name", "success"}, + ) + + gqlOperationDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "graphql_operation_duration_seconds", + Help: "Duration of GraphQL operations", + Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5}, + }, + []string{"operation", "type", "name"}, + ) + + // Бизнес метрики + usersTotal = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "users_total", + Help: "Total number of registered users", + }, + ) + + postsTotal = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "posts_total", + Help: "Total number of posts", + }, + ) + + commentsTotal = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "comments_total", + Help: "Total number of comments", + }, + ) + + messagesTotal = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "messages_total", + Help: "Total number of messages", + }, + ) + + activeWebsockets = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "websocket_connections_active", + Help: "Number of active WebSocket connections", + }, + ) + + websocketMessagesTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "websocket_messages_total", + Help: "Total number of WebSocket messages", + }, + []string{"direction", "type"}, + ) + + // Метрики ошибок + errorsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "errors_total", + Help: "Total number of errors by type", + }, + []string{"type", "source", "severity"}, + ) + + // Метрики базы данных + dbQueriesTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "db_queries_total", + Help: "Total number of database queries", + }, + []string{"operation", "table", "success"}, + ) + + dbQueryDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "db_query_duration_seconds", + Help: "Duration of database queries", + Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2}, + }, + []string{"operation", "table"}, + ) + + // Метрики кэша + cacheHitsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "cache_hits_total", + Help: "Total number of cache hits", + }, + []string{"type", "name"}, + ) + + cacheMissesTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "cache_misses_total", + Help: "Total number of cache misses", + }, + []string{"type", "name"}, ) ) @@ -47,21 +146,107 @@ func MetricsMiddleware(next http.Handler) http.Handler { duration := time.Since(start).Seconds() status := strconv.Itoa(rw.status) - // Регистрируем метрики + // Определяем handler type + handlerType := "http" + if r.URL.Path == "/query" { + handlerType = "graphql" + } else if r.URL.Path == "/ws" { + handlerType = "websocket" + } + httpRequestsTotal.WithLabelValues( r.Method, r.URL.Path, status, + handlerType, ).Inc() httpRequestDuration.WithLabelValues( r.Method, r.URL.Path, + handlerType, ).Observe(duration) - - httpResponseSize.WithLabelValues( - r.Method, - r.URL.Path, - ).Observe(float64(rw.size)) }) } + +// GraphQLMetricsMiddleware для отслеживания GraphQL операций +func GraphQLMetricsMiddleware() func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/query" { + next.ServeHTTP(w, r) + return + } + + // Здесь можно парсить GraphQL запрос и извлекать информацию об операции + // Для простоты пока пропускаем + next.ServeHTTP(w, r) + }) + } +} + +// Вспомогательные функции для обновления метрик + +func IncGQLOperation(operationType, operationName string, success bool, duration time.Duration) { + status := "false" + if success { + status = "true" + } + + gqlOperationsTotal.WithLabelValues( + operationType, + operationName, + status, + ).Inc() + + gqlOperationDuration.WithLabelValues( + operationType, + operationName, + ).Observe(duration.Seconds()) +} + +func IncWebSocketMessage(direction, messageType string) { + websocketMessagesTotal.WithLabelValues(direction, messageType).Inc() +} + +func SetActiveWebsockets(count int) { + activeWebsockets.Set(float64(count)) +} + +func IncError(errorType, source, severity string) { + errorsTotal.WithLabelValues(errorType, source, severity).Inc() +} + +func IncDBQuery(operation, table string, success bool, duration time.Duration) { + status := "false" + if success { + status = "true" + } + + dbQueriesTotal.WithLabelValues(operation, table, status).Inc() + dbQueryDuration.WithLabelValues(operation, table).Observe(duration.Seconds()) +} + +func IncCacheHit(cacheType, cacheName string) { + cacheHitsTotal.WithLabelValues(cacheType, cacheName).Inc() +} + +func IncCacheMiss(cacheType, cacheName string) { + cacheMissesTotal.WithLabelValues(cacheType, cacheName).Inc() +} + +func SetUsersCount(count int) { + usersTotal.Set(float64(count)) +} + +func SetPostsCount(count int) { + postsTotal.Set(float64(count)) +} + +func SetCommentsCount(count int) { + commentsTotal.Set(float64(count)) +} + +func SetMessagesCount(count int) { + messagesTotal.Set(float64(count)) +}