v0.0.24 Добавлены метрики в messages_resolvers
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
470b0b5342
commit
e58bbb71be
1
go.mod
1
go.mod
@ -36,7 +36,6 @@ require (
|
||||
github.com/prometheus/common v0.65.0 // indirect
|
||||
github.com/prometheus/procfs v0.17.0 // indirect
|
||||
github.com/sosodev/duration v1.3.1 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
golang.org/x/net v0.42.0 // indirect
|
||||
golang.org/x/sys v0.34.0 // indirect
|
||||
golang.org/x/text v0.27.0 // indirect
|
||||
|
||||
1
go.sum
1
go.sum
@ -79,7 +79,6 @@ github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NF
|
||||
github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
|
||||
github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/vektah/gqlparser/v2 v2.5.30 h1:EqLwGAFLIzt1wpx1IPpY67DwUujF1OfzgEyDsLrN6kE=
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"tailly_back_v2/internal/domain"
|
||||
"tailly_back_v2/internal/http/middleware"
|
||||
"tailly_back_v2/proto"
|
||||
"time"
|
||||
)
|
||||
@ -18,6 +19,17 @@ type subscriptionResolver struct{ *Resolver }
|
||||
|
||||
// CreateChat is the resolver for the createChat field.
|
||||
func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("mutation", "createChat", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "createChat", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
// Просто вызываем gRPC метод, вся логика уже там
|
||||
res, err := r.MessageClient.CreateChat(ctx, &proto.CreateChatRequest{
|
||||
User1Id: int32(user1Id),
|
||||
@ -27,12 +39,24 @@ func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id
|
||||
return nil, fmt.Errorf("failed to create chat: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
// Преобразуем proto-чат в domain-модель
|
||||
return protoChatToDomain(res.Chat), nil
|
||||
}
|
||||
|
||||
// SendMessage реализация мутации для отправки сообщения
|
||||
func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content string) (*domain.Message, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("mutation", "sendMessage", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "sendMessage", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
// Получаем senderID из контекста
|
||||
senderID, err := getUserIDFromContext(ctx)
|
||||
if err != nil {
|
||||
@ -47,11 +71,25 @@ func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to send message: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
middleware.IncWebSocketMessage("outgoing", "chat_message")
|
||||
return protoMessageToDomain(res.Message), nil
|
||||
}
|
||||
|
||||
// UpdateMessageStatus реализация мутации для обновления статуса сообщения
|
||||
func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID int, status MessageStatus) (*domain.Message, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("mutation", "updateMessageStatus", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "updateMessageStatus", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
var statusStr string
|
||||
switch status {
|
||||
case MessageStatusSent:
|
||||
@ -72,11 +110,23 @@ func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID in
|
||||
return nil, fmt.Errorf("failed to update message status: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
return protoMessageToDomain(res.Message), nil
|
||||
}
|
||||
|
||||
// GetChat реализация запроса для получения чата
|
||||
func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("query", "getChat", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "getChat", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
res, err := r.MessageClient.GetChat(ctx, &proto.GetChatRequest{
|
||||
User1Id: int32(user1Id),
|
||||
User2Id: int32(user2Id),
|
||||
@ -84,11 +134,24 @@ func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) (
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get chat: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
return protoChatToDomain(res.Chat), nil
|
||||
}
|
||||
|
||||
// GetChatMessages реализация запроса для получения сообщений чата
|
||||
func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit int, offset int) ([]*domain.Message, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("query", "getChatMessages", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "getChatMessages", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
res, err := r.MessageClient.GetChatMessages(ctx, &proto.GetChatMessagesRequest{
|
||||
ChatId: int32(chatID),
|
||||
Limit: int32(limit),
|
||||
@ -97,30 +160,58 @@ func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit i
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get chat messages: %w", err)
|
||||
}
|
||||
|
||||
var messages []*domain.Message
|
||||
for _, msg := range res.Messages {
|
||||
messages = append(messages, protoMessageToDomain(msg))
|
||||
}
|
||||
|
||||
success = true
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
// GetUserChats реализация запроса для получения чатов пользователя
|
||||
func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain.Chat, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("query", "getUserChats", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "getUserChats", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
res, err := r.MessageClient.GetUserChats(ctx, &proto.GetUserChatsRequest{
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get user chats: %w", err)
|
||||
}
|
||||
|
||||
var chats []*domain.Chat
|
||||
for _, chat := range res.Chats {
|
||||
chats = append(chats, protoChatToDomain(chat))
|
||||
}
|
||||
|
||||
success = true
|
||||
return chats, nil
|
||||
}
|
||||
|
||||
// MessageStream реализация подписки на новые сообщения
|
||||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
duration := time.Since(start)
|
||||
middleware.IncGQLOperation("subscription", "messageStream", success, duration)
|
||||
if !success {
|
||||
middleware.IncError("gql_operation", "messageStream", "error")
|
||||
}
|
||||
}()
|
||||
|
||||
messageChan := make(chan *domain.Message, 100)
|
||||
|
||||
go func() {
|
||||
@ -136,12 +227,14 @@ func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<
|
||||
err := r.runMessageStream(ctx, userID, messageChan)
|
||||
if err != nil {
|
||||
log.Printf("MessageStream error: %v, reconnecting...", err)
|
||||
middleware.IncError("websocket_stream", "messageStream", "warning")
|
||||
}
|
||||
time.Sleep(2 * time.Second) // Задержка перед переподключением
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
success = true
|
||||
return messageChan, nil
|
||||
}
|
||||
|
||||
@ -155,6 +248,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Failed to mark messages as delivered: %v", err)
|
||||
middleware.IncError("message_status", "updateStatus", "warning")
|
||||
}
|
||||
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
@ -175,6 +269,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int,
|
||||
case <-heartbeat.C:
|
||||
// Отправляем ping для поддержания соединения
|
||||
if err := stream.Context().Err(); err != nil {
|
||||
middleware.IncError("websocket_heartbeat", "messageStream", "error")
|
||||
return fmt.Errorf("connection lost: %w", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
@ -182,6 +277,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int,
|
||||
default:
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
middleware.IncError("websocket_receive", "messageStream", "error")
|
||||
return fmt.Errorf("receive error: %w", err)
|
||||
}
|
||||
|
||||
@ -189,6 +285,7 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int,
|
||||
select {
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
log.Printf("Delivered message %d to user %d", msg.Message.Id, userID)
|
||||
middleware.IncWebSocketMessage("incoming", "chat_message")
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -10,29 +10,128 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// HTTP метрики
|
||||
httpRequestsTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "http_requests_total",
|
||||
Help: "Total number of HTTP requests",
|
||||
},
|
||||
[]string{"method", "path", "status"},
|
||||
[]string{"method", "path", "status", "handler"},
|
||||
)
|
||||
|
||||
httpRequestDuration = promauto.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "http_request_duration_seconds",
|
||||
Help: "Duration of HTTP requests",
|
||||
Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10},
|
||||
Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2.5, 5, 10},
|
||||
},
|
||||
[]string{"method", "path"},
|
||||
[]string{"method", "path", "handler"},
|
||||
)
|
||||
|
||||
httpResponseSize = promauto.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "http_response_size_bytes",
|
||||
Help: "Size of HTTP responses",
|
||||
// GraphQL специфичные метрики
|
||||
gqlOperationsTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "graphql_operations_total",
|
||||
Help: "Total number of GraphQL operations",
|
||||
},
|
||||
[]string{"method", "path"},
|
||||
[]string{"operation", "type", "name", "success"},
|
||||
)
|
||||
|
||||
gqlOperationDuration = promauto.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "graphql_operation_duration_seconds",
|
||||
Help: "Duration of GraphQL operations",
|
||||
Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5},
|
||||
},
|
||||
[]string{"operation", "type", "name"},
|
||||
)
|
||||
|
||||
// Бизнес метрики
|
||||
usersTotal = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "users_total",
|
||||
Help: "Total number of registered users",
|
||||
},
|
||||
)
|
||||
|
||||
postsTotal = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "posts_total",
|
||||
Help: "Total number of posts",
|
||||
},
|
||||
)
|
||||
|
||||
commentsTotal = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "comments_total",
|
||||
Help: "Total number of comments",
|
||||
},
|
||||
)
|
||||
|
||||
messagesTotal = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "messages_total",
|
||||
Help: "Total number of messages",
|
||||
},
|
||||
)
|
||||
|
||||
activeWebsockets = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "websocket_connections_active",
|
||||
Help: "Number of active WebSocket connections",
|
||||
},
|
||||
)
|
||||
|
||||
websocketMessagesTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "websocket_messages_total",
|
||||
Help: "Total number of WebSocket messages",
|
||||
},
|
||||
[]string{"direction", "type"},
|
||||
)
|
||||
|
||||
// Метрики ошибок
|
||||
errorsTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "errors_total",
|
||||
Help: "Total number of errors by type",
|
||||
},
|
||||
[]string{"type", "source", "severity"},
|
||||
)
|
||||
|
||||
// Метрики базы данных
|
||||
dbQueriesTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "db_queries_total",
|
||||
Help: "Total number of database queries",
|
||||
},
|
||||
[]string{"operation", "table", "success"},
|
||||
)
|
||||
|
||||
dbQueryDuration = promauto.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "db_query_duration_seconds",
|
||||
Help: "Duration of database queries",
|
||||
Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2},
|
||||
},
|
||||
[]string{"operation", "table"},
|
||||
)
|
||||
|
||||
// Метрики кэша
|
||||
cacheHitsTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "cache_hits_total",
|
||||
Help: "Total number of cache hits",
|
||||
},
|
||||
[]string{"type", "name"},
|
||||
)
|
||||
|
||||
cacheMissesTotal = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "cache_misses_total",
|
||||
Help: "Total number of cache misses",
|
||||
},
|
||||
[]string{"type", "name"},
|
||||
)
|
||||
)
|
||||
|
||||
@ -47,21 +146,107 @@ func MetricsMiddleware(next http.Handler) http.Handler {
|
||||
duration := time.Since(start).Seconds()
|
||||
status := strconv.Itoa(rw.status)
|
||||
|
||||
// Регистрируем метрики
|
||||
// Определяем handler type
|
||||
handlerType := "http"
|
||||
if r.URL.Path == "/query" {
|
||||
handlerType = "graphql"
|
||||
} else if r.URL.Path == "/ws" {
|
||||
handlerType = "websocket"
|
||||
}
|
||||
|
||||
httpRequestsTotal.WithLabelValues(
|
||||
r.Method,
|
||||
r.URL.Path,
|
||||
status,
|
||||
handlerType,
|
||||
).Inc()
|
||||
|
||||
httpRequestDuration.WithLabelValues(
|
||||
r.Method,
|
||||
r.URL.Path,
|
||||
handlerType,
|
||||
).Observe(duration)
|
||||
|
||||
httpResponseSize.WithLabelValues(
|
||||
r.Method,
|
||||
r.URL.Path,
|
||||
).Observe(float64(rw.size))
|
||||
})
|
||||
}
|
||||
|
||||
// GraphQLMetricsMiddleware для отслеживания GraphQL операций
|
||||
func GraphQLMetricsMiddleware() func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/query" {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Здесь можно парсить GraphQL запрос и извлекать информацию об операции
|
||||
// Для простоты пока пропускаем
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Вспомогательные функции для обновления метрик
|
||||
|
||||
func IncGQLOperation(operationType, operationName string, success bool, duration time.Duration) {
|
||||
status := "false"
|
||||
if success {
|
||||
status = "true"
|
||||
}
|
||||
|
||||
gqlOperationsTotal.WithLabelValues(
|
||||
operationType,
|
||||
operationName,
|
||||
status,
|
||||
).Inc()
|
||||
|
||||
gqlOperationDuration.WithLabelValues(
|
||||
operationType,
|
||||
operationName,
|
||||
).Observe(duration.Seconds())
|
||||
}
|
||||
|
||||
func IncWebSocketMessage(direction, messageType string) {
|
||||
websocketMessagesTotal.WithLabelValues(direction, messageType).Inc()
|
||||
}
|
||||
|
||||
func SetActiveWebsockets(count int) {
|
||||
activeWebsockets.Set(float64(count))
|
||||
}
|
||||
|
||||
func IncError(errorType, source, severity string) {
|
||||
errorsTotal.WithLabelValues(errorType, source, severity).Inc()
|
||||
}
|
||||
|
||||
func IncDBQuery(operation, table string, success bool, duration time.Duration) {
|
||||
status := "false"
|
||||
if success {
|
||||
status = "true"
|
||||
}
|
||||
|
||||
dbQueriesTotal.WithLabelValues(operation, table, status).Inc()
|
||||
dbQueryDuration.WithLabelValues(operation, table).Observe(duration.Seconds())
|
||||
}
|
||||
|
||||
func IncCacheHit(cacheType, cacheName string) {
|
||||
cacheHitsTotal.WithLabelValues(cacheType, cacheName).Inc()
|
||||
}
|
||||
|
||||
func IncCacheMiss(cacheType, cacheName string) {
|
||||
cacheMissesTotal.WithLabelValues(cacheType, cacheName).Inc()
|
||||
}
|
||||
|
||||
func SetUsersCount(count int) {
|
||||
usersTotal.Set(float64(count))
|
||||
}
|
||||
|
||||
func SetPostsCount(count int) {
|
||||
postsTotal.Set(float64(count))
|
||||
}
|
||||
|
||||
func SetCommentsCount(count int) {
|
||||
commentsTotal.Set(float64(count))
|
||||
}
|
||||
|
||||
func SetMessagesCount(count int) {
|
||||
messagesTotal.Set(float64(count))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user