From 5286a534cf95e011ea3399f6f367b15a835ea0df Mon Sep 17 00:00:00 2001 From: madipo2611 Date: Mon, 18 Aug 2025 14:16:16 +0300 Subject: [PATCH] =?UTF-8?q?v.0.0.3.5=20=D0=94=D0=BE=D0=B1=D0=B0=D0=B2?= =?UTF-8?q?=D0=BB=D0=B5=D0=BD=20=D0=B8=D0=BD=D1=82=D0=B5=D0=B3=D1=80=D0=B0?= =?UTF-8?q?=D1=86=D0=B8=D0=BE=D0=BD=D0=BD=D1=8B=D0=B9=20=D1=82=D0=B5=D1=81?= =?UTF-8?q?=D1=82,=20=D0=BF=D1=80=D0=BE=D0=B1=D0=BB=D0=B5=D0=BC=20=D0=BD?= =?UTF-8?q?=D0=B5=D1=82.=20=D0=98=D1=81=D0=BF=D1=80=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0=20=D0=BF=D1=80=D0=BE=D0=B1=D0=BB=D0=B5=D0=BC?= =?UTF-8?q?=D0=B0=20=D1=81=D0=BE=20=D1=81=D1=82=D0=B0=D1=82=D1=83=D1=81?= =?UTF-8?q?=D0=B0=D0=BC=D0=B8=20=D0=B2=20=D0=91=D0=94.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 4 + go.sum | 7 + migrations/0001_initial_schema.up.sql | 2 +- qodana.yaml | 29 +++ server.go | 314 +++++++++++++++++++------- server_test.go | 201 +++++++++++++++++ 6 files changed, 473 insertions(+), 84 deletions(-) create mode 100644 qodana.yaml create mode 100644 server_test.go diff --git a/go.mod b/go.mod index dc333cc..fdd95f2 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,13 @@ go 1.25rc2 require ( github.com/jackc/pgx/v4 v4.18.3 github.com/rabbitmq/amqp091-go v1.10.0 + github.com/stretchr/testify v1.8.1 google.golang.org/grpc v1.74.2 google.golang.org/protobuf v1.36.7 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.14.3 // indirect github.com/jackc/pgio v1.0.0 // indirect @@ -18,9 +20,11 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.14.0 // indirect github.com/jackc/puddle v1.3.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/crypto v0.38.0 // indirect golang.org/x/net v0.40.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.25.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9732834..20fa377 100644 --- a/go.sum +++ b/go.sum @@ -76,9 +76,11 @@ github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -109,11 +111,15 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= @@ -207,6 +213,7 @@ google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeB google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= diff --git a/migrations/0001_initial_schema.up.sql b/migrations/0001_initial_schema.up.sql index 3c8441d..4afc15e 100644 --- a/migrations/0001_initial_schema.up.sql +++ b/migrations/0001_initial_schema.up.sql @@ -14,7 +14,7 @@ CREATE TABLE messages ( sender_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, receiver_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, content TEXT NOT NULL, - status VARCHAR(20) NOT NULL DEFAULT 'sent' CHECK (status IN ('sent', 'delivered', 'read')), + status VARCHAR(20) NOT NULL DEFAULT 'SENT' CHECK (status IN ('SENT', 'DELIVERED', 'READ')), created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() ); -- Индексы для производительности diff --git a/qodana.yaml b/qodana.yaml new file mode 100644 index 0000000..9317905 --- /dev/null +++ b/qodana.yaml @@ -0,0 +1,29 @@ +#-------------------------------------------------------------------------------# +# Qodana analysis is configured by qodana.yaml file # +# https://www.jetbrains.com/help/qodana/qodana-yaml.html # +#-------------------------------------------------------------------------------# +version: "1.0" + +#Specify inspection profile for code analysis +profile: + name: qodana.starter + +#Enable inspections +#include: +# - name: + +#Disable inspections +#exclude: +# - name: +# paths: +# - + +#Execute shell command before Qodana execution (Applied in CI/CD pipeline) +#bootstrap: sh ./prepare-qodana.sh + +#Install IDE plugins before Qodana execution (Applied in CI/CD pipeline) +#plugins: +# - id: #(plugin id can be found at https://plugins.jetbrains.com) + +#Specify Qodana linter for analysis (Applied in CI/CD pipeline) +linter: jetbrains/qodana-go:2025.1 diff --git a/server.go b/server.go index 9f50b89..72ab089 100644 --- a/server.go +++ b/server.go @@ -4,16 +4,17 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "github.com/jackc/pgx/v4/pgxpool" amqp "github.com/rabbitmq/amqp091-go" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - protobuf "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "log" "net" + "os" "sync" "tailly_messages/proto" "time" @@ -24,61 +25,81 @@ type server struct { db *pgxpool.Pool rabbitConn *amqp.Connection mu sync.Mutex + logger *log.Logger } func NewServer(db *pgxpool.Pool, rabbitConn *amqp.Connection) *server { - return &server{db: db, rabbitConn: rabbitConn} + return &server{ + db: db, + rabbitConn: rabbitConn, + logger: log.New(os.Stdout, "MESSAGE_SERVICE: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile), + } +} + +func (s *server) logRequest(method string, req interface{}) { + s.logger.Printf("REQUEST: %s - %+v", method, req) +} + +func (s *server) logResponse(method string, resp interface{}, err error) { + if err != nil { + s.logger.Printf("RESPONSE ERROR: %s - %v", method, err) + } else { + s.logger.Printf("RESPONSE: %s - %+v", method, resp) + } } func (s *server) CreateChat(ctx context.Context, req *proto.CreateChatRequest) (*proto.ChatResponse, error) { - log.Printf("CreateChat request received: user1_id=%d, user2_id=%d", req.GetUser1Id(), req.GetUser2Id()) + s.logRequest("CreateChat", req) + defer func(start time.Time) { + s.logger.Printf("CreateChat execution time: %v", time.Since(start)) + }(time.Now()) user1, user2 := req.GetUser1Id(), req.GetUser2Id() if user1 > user2 { user1, user2 = user2, user1 } - // Проверка существования пользователей + s.logger.Printf("Checking user existence: user1=%d, user2=%d", user1, user2) var user1Exists, user2Exists bool err := s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", user1).Scan(&user1Exists) if err != nil { - log.Printf("Error checking user1 existence: %v", err) + s.logger.Printf("Error checking user1 existence: %v", err) return nil, fmt.Errorf("failed to check user existence") } err = s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", user2).Scan(&user2Exists) if err != nil { - log.Printf("Error checking user2 existence: %v", err) + s.logger.Printf("Error checking user2 existence: %v", err) return nil, fmt.Errorf("failed to check user existence") } if !user1Exists || !user2Exists { errMsg := fmt.Sprintf("One or both users don't exist: user1=%d (%t), user2=%d (%t)", user1, user1Exists, user2, user2Exists) - log.Println(errMsg) - return nil, fmt.Errorf(errMsg) + s.logger.Println(errMsg) + return nil, fmt.Errorf("%w", errors.New(errMsg)) // Оборачиваем ошибку } var chat proto.Chat var createdAt, updatedAt time.Time - // Проверяем, не существует ли уже чат + s.logger.Printf("Checking chat existence between users %d and %d", user1, user2) var chatExists bool err = s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM chats WHERE user1_id = $1 AND user2_id = $2)", user1, user2).Scan(&chatExists) if err != nil { - log.Printf("Error checking chat existence: %v", err) + s.logger.Printf("Error checking chat existence: %v", err) return nil, fmt.Errorf("failed to check chat existence") } if chatExists { - log.Printf("Chat already exists between users %d and %d", user1, user2) + s.logger.Printf("Chat already exists between users %d and %d, returning existing chat", user1, user2) return s.GetChat(ctx, &proto.GetChatRequest{ User1Id: user1, User2Id: user2, }) } - // Создаем новый чат + s.logger.Printf("Creating new chat between users %d and %d", user1, user2) err = s.db.QueryRow(ctx, ` INSERT INTO chats (user1_id, user2_id) VALUES ($1, $2) @@ -87,38 +108,48 @@ func (s *server) CreateChat(ctx context.Context, req *proto.CreateChatRequest) ( &chat.Id, &chat.User1Id, &chat.User2Id, &createdAt, &updatedAt, ) if err != nil { - log.Printf("Failed to create chat: %v", err) + s.logger.Printf("Failed to create chat: %v", err) return nil, fmt.Errorf("failed to create chat: %v", err) } - log.Printf("Successfully created new chat: id=%d, user1_id=%d, user2_id=%d", + s.logger.Printf("Successfully created new chat: id=%d, user1_id=%d, user2_id=%d", chat.Id, chat.User1Id, chat.User2Id) chat.CreatedAt = timestamppb.New(createdAt) chat.UpdatedAt = timestamppb.New(updatedAt) - return &proto.ChatResponse{Chat: &chat}, nil + resp := &proto.ChatResponse{Chat: &chat} + s.logResponse("CreateChat", resp, nil) + return resp, nil } func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) (*proto.MessageResponse, error) { - // Получаем информацию о чате (как в оригинале) + s.logRequest("SendMessage", req) + defer func(start time.Time) { + s.logger.Printf("SendMessage execution time: %v", time.Since(start)) + }(time.Now()) + + s.logger.Printf("Getting chat info for chat_id=%d", req.ChatId) var user1Id, user2Id int32 err := s.db.QueryRow(ctx, "SELECT user1_id, user2_id FROM chats WHERE id = $1", req.ChatId).Scan(&user1Id, &user2Id) if err != nil { + s.logger.Printf("Failed to get chat info: %v", err) return nil, fmt.Errorf("failed to get chat info: %v", err) } - // Определяем получателя var receiverId int32 if req.SenderId == user1Id { receiverId = user2Id } else if req.SenderId == user2Id { receiverId = user1Id } else { - return nil, fmt.Errorf("sender %d is not a participant of chat %d", req.SenderId, req.ChatId) + errMsg := fmt.Sprintf("sender %d is not a participant of chat %d", req.SenderId, req.ChatId) + s.logger.Println(errMsg) + return nil, errors.New(errMsg) // Используем errors.New } - // Создаем сообщение в БД + s.logger.Printf("Inserting message into database: chat_id=%d, sender_id=%d, receiver_id=%d", + req.ChatId, req.SenderId, receiverId) var message proto.Message var createdAt time.Time @@ -130,22 +161,26 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) &message.Id, &message.ChatId, &message.SenderId, &message.ReceiverId, &message.Content, &message.Status, &createdAt, ) if err != nil { + s.logger.Printf("Failed to insert message: %v", err) return nil, err } message.CreatedAt = timestamppb.New(createdAt) - // Обновляем время чата + s.logger.Printf("Updating chat updated_at for chat_id=%d", req.ChatId) _, err = s.db.Exec(ctx, `UPDATE chats SET updated_at = NOW() WHERE id = $1`, req.ChatId) if err != nil { + s.logger.Printf("Failed to update chat timestamp: %v", err) return nil, err } + s.logger.Printf("Publishing message to RabbitMQ for user_id=%d", receiverId) var lastErr error for i := 0; i < 3; i++ { ch, err := s.rabbitConn.Channel() if err != nil { lastErr = fmt.Errorf("failed to open channel (attempt %d): %v", i+1, err) + s.logger.Printf("RabbitMQ channel error: %v", lastErr) time.Sleep(time.Second * time.Duration(i+1)) continue } @@ -153,6 +188,7 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) queueName := fmt.Sprintf("user_%d_messages", receiverId) msgBytes, _ := json.Marshal(message) + s.logger.Printf("Publishing to queue %s: %s", queueName, string(msgBytes)) err = ch.PublishWithContext(ctx, "", // exchange queueName, // routing key @@ -161,37 +197,31 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) amqp.Publishing{ ContentType: "application/json", Body: msgBytes, - DeliveryMode: amqp.Persistent, // Сохраняем сообщения на диск + DeliveryMode: amqp.Persistent, }) ch.Close() if err == nil { - return &proto.MessageResponse{Message: &message}, nil + s.logger.Printf("Successfully published message to queue %s", queueName) + resp := &proto.MessageResponse{Message: &message} + s.logResponse("SendMessage", resp, nil) + return resp, nil } lastErr = err + s.logger.Printf("Failed to publish message (attempt %d): %v", i+1, err) } - return nil, fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr) + + errMsg := fmt.Errorf("failed to publish message after 3 attempts: %v", lastErr) + s.logResponse("SendMessage", nil, errMsg) + return nil, errMsg } -func mustMarshal(msg protobuf.Message) []byte { - data, err := protobuf.Marshal(msg) - if err != nil { - log.Fatalf("failed to marshal message: %v", err) - } - return data -} -func protoMessageToMap(msg *proto.Message) map[string]interface{} { - return map[string]interface{}{ - "id": msg.Id, - "chatId": msg.ChatId, - "senderId": msg.SenderId, - "receiverId": msg.ReceiverId, - "content": msg.Content, - "status": msg.Status, - "createdAt": msg.CreatedAt.AsTime().Format(time.RFC3339Nano), - } -} func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) { + s.logRequest("GetChat", req) + defer func(start time.Time) { + s.logger.Printf("GetChat execution time: %v", time.Since(start)) + }(time.Now()) + var chat proto.Chat var createdAt, updatedAt time.Time var lastMessageID sql.NullInt32 @@ -204,6 +234,7 @@ func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto user1, user2 = user2, user1 } + s.logger.Printf("Querying chat between users %d and %d", user1, user2) err := s.db.QueryRow(ctx, ` SELECT c.id, c.user1_id, c.user2_id, c.created_at, c.updated_at, m.id, m.content, m.status, m.created_at @@ -218,6 +249,7 @@ func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto &lastMessageID, &lastMessageContent, &lastMessageStatus, &lastMessageCreatedAt, ) if err != nil { + s.logger.Printf("Failed to get chat: %v", err) return nil, err } @@ -232,20 +264,30 @@ func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto Status: lastMessageStatus.String, CreatedAt: timestamppb.New(lastMessageCreatedAt.Time), } + s.logger.Printf("Found last message for chat %d: message_id=%d", chat.Id, lastMessageID.Int32) } - return &proto.ChatResponse{Chat: &chat}, nil + resp := &proto.ChatResponse{Chat: &chat} + s.logResponse("GetChat", resp, nil) + return resp, nil } func (s *server) GetChatMessages(ctx context.Context, req *proto.GetChatMessagesRequest) (*proto.MessagesResponse, error) { + s.logRequest("GetChatMessages", req) + defer func(start time.Time) { + s.logger.Printf("GetChatMessages execution time: %v", time.Since(start)) + }(time.Now()) + + s.logger.Printf("Querying messages for chat_id=%d, limit=%d, offset=%d", req.ChatId, req.Limit, req.Offset) rows, err := s.db.Query(ctx, ` - SELECT id, chat_id, sender_id, content, status, created_at + SELECT id, chat_id, sender_id, receiver_id, content, status, created_at FROM messages WHERE chat_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3 `, req.ChatId, req.Limit, req.Offset) if err != nil { + s.logger.Printf("Failed to query messages: %v", err) return nil, err } defer rows.Close() @@ -255,19 +297,34 @@ func (s *server) GetChatMessages(ctx context.Context, req *proto.GetChatMessages var msg proto.Message var createdAt time.Time err := rows.Scan( - &msg.Id, &msg.ChatId, &msg.SenderId, &msg.Content, &msg.Status, &createdAt, + &msg.Id, &msg.ChatId, &msg.SenderId, &msg.ReceiverId, &msg.Content, &msg.Status, &createdAt, ) if err != nil { + s.logger.Printf("Failed to scan message row: %v", err) return nil, err } msg.CreatedAt = timestamppb.New(createdAt) messages = append(messages, &msg) } - return &proto.MessagesResponse{Messages: messages}, nil + if err := rows.Err(); err != nil { + s.logger.Printf("Rows error: %v", err) + return nil, err + } + + s.logger.Printf("Retrieved %d messages for chat_id=%d", len(messages), req.ChatId) + resp := &proto.MessagesResponse{Messages: messages} + s.logResponse("GetChatMessages", resp, nil) + return resp, nil } func (s *server) GetUserChats(ctx context.Context, req *proto.GetUserChatsRequest) (*proto.UserChatsResponse, error) { + s.logRequest("GetUserChats", req) + defer func(start time.Time) { + s.logger.Printf("GetUserChats execution time: %v", time.Since(start)) + }(time.Now()) + + s.logger.Printf("Querying chats for user_id=%d", req.UserId) rows, err := s.db.Query(ctx, ` SELECT c.id, c.user1_id, c.user2_id, c.created_at, c.updated_at, m.id, m.content, m.status, m.created_at @@ -280,6 +337,7 @@ func (s *server) GetUserChats(ctx context.Context, req *proto.GetUserChatsReques ORDER BY c.updated_at DESC `, req.UserId) if err != nil { + s.logger.Printf("Failed to query user chats: %v", err) return nil, err } defer rows.Close() @@ -298,6 +356,7 @@ func (s *server) GetUserChats(ctx context.Context, req *proto.GetUserChatsReques &lastMessageID, &lastMessageContent, &lastMessageStatus, &lastMessageCreatedAt, ) if err != nil { + s.logger.Printf("Failed to scan chat row: %v", err) return nil, err } @@ -312,15 +371,30 @@ func (s *server) GetUserChats(ctx context.Context, req *proto.GetUserChatsReques Status: lastMessageStatus.String, CreatedAt: timestamppb.New(lastMessageCreatedAt.Time), } + s.logger.Printf("Found last message for chat %d: message_id=%d", chat.Id, lastMessageID.Int32) } chats = append(chats, &chat) } - return &proto.UserChatsResponse{Chats: chats}, nil + if err := rows.Err(); err != nil { + s.logger.Printf("Rows error: %v", err) + return nil, err + } + + s.logger.Printf("Retrieved %d chats for user_id=%d", len(chats), req.UserId) + resp := &proto.UserChatsResponse{Chats: chats} + s.logResponse("GetUserChats", resp, nil) + return resp, nil } func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessageStatusRequest) (*proto.MessageResponse, error) { + s.logRequest("UpdateMessageStatus", req) + defer func(start time.Time) { + s.logger.Printf("UpdateMessageStatus execution time: %v", time.Since(start)) + }(time.Now()) + + s.logger.Printf("Updating message status: message_id=%d, status=%s", req.MessageId, req.Status) var message proto.Message var createdAt time.Time @@ -328,53 +402,71 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa UPDATE messages SET status = $1 WHERE id = $2 - RETURNING id, chat_id, sender_id, content, status, created_at + RETURNING id, chat_id, sender_id, receiver_id, content, status, created_at `, req.Status, req.MessageId).Scan( - &message.Id, &message.ChatId, &message.SenderId, &message.Content, &message.Status, &createdAt, + &message.Id, &message.ChatId, &message.SenderId, &message.ReceiverId, &message.Content, &message.Status, &createdAt, ) if err != nil { + s.logger.Printf("Failed to update message status: %v", err) return nil, err } message.CreatedAt = timestamppb.New(createdAt) - return &proto.MessageResponse{Message: &message}, nil + s.logger.Printf("Successfully updated message status: message_id=%d, new_status=%s", + message.Id, message.Status) + resp := &proto.MessageResponse{Message: &message} + s.logResponse("UpdateMessageStatus", resp, nil) + return resp, nil } func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error { + s.logRequest("StreamMessages", req) + defer func(start time.Time) { + s.logger.Printf("StreamMessages execution time: %v", time.Since(start)) + }(time.Now()) if req.UserId == 0 { - return status.Error(codes.InvalidArgument, "userID cannot be 0") + err := status.Error(codes.InvalidArgument, "userID cannot be 0") + s.logResponse("StreamMessages", nil, err) + return err } const maxRetries = 5 retryDelay := time.Second for i := 0; i < maxRetries; i++ { + s.logger.Printf("Starting stream attempt %d/%d for user %d", i+1, maxRetries, req.UserId) err := s.runStream(req, stream) if err == nil { return nil } - log.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err) + s.logger.Printf("Stream error (attempt %d/%d): %v", i+1, maxRetries, err) time.Sleep(retryDelay) retryDelay *= 2 } - return fmt.Errorf("max retries (%d) exceeded", maxRetries) + err := fmt.Errorf("max retries (%d) exceeded", maxRetries) + s.logResponse("StreamMessages", nil, err) + return err } func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error { ctx := stream.Context() queueName := fmt.Sprintf("user_%d_messages", req.UserId) + s.logger.Printf("Opening RabbitMQ channel for queue %s", queueName) ch, err := s.rabbitConn.Channel() if err != nil { return fmt.Errorf("failed to open channel: %v", err) } - defer ch.Close() + defer func() { + s.logger.Printf("Closing RabbitMQ channel for queue %s", queueName) + ch.Close() + }() - // Объявляем очередь с persistence + s.logger.Printf("Declaring queue %s with persistence", queueName) _, err = ch.QueueDeclare( queueName, true, // durable @@ -382,7 +474,8 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag false, // exclusive false, // noWait amqp.Table{ - "x-message-ttl": int32(86400000), + "x-message-ttl": int32(86400000), // 24 hours + "x-expires": int32(86400000), // 24 hours "x-single-active-consumer": false, }, ) @@ -390,7 +483,7 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag return fmt.Errorf("failed to declare queue: %v", err) } - // QoS для контроля скорости обработки + s.logger.Printf("Setting QoS for queue %s", queueName) err = ch.Qos( 1, // prefetch count 0, // prefetch size @@ -400,6 +493,7 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag return fmt.Errorf("failed to set QoS: %v", err) } + s.logger.Printf("Starting consumer for queue %s", queueName) msgs, err := ch.Consume( queueName, "", // consumer @@ -413,100 +507,154 @@ func (s *server) runStream(req *proto.StreamMessagesRequest, stream proto.Messag return fmt.Errorf("failed to consume: %v", err) } - log.Printf("Starting message stream for user %d", req.UserId) - defer log.Printf("Stopping message stream for user %d", req.UserId) + s.logger.Printf("Starting message stream for user %d", req.UserId) + defer s.logger.Printf("Stopping message stream for user %d", req.UserId) + + heartbeat := time.NewTicker(30 * time.Second) + defer heartbeat.Stop() for { select { case <-ctx.Done(): + s.logger.Printf("Context canceled for user %d: %v", req.UserId, ctx.Err()) return nil + case <-heartbeat.C: + s.logger.Printf("Sending heartbeat for user %d", req.UserId) + // Send empty message as heartbeat + if err := stream.Send(&proto.MessageResponse{}); err != nil { + s.logger.Printf("Failed to send heartbeat: %v", err) + return err + } case d, ok := <-msgs: if !ok { + s.logger.Printf("Message channel closed for user %d", req.UserId) return fmt.Errorf("message channel closed") } + s.logger.Printf("Received message from RabbitMQ for user %d: %s", req.UserId, string(d.Body)) var msg proto.Message if err := json.Unmarshal(d.Body, &msg); err != nil { - log.Printf("Failed to unmarshal message: %v", err) + s.logger.Printf("Failed to unmarshal message: %v", err) d.Nack(false, false) // Отбрасываем некорректное сообщение continue } - log.Printf("Sending message to user %d: %+v", req.UserId, msg) + s.logger.Printf("Sending message to stream for user %d: %+v", req.UserId, msg) if err := stream.Send(&proto.MessageResponse{Message: &msg}); err != nil { + s.logger.Printf("Failed to send message to stream: %v", err) d.Nack(false, true) // Возвращаем в очередь при ошибке отправки return err } + s.logger.Printf("Acknowledging message for user %d", req.UserId) d.Ack(false) // Подтверждаем обработку } } } func main() { + logger := log.New(os.Stdout, "MAIN: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile) + logger.Println("Starting message service") + // Инициализация подключения к БД - pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2") + dbURL := "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2" + logger.Printf("Connecting to database at %s", dbURL) + pool, err := pgxpool.Connect(context.Background(), dbURL) if err != nil { - log.Fatalf("Unable to connect to database: %v", err) + logger.Fatalf("Unable to connect to database: %v", err) } - defer pool.Close() + defer func() { + logger.Println("Closing database connection") + pool.Close() + }() // Инициализация подключения к RabbitMQ + rabbitURL := "amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/" + logger.Printf("Connecting to RabbitMQ at %s", rabbitURL) var rabbitConn *amqp.Connection for i := 0; i < 5; i++ { - rabbitConn, err = amqp.DialConfig("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/", amqp.Config{ + rabbitConn, err = amqp.DialConfig(rabbitURL, amqp.Config{ Heartbeat: 10 * time.Second, Locale: "en_US", }) if err == nil { break } - log.Printf("Failed to connect to RabbitMQ (attempt %d): %v", i+1, err) + logger.Printf("Failed to connect to RabbitMQ (attempt %d): %v", i+1, err) time.Sleep(time.Second * time.Duration(i+1)) } if err != nil { - log.Fatalf("Failed to connect to RabbitMQ after 5 attempts: %v", err) + logger.Fatalf("Failed to connect to RabbitMQ after 5 attempts: %v", err) } - defer rabbitConn.Close() + defer func() { + logger.Println("Closing RabbitMQ connection") + rabbitConn.Close() + }() // Обработка событий соединения - go func(conn **amqp.Connection, mu *sync.Mutex) { + connMutex := &sync.Mutex{} + go func() { for { - reason, ok := <-(*conn).NotifyClose(make(chan *amqp.Error)) + reason, ok := <-rabbitConn.NotifyClose(make(chan *amqp.Error)) if !ok { - log.Println("RabbitMQ connection closed") + logger.Println("RabbitMQ connection closed normally") break } - log.Printf("RabbitMQ connection closed: %v", reason) + logger.Printf("RabbitMQ connection closed: %v", reason) // Попытка переподключения for i := 0; i < 5; i++ { time.Sleep(time.Second * time.Duration(i+1)) - newConn, err := amqp.Dial("amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/") + logger.Printf("Attempting to reconnect to RabbitMQ (attempt %d)", i+1) + newConn, err := amqp.DialConfig(rabbitURL, amqp.Config{ + Heartbeat: 10 * time.Second, + Locale: "en_US", + }) if err == nil { - mu.Lock() - *conn = newConn - mu.Unlock() - log.Println("Successfully reconnected to RabbitMQ") + connMutex.Lock() + rabbitConn = newConn + connMutex.Unlock() + logger.Println("Successfully reconnected to RabbitMQ") break } - log.Printf("Failed to reconnect to RabbitMQ (attempt %d): %v", i+1, err) + logger.Printf("Failed to reconnect to RabbitMQ (attempt %d): %v", i+1, err) } } - }(&rabbitConn, &sync.Mutex{}) + }() // Создаем gRPC сервер - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer( + grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + logger.Printf("Unary call: %s, request: %+v", info.FullMethod, req) + start := time.Now() + defer func() { + logger.Printf("Unary call %s completed in %v, response: %+v, error: %v", + info.FullMethod, time.Since(start), resp, err) + }() + return handler(ctx, req) + }), + grpc.StreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + logger.Printf("Stream call: %s", info.FullMethod) + start := time.Now() + defer func() { + logger.Printf("Stream call %s completed in %v", info.FullMethod, time.Since(start)) + }() + return handler(srv, ss) + }), + ) proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, rabbitConn)) // Запускаем сервер - lis, err := net.Listen("tcp", ":50052") + port := ":50052" + logger.Printf("Starting gRPC server on port %s", port) + lis, err := net.Listen("tcp", port) if err != nil { - log.Fatalf("failed to listen: %v", err) + logger.Fatalf("failed to listen: %v", err) } - log.Println("Server started on port 50052") + + logger.Println("Server is ready to accept connections") if err := grpcServer.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) + logger.Fatalf("failed to serve: %v", err) } } diff --git a/server_test.go b/server_test.go new file mode 100644 index 0000000..63a81e1 --- /dev/null +++ b/server_test.go @@ -0,0 +1,201 @@ +package main + +import ( + "context" + "fmt" + "net" + "tailly_messages/proto" + "testing" + "time" + + "github.com/jackc/pgx/v4/pgxpool" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" +) + +const bufSize = 1024 * 1024 + +func setupTestServer(t *testing.T) (*grpc.Server, *bufconn.Listener, *server) { + // Setup in-memory connection + lis := bufconn.Listen(bufSize) + s := grpc.NewServer() + + // Используем реальные подключения из server.go + dbURL := "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2" + pool, err := pgxpool.Connect(context.Background(), dbURL) + require.NoError(t, err, "Failed to connect to test database") + + rabbitURL := "amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/" + rabbitConn, err := amqp.DialConfig(rabbitURL, amqp.Config{ + Heartbeat: 10 * time.Second, + Locale: "en_US", + }) + require.NoError(t, err, "Failed to connect to RabbitMQ") + + testServer := NewServer(pool, rabbitConn) + proto.RegisterMessageServiceServer(s, testServer) + + go func() { + if err := s.Serve(lis); err != nil { + t.Logf("Server exited with error: %v", err) + } + }() + + // Очищаем только тестовые данные после завершения тестов + t.Cleanup(func() { + // Удаляем только те данные, которые создали в тестах + _, _ = pool.Exec(context.Background(), "DELETE FROM messages WHERE content = 'test message'") + _, _ = pool.Exec(context.Background(), "DELETE FROM chats WHERE user1_id = 1 AND user2_id = 2") + }) + + return s, lis, testServer +} + +func TestCreateChat(t *testing.T) { + s, lis, _ := setupTestServer(t) + defer s.Stop() + + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithInsecure()) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewMessageServiceClient(conn) + + req := &proto.CreateChatRequest{ + User1Id: 1, + User2Id: 2, + } + + resp, err := client.CreateChat(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.GetChat()) + assert.Equal(t, int32(1), resp.GetChat().GetUser1Id()) + assert.Equal(t, int32(2), resp.GetChat().GetUser2Id()) +} + +func TestSendAndStreamMessages(t *testing.T) { + s, lis, _ := setupTestServer(t) + defer s.Stop() + + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithInsecure()) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewMessageServiceClient(conn) + + // Сначала создаем чат + chatResp, err := client.CreateChat(ctx, &proto.CreateChatRequest{ + User1Id: 1, + User2Id: 2, + }) + require.NoError(t, err) + chatId := chatResp.GetChat().GetId() + + // Запускаем стрим + streamReq := &proto.StreamMessagesRequest{UserId: 2} + stream, err := client.StreamMessages(ctx, streamReq) + require.NoError(t, err) + + // Отправляем сообщение + sendReq := &proto.SendMessageRequest{ + ChatId: chatId, + SenderId: 1, + Content: "test message", + } + _, err = client.SendMessage(ctx, sendReq) + require.NoError(t, err) + + // Пытаемся получить сообщение через стрим + done := make(chan bool) + go func() { + resp, err := stream.Recv() + assert.NoError(t, err) + if resp.GetMessage() != nil { // Игнорируем heartbeat сообщения + assert.Equal(t, "test message", resp.GetMessage().GetContent()) + done <- true + } + }() + + select { + case <-done: + // Успех + case <-time.After(5 * time.Second): + t.Fatal("Timeout waiting for message") + } +} +func TestMultipleMessages(t *testing.T) { + s, lis, _ := setupTestServer(t) + defer s.Stop() + + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithInsecure()) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewMessageServiceClient(conn) + + // Создаем чат + chatResp, err := client.CreateChat(ctx, &proto.CreateChatRequest{ + User1Id: 1, + User2Id: 2, + }) + require.NoError(t, err) + chatId := chatResp.GetChat().GetId() + + // Запускаем стрим + streamReq := &proto.StreamMessagesRequest{UserId: 2} + stream, err := client.StreamMessages(ctx, streamReq) + require.NoError(t, err) + + // Отправляем 5 сообщений + for i := 0; i < 5; i++ { + sendReq := &proto.SendMessageRequest{ + ChatId: chatId, + SenderId: 1, + Content: fmt.Sprintf("test message %d", i+1), + } + _, err = client.SendMessage(ctx, sendReq) + require.NoError(t, err) + } + + // Проверяем получение всех сообщений + received := 0 + done := make(chan bool) + go func() { + for { + resp, err := stream.Recv() + if err != nil { + t.Logf("Stream error: %v", err) + break + } + if resp.GetMessage() != nil { + t.Logf("Received message: %s", resp.GetMessage().GetContent()) + received++ + if received >= 5 { + done <- true + break + } + } + } + }() + + select { + case <-done: + // Успех + assert.Equal(t, 5, received) + case <-time.After(10 * time.Second): + t.Fatal("Timeout waiting for messages") + } +}