// tailly_messages/server.go
// 2025-08-12 22:16:32 +03:00
//
// 362 lines
// 10 KiB
// Go

package main
import (
"context"
"database/sql"
"fmt"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/segmentio/kafka-go"
"google.golang.org/grpc"
protobuf "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"net"
"tailly_messages/proto"
"time"
)
// server is the MessageService implementation registered with the gRPC
// server in main. Its methods run SQL through db and publish through producer.
type server struct {
	// Generated base type embedded from the proto package.
	proto.UnimplementedMessageServiceServer

	// db is the PostgreSQL connection pool used for every query.
	db *pgxpool.Pool
	// producer is the Kafka writer SendMessage publishes to.
	producer *kafka.Writer
}
// NewServer returns a server that uses db for storage and producer for
// publishing to Kafka. Neither argument is validated here.
func NewServer(db *pgxpool.Pool, producer *kafka.Writer) *server {
	srv := new(server)
	srv.db = db
	srv.producer = producer
	return srv
}
// CreateChat returns the chat between the two users named in req, creating it
// when it does not exist yet.
//
// The pair is normalised so that the smaller ID is always stored as user1_id;
// both the lookup and the insert rely on that ordering. Both users must exist
// in the users table, otherwise an error is returned. When a chat for the
// pair already exists the call is delegated to GetChat.
//
// NOTE(review): the existence check and the INSERT are separate statements,
// so two concurrent calls for the same pair can both reach the INSERT. What
// happens then depends on whether chats has a uniqueness constraint on
// (user1_id, user2_id), which is not visible from this file.
func (s *server) CreateChat(ctx context.Context, req *proto.CreateChatRequest) (*proto.ChatResponse, error) {
	log.Printf("CreateChat request received: user1_id=%d, user2_id=%d", req.GetUser1Id(), req.GetUser2Id())

	user1, user2 := req.GetUser1Id(), req.GetUser2Id()
	if user1 > user2 {
		user1, user2 = user2, user1
	}

	// Check that both users exist.
	var user1Exists, user2Exists bool
	err := s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", user1).Scan(&user1Exists)
	if err != nil {
		log.Printf("Error checking user1 existence: %v", err)
		return nil, fmt.Errorf("failed to check user existence")
	}
	err = s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)", user2).Scan(&user2Exists)
	if err != nil {
		log.Printf("Error checking user2 existence: %v", err)
		return nil, fmt.Errorf("failed to check user existence")
	}
	if !user1Exists || !user2Exists {
		errMsg := fmt.Sprintf("One or both users don't exist: user1=%d (%t), user2=%d (%t)",
			user1, user1Exists, user2, user2Exists)
		log.Println(errMsg)
		// errMsg is already formatted; pass it as an argument rather than as
		// the format string so it is never re-interpreted.
		return nil, fmt.Errorf("%s", errMsg)
	}

	// Check whether the chat already exists.
	var chatExists bool
	err = s.db.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM chats WHERE user1_id = $1 AND user2_id = $2)",
		user1, user2).Scan(&chatExists)
	if err != nil {
		log.Printf("Error checking chat existence: %v", err)
		return nil, fmt.Errorf("failed to check chat existence")
	}
	if chatExists {
		log.Printf("Chat already exists between users %d and %d", user1, user2)
		return s.GetChat(ctx, &proto.GetChatRequest{
			User1Id: user1,
			User2Id: user2,
		})
	}

	// Create a new chat.
	var chat proto.Chat
	var createdAt, updatedAt time.Time
	err = s.db.QueryRow(ctx, `
INSERT INTO chats (user1_id, user2_id)
VALUES ($1, $2)
RETURNING id, user1_id, user2_id, created_at, updated_at
`, user1, user2).Scan(
		&chat.Id, &chat.User1Id, &chat.User2Id, &createdAt, &updatedAt,
	)
	if err != nil {
		log.Printf("Failed to create chat: %v", err)
		// %w keeps the driver error reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("failed to create chat: %w", err)
	}
	log.Printf("Successfully created new chat: id=%d, user1_id=%d, user2_id=%d",
		chat.Id, chat.User1Id, chat.User2Id)

	chat.CreatedAt = timestamppb.New(createdAt)
	chat.UpdatedAt = timestamppb.New(updatedAt)
	return &proto.ChatResponse{Chat: &chat}, nil
}
// SendMessage stores a message from req.SenderId in chat req.ChatId and then
// publishes it to Kafka.
//
// The sender must be one of the chat's two participants, otherwise an error
// is returned. The message insert and the bump of the chat's updated_at run
// in a single transaction, so a failure of either leaves no partial write
// behind. Publishing is best-effort: a marshalling or Kafka failure is logged
// and the stored message is still returned to the caller.
//
// NOTE(review): the Kafka key is the chat ID, while StreamMessages compares
// the key against a user ID — confirm which of the two is intended.
func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) (*proto.MessageResponse, error) {
	// Verify that the sender is a participant of the chat.
	var isParticipant bool
	err := s.db.QueryRow(ctx, `
SELECT EXISTS(
SELECT 1 FROM chats
WHERE id = $1 AND (user1_id = $2 OR user2_id = $2)
)`, req.ChatId, req.SenderId).Scan(&isParticipant)
	if err != nil {
		return nil, fmt.Errorf("failed to check chat participation: %w", err)
	}
	if !isParticipant {
		return nil, fmt.Errorf("user %d is not a participant of chat %d", req.SenderId, req.ChatId)
	}

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Rolls back on every early return. After a successful Commit the
	// transaction is already closed, so the error from this call carries no
	// information and is deliberately dropped.
	defer func() { _ = tx.Rollback(ctx) }()

	var message proto.Message
	var createdAt time.Time
	err = tx.QueryRow(ctx, `
INSERT INTO messages (chat_id, sender_id, content)
VALUES ($1, $2, $3)
RETURNING id, chat_id, sender_id, content, status, created_at
`, req.ChatId, req.SenderId, req.Content).Scan(
		&message.Id, &message.ChatId, &message.SenderId, &message.Content, &message.Status, &createdAt,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to insert message: %w", err)
	}
	message.CreatedAt = timestamppb.New(createdAt)

	// Update chat's updated_at.
	_, err = tx.Exec(ctx, `
UPDATE chats SET updated_at = NOW() WHERE id = $1
`, req.ChatId)
	if err != nil {
		return nil, fmt.Errorf("failed to update chat timestamp: %w", err)
	}
	if err = tx.Commit(ctx); err != nil {
		return nil, fmt.Errorf("failed to commit message: %w", err)
	}

	// Publish to Kafka for real-time delivery. The payload is the binary
	// protobuf encoding because the consumer in this file decodes it with
	// protobuf.Unmarshal; message.String() is a debug text form that
	// Unmarshal cannot parse.
	payload, err := protobuf.Marshal(&message)
	if err != nil {
		log.Printf("Failed to marshal message for Kafka: %v", err)
		return &proto.MessageResponse{Message: &message}, nil
	}
	err = s.producer.WriteMessages(ctx, kafka.Message{
		// fmt.Sprint yields the decimal text of the ID; string(intValue)
		// would yield the Unicode code point with that number instead.
		Key:   []byte(fmt.Sprint(req.ChatId)),
		Value: payload,
	})
	if err != nil {
		log.Printf("Failed to publish message to Kafka: %v", err)
	}
	return &proto.MessageResponse{Message: &message}, nil
}
// GetChat looks up the chat between req.User1Id and req.User2Id and returns
// it together with its most recent message, if there is one.
//
// The two IDs are ordered so that the smaller one is matched against
// user1_id. The Scan error is returned unchanged, which includes the case
// where no chat row matches. LastMessage is filled from id, content, status
// and created_at only; the query does not select the sender.
func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) {
	const query = `
SELECT c.id, c.user1_id, c.user2_id, c.created_at, c.updated_at,
m.id, m.content, m.status, m.created_at
FROM chats c
LEFT JOIN messages m ON m.id = (
SELECT id FROM messages WHERE chat_id = c.id
ORDER BY created_at DESC LIMIT 1
)
WHERE c.user1_id = $1 AND c.user2_id = $2
`

	low, high := req.User1Id, req.User2Id
	if high < low {
		low, high = high, low
	}

	// The message columns come from a LEFT JOIN and are NULL for a chat
	// without messages, hence the nullable scan targets.
	var last struct {
		id        sql.NullInt32
		content   sql.NullString
		status    sql.NullString
		createdAt sql.NullTime
	}
	var created, updated time.Time
	result := &proto.Chat{}

	row := s.db.QueryRow(ctx, query, low, high)
	if err := row.Scan(
		&result.Id, &result.User1Id, &result.User2Id, &created, &updated,
		&last.id, &last.content, &last.status, &last.createdAt,
	); err != nil {
		return nil, err
	}

	result.CreatedAt = timestamppb.New(created)
	result.UpdatedAt = timestamppb.New(updated)

	if !last.id.Valid {
		return &proto.ChatResponse{Chat: result}, nil
	}
	result.LastMessage = &proto.Message{
		Id:        last.id.Int32,
		ChatId:    result.Id,
		Content:   last.content.String,
		Status:    last.status.String,
		CreatedAt: timestamppb.New(last.createdAt.Time),
	}
	return &proto.ChatResponse{Chat: result}, nil
}
// GetChatMessages returns one page of the messages stored for req.ChatId,
// ordered newest first. req.Limit and req.Offset are passed unchanged to the
// query's LIMIT and OFFSET.
//
// An error is returned when the query fails, when a row cannot be scanned, or
// when iteration stops early because of a read error.
func (s *server) GetChatMessages(ctx context.Context, req *proto.GetChatMessagesRequest) (*proto.MessagesResponse, error) {
	rows, err := s.db.Query(ctx, `
SELECT id, chat_id, sender_id, content, status, created_at
FROM messages
WHERE chat_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
`, req.ChatId, req.Limit, req.Offset)
	if err != nil {
		return nil, fmt.Errorf("failed to query chat messages: %w", err)
	}
	defer rows.Close()

	var messages []*proto.Message
	for rows.Next() {
		var createdAt time.Time
		msg := &proto.Message{}
		if err := rows.Scan(
			&msg.Id, &msg.ChatId, &msg.SenderId, &msg.Content, &msg.Status, &createdAt,
		); err != nil {
			return nil, fmt.Errorf("failed to scan chat message: %w", err)
		}
		msg.CreatedAt = timestamppb.New(createdAt)
		messages = append(messages, msg)
	}
	// Next reports false both at the end of the result set and on a read
	// error; Err tells the two apart so a truncated page is never returned
	// as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read chat messages: %w", err)
	}
	return &proto.MessagesResponse{Messages: messages}, nil
}
// GetUserChats returns every chat in which req.UserId is either participant,
// most recently updated first, each with its latest message when one exists.
//
// LastMessage is filled from id, content, status and created_at only; the
// query does not select the sender. An error is returned when the query
// fails, when a row cannot be scanned, or when iteration stops early because
// of a read error.
func (s *server) GetUserChats(ctx context.Context, req *proto.GetUserChatsRequest) (*proto.UserChatsResponse, error) {
	rows, err := s.db.Query(ctx, `
SELECT c.id, c.user1_id, c.user2_id, c.created_at, c.updated_at,
m.id, m.content, m.status, m.created_at
FROM chats c
LEFT JOIN messages m ON m.id = (
SELECT id FROM messages WHERE chat_id = c.id
ORDER BY created_at DESC LIMIT 1
)
WHERE c.user1_id = $1 OR c.user2_id = $1
ORDER BY c.updated_at DESC
`, req.UserId)
	if err != nil {
		return nil, fmt.Errorf("failed to query user chats: %w", err)
	}
	defer rows.Close()

	var chats []*proto.Chat
	for rows.Next() {
		// The message columns come from a LEFT JOIN and are NULL for a chat
		// without messages, hence the nullable scan targets.
		var (
			createdAt, updatedAt time.Time
			lastMessageID        sql.NullInt32
			lastMessageContent   sql.NullString
			lastMessageStatus    sql.NullString
			lastMessageCreatedAt sql.NullTime
		)
		chat := &proto.Chat{}
		if err := rows.Scan(
			&chat.Id, &chat.User1Id, &chat.User2Id, &createdAt, &updatedAt,
			&lastMessageID, &lastMessageContent, &lastMessageStatus, &lastMessageCreatedAt,
		); err != nil {
			return nil, fmt.Errorf("failed to scan user chat: %w", err)
		}
		chat.CreatedAt = timestamppb.New(createdAt)
		chat.UpdatedAt = timestamppb.New(updatedAt)
		if lastMessageID.Valid {
			chat.LastMessage = &proto.Message{
				Id:        lastMessageID.Int32,
				ChatId:    chat.Id,
				Content:   lastMessageContent.String,
				Status:    lastMessageStatus.String,
				CreatedAt: timestamppb.New(lastMessageCreatedAt.Time),
			}
		}
		chats = append(chats, chat)
	}
	// Next reports false both at the end of the result set and on a read
	// error; Err tells the two apart so a truncated list is never returned
	// as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read user chats: %w", err)
	}
	return &proto.UserChatsResponse{Chats: chats}, nil
}
// UpdateMessageStatus sets the status of message req.MessageId to req.Status
// and returns the row as it stands after the update.
//
// The Scan error is returned unchanged, which includes the case where no
// message has the given ID. The status value is not validated here.
func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessageStatusRequest) (*proto.MessageResponse, error) {
	const query = `
UPDATE messages
SET status = $1
WHERE id = $2
RETURNING id, chat_id, sender_id, content, status, created_at
`

	var created time.Time
	updated := &proto.Message{}

	row := s.db.QueryRow(ctx, query, req.Status, req.MessageId)
	if err := row.Scan(
		&updated.Id, &updated.ChatId, &updated.SenderId, &updated.Content, &updated.Status, &created,
	); err != nil {
		return nil, err
	}

	updated.CreatedAt = timestamppb.New(created)
	return &proto.MessageResponse{Message: updated}, nil
}
// StreamMessages forwards Kafka messages whose key equals the decimal text of
// req.UserId to the client stream until reading or sending fails.
//
// A dedicated Kafka reader is opened for the lifetime of the call and closed
// on return. Reads are bound to the stream's context, so the loop ends with
// that context's error once the client goes away. Payloads that cannot be
// decoded as a binary protobuf Message are logged and skipped.
//
// NOTE(review): SendMessage keys its Kafka messages by chat ID, while this
// filter compares against a user ID — confirm which of the two is intended.
// NOTE(review): the broker here is "localhost:9092" whereas main points the
// producer at a different address; only partition 0 is read. Verify both
// against the deployment.
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
	// Create Kafka reader for this user's messages.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     "user_messages",
		Partition: 0,
		// NOTE(review): a high MinBytes on a low-volume topic can delay
		// delivery until enough data has accumulated — confirm 10KB is wanted.
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer reader.Close()

	// Decimal text of the user ID, computed once. string(intValue) would
	// produce the Unicode code point with that number, which never equals a
	// decimal key.
	userKey := fmt.Sprint(req.UserId)

	for {
		m, err := reader.ReadMessage(stream.Context())
		if err != nil {
			return err
		}
		// Skip messages that are not for this user.
		if string(m.Key) != userKey {
			continue
		}
		var message proto.Message
		if err := protobuf.Unmarshal(m.Value, &message); err != nil {
			log.Printf("Failed to unmarshal message: %v", err)
			continue
		}
		if err := stream.Send(&proto.MessageResponse{Message: &message}); err != nil {
			return err
		}
	}
}
// main runs the service and exits with a non-zero status when it fails.
//
// All setup lives in run so that its deferred cleanups execute before the
// process exits; log.Fatal terminates immediately and would skip them.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to PostgreSQL, builds the Kafka producer, registers the
// message service and serves gRPC on port 50051 until Serve returns. The
// returned error carries the same text the failure is logged with.
func run() error {
	// Initialize database connection.
	// NOTE(review): the connection string embeds credentials in source; it
	// should come from configuration and the exposed password be rotated.
	pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:BBP%263XP956%26D8y6cYJ@79.174.89.104:15452/tailly_v2")
	if err != nil {
		return fmt.Errorf("Unable to connect to database: %w", err)
	}
	defer pool.Close()

	// Initialize Kafka producer.
	producer := &kafka.Writer{
		Addr:     kafka.TCP("89.104.69.222:9092"),
		Topic:    "user_messages",
		Balancer: &kafka.Hash{},
	}
	defer func() {
		if err := producer.Close(); err != nil {
			log.Printf("Failed to close Kafka producer: %v", err)
		}
	}()

	// Create gRPC server.
	grpcServer := grpc.NewServer()
	proto.RegisterMessageServiceServer(grpcServer, NewServer(pool, producer))

	// Start server.
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}
	log.Println("Server started on port 50051")
	if err := grpcServer.Serve(lis); err != nil {
		return fmt.Errorf("failed to serve: %w", err)
	}
	return nil
}