package main import ( "context" "database/sql" "fmt" "github.com/jackc/pgx/v4/pgxpool" "github.com/segmentio/kafka-go" "google.golang.org/grpc" protobuf "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "log" "net" "tailly_messages/proto" "time" ) type server struct { proto.UnimplementedMessageServiceServer db *pgxpool.Pool producer *kafka.Writer } func NewServer(db *pgxpool.Pool, producer *kafka.Writer) *server { return &server{db: db, producer: producer} } func (s *server) CreateChat(ctx context.Context, req *proto.CreateChatRequest) (*proto.ChatResponse, error) { log.Printf("CreateChat request received: user1_id=%d, user2_id=%d", req.GetUser1Id(), req.GetUser2Id()) user1, user2 := req.GetUser1Id(), req.GetUser2Id() if user1 > user2 { user1, user2 = user2, user1 } // Проверка существования пользователей var user1Exists, user2Exists bool err := s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", user1).Scan(&user1Exists) if err != nil { log.Printf("Error checking user1 existence: %v", err) return nil, fmt.Errorf("failed to check user existence") } err = s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", user2).Scan(&user2Exists) if err != nil { log.Printf("Error checking user2 existence: %v", err) return nil, fmt.Errorf("failed to check user existence") } if !user1Exists || !user2Exists { errMsg := fmt.Sprintf("One or both users don't exist: user1=%d (%t), user2=%d (%t)", user1, user1Exists, user2, user2Exists) log.Println(errMsg) return nil, fmt.Errorf(errMsg) } var chat proto.Chat var createdAt, updatedAt time.Time // Проверяем, не существует ли уже чат var chatExists bool err = s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM chats WHERE user1_id = $1 AND user2_id = $2)", user1, user2).Scan(&chatExists) if err != nil { log.Printf("Error checking chat existence: %v", err) return nil, fmt.Errorf("failed to check chat existence") } if chatExists { log.Printf("Chat already exists between users %d and %d", user1, user2) return s.GetChat(ctx, &proto.GetChatRequest{ User1Id: user1, User2Id: user2, }) } // Создаем новый чат err = s.db.QueryRow(ctx, ` INSERT INTO chats (user1_id, user2_id) VALUES ($1, $2) RETURNING id, user1_id, user2_id, created_at, updated_at `, user1, user2).Scan( &chat.Id, &chat.User1Id, &chat.User2Id, &createdAt, &updatedAt, ) if err != nil { log.Printf("Failed to create chat: %v", err) return nil, fmt.Errorf("failed to create chat: %v", err) } log.Printf("Successfully created new chat: id=%d, user1_id=%d, user2_id=%d", chat.Id, chat.User1Id, chat.User2Id) chat.CreatedAt = timestamppb.New(createdAt) chat.UpdatedAt = timestamppb.New(updatedAt) return &proto.ChatResponse{Chat: &chat}, nil } func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) (*proto.MessageResponse, error) { var message proto.Message var createdAt time.Time err := s.db.QueryRow(ctx, ` INSERT INTO messages (chat_id, sender_id, content) VALUES ($1, $2, $3) RETURNING id, chat_id, sender_id, content, status, created_at `, req.ChatId, req.SenderId, req.Content).Scan( &message.Id, &message.ChatId, &message.SenderId, &message.Content, &message.Status, &createdAt, ) if err != nil { return nil, err } message.CreatedAt = timestamppb.New(createdAt) // Update chat's updated_at _, err = s.db.Exec(ctx, ` UPDATE chats SET updated_at = NOW() WHERE id = $1 `, req.ChatId) if err != nil { return nil, err } // Publish to Kafka for real-time delivery err = s.producer.WriteMessages(ctx, kafka.Message{ Key: []byte(string(req.ChatId)), Value: []byte(message.String()), }) if err != nil { log.Printf("Failed to publish message to Kafka: %v", err) } return &proto.MessageResponse{Message: &message}, nil } func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) { var chat proto.Chat var createdAt, updatedAt time.Time var lastMessageID sql.NullInt32 var lastMessageContent sql.NullString var lastMessageStatus sql.NullString var lastMessageCreatedAt sql.NullTime user1, user2 := req.User1Id, req.User2Id if user1 > user2 { user1, user2 = user2, user1 } err := s.db.QueryRow(ctx, ` SELECT c.id, c.user1_id, c.user2_id, c.created_at, c.updated_at, m.id, m.content, m.status, m.created_at FROM chats c LEFT JOIN messages m ON m.id = ( SELECT id FROM messages WHERE chat_id = c.id ORDER BY created_at DESC LIMIT 1 ) WHERE c.user1_id = $1 AND c.user2_id = $2 `, user1, user2).Scan( &chat.Id, &chat.User1Id, &chat.User2Id, &createdAt, &updatedAt, &lastMessageID, &lastMessageContent, &lastMessageStatus, &lastMessageCreatedAt, ) if err != nil { return nil, err } chat.CreatedAt = timestamppb.New(createdAt) chat.UpdatedAt = timestamppb.New(updatedAt) if lastMessageID.Valid { chat.LastMessage = &proto.Message{ Id: lastMessageID.Int32, ChatId: chat.Id, Content: lastMessageContent.String, Status: lastMessageStatus.String, CreatedAt: timestamppb.New(lastMessageCreatedAt.Time), } } return &proto.ChatResponse{Chat: &chat}, nil } func (s *server) GetChatMessages(ctx context.Context, req *proto.GetChatMessagesRequest) (*proto.MessagesResponse, error) { rows, err := s.db.Query(ctx, ` SELECT id, chat_id, sender_id, content, status, created_at FROM messages WHERE chat_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3 `, req.ChatId, req.Limit, req.Offset) if err != nil { return nil, err } defer rows.Close() var messages []*proto.Message for rows.Next() { var msg proto.Message var createdAt time.Time err := rows.Scan( &msg.Id, &msg.ChatId, &msg.SenderId, &msg.Content, &msg.Status, &createdAt, ) if err != nil { return nil, err } msg.CreatedAt = timestamppb.New(createdAt) messages = append(messages, &msg) } return &proto.MessagesResponse{Messages: messages}, nil } func (s *server) GetUserChats(ctx context.Context, req *proto.GetUserChatsRequest) (*proto.UserChatsResponse, error) { rows, err := s.db.Query(ctx, ` SELECT c.id, c.user1_id, c.user2_id, c.created_at, c.updated_at, m.id, m.content, m.status, m.created_at FROM chats c LEFT JOIN messages m ON m.id = ( SELECT id FROM messages WHERE chat_id = c.id ORDER BY created_at DESC LIMIT 1 ) WHERE c.user1_id = $1 OR c.user2_id = $1 ORDER BY c.updated_at DESC `, req.UserId) if err != nil { return nil, err } defer rows.Close() var chats []*proto.Chat for rows.Next() { var chat proto.Chat var createdAt, updatedAt time.Time var lastMessageID sql.NullInt32 var lastMessageContent sql.NullString var lastMessageStatus sql.NullString var lastMessageCreatedAt sql.NullTime err := rows.Scan( &chat.Id, &chat.User1Id, &chat.User2Id, &createdAt, &updatedAt, &lastMessageID, &lastMessageContent, &lastMessageStatus, &lastMessageCreatedAt, ) if err != nil { return nil, err } chat.CreatedAt = timestamppb.New(createdAt) chat.UpdatedAt = timestamppb.New(updatedAt) if lastMessageID.Valid { chat.LastMessage = &proto.Message{ Id: lastMessageID.Int32, ChatId: chat.Id, Content: lastMessageContent.String, Status: lastMessageStatus.String, CreatedAt: timestamppb.New(lastMessageCreatedAt.Time), } } chats = append(chats, &chat) } return &proto.UserChatsResponse{Chats: chats}, nil } func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessageStatusRequest) (*proto.MessageResponse, error) { var message proto.Message var createdAt time.Time err := s.db.QueryRow(ctx, ` UPDATE messages SET status = $1 WHERE id = $2 RETURNING id, chat_id, sender_id, content, status, created_at `, req.Status, req.MessageId).Scan( &message.Id, &message.ChatId, &message.SenderId, &message.Content, &message.Status, &createdAt, ) if err != nil { return nil, err } message.CreatedAt = timestamppb.New(createdAt) return &proto.MessageResponse{Message: &message}, nil } func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error { // Create Kafka reader for this user's messages reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "user_messages", Partition: 0, MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) defer reader.Close() for { m, err := reader.ReadMessage(stream.Context()) if err != nil { return err } // Check if message is for this user if string(m.Key) == string(req.UserId) { var message proto.Message if err := protobuf.Unmarshal(m.Value, &message); err != nil { log.Printf("Failed to unmarshal message: %v", err) continue } if err := stream.Send(&proto.MessageResponse{Message: &message}); err != nil { return err } } } } func main() { // Initialize database connection pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:BBP%263XP956%26D8y6cYJ@79.174.89.104:15452/tailly_v2") if err != nil { log.Fatalf("Unable to connect to database: %v", err) } defer pool.Close() // Initialize Kafka producer producer := &kafka.Writer{ Addr: kafka.TCP("89.104.69.222:9092"), Topic: "user_messages", Balancer: &kafka.Hash{}, } defer producer.Close() // Create gRPC server grpcServer := grpc.NewServer() proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, producer)) // Start server lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) } log.Println("Server started on port 50051") if err := grpcServer.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }