// Package main implements the subscribe gRPC service: follow/unfollow
// relations between users and the notifications generated by new follows,
// persisted in PostgreSQL through a pgx connection pool.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"tailly_subscribers/proto"
	"time"

	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/status"
)

const (
	// dbURLEnv names the environment variable that overrides the database URL.
	dbURLEnv = "DATABASE_URL"

	// defaultDBURL is the legacy connection string, used only when dbURLEnv is
	// unset so existing deployments keep working.
	// TODO(security): rotate this credential and remove the fallback; secrets
	// must not live in source control.
	defaultDBURL = "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2"

	// listenAddr is the TCP address the gRPC server binds to.
	listenAddr = ":50053"
)

// server implements proto.SubscribeServiceServer on top of a pgx pool.
type server struct {
	proto.UnimplementedSubscribeServiceServer
	db     *pgxpool.Pool
	logger *log.Logger
}

// NewServer returns a server that runs its queries against db and logs to
// stdout with the "SUBSCRIBE_SERVICE: " prefix.
func NewServer(db *pgxpool.Pool) *server {
	return &server{
		db:     db,
		logger: log.New(os.Stdout, "SUBSCRIBE_SERVICE: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile),
	}
}

// FollowUser inserts a subscription from req.FollowerId to req.FollowingId and
// then tries to create a notification row for it.
//
// It returns InvalidArgument when either id is zero or both ids are equal, and
// Internal when the subscription insert fails. An already existing
// subscription is reported as Success=false with a nil error. A failure to
// create the notification is only logged; the follow still succeeds.
func (s *server) FollowUser(ctx context.Context, req *proto.FollowRequest) (*proto.FollowResponse, error) {
	follower, following := req.FollowerId, req.FollowingId
	if follower == 0 || following == 0 {
		return nil, status.Error(codes.InvalidArgument, "follower_id and following_id are required")
	}
	if follower == following {
		return nil, status.Error(codes.InvalidArgument, "cannot follow yourself")
	}

	var subscriptionID int32
	err := s.db.QueryRow(ctx, `
		INSERT INTO subscriptions (follower_id, following_id)
		VALUES ($1, $2)
		ON CONFLICT (follower_id, following_id) DO NOTHING
		RETURNING id`,
		follower, following).Scan(&subscriptionID)
	if errors.Is(err, pgx.ErrNoRows) {
		// ON CONFLICT DO NOTHING returns no row when the pair already exists.
		return &proto.FollowResponse{
			Success: false,
			Message: "Already following this user",
		}, nil
	}
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to follow: %v", err)
	}

	// The notification is best-effort: the follow is already stored, so a
	// failure here is logged and does not fail the RPC.
	var notificationID int32
	err = s.db.QueryRow(ctx, `
		INSERT INTO subscription_notifications (subscription_id)
		VALUES ($1)
		RETURNING id`,
		subscriptionID).Scan(&notificationID)
	if err != nil {
		s.logger.Printf("Warning: failed to create notification for subscription %d: %v", subscriptionID, err)
	} else {
		s.logger.Printf("Created notification %d for subscription %d", notificationID, subscriptionID)
	}

	return &proto.FollowResponse{
		Success:        true,
		Message:        "Successfully followed user",
		SubscriptionId: subscriptionID,
	}, nil
}

// UnfollowUser deletes the subscription from req.FollowerId to
// req.FollowingId.
//
// It returns InvalidArgument when either id is zero, NotFound when either user
// is missing from the users table, and Internal on database errors. When no
// subscription row was deleted the response has Success=false and a nil error.
func (s *server) UnfollowUser(ctx context.Context, req *proto.UnfollowRequest) (*proto.UnfollowResponse, error) {
	follower, following := req.FollowerId, req.FollowingId
	if follower == 0 || following == 0 {
		return nil, status.Errorf(codes.InvalidArgument, "Переданы пустые id follower или following, %d, %d", follower, following)
	}

	// Count distinct matches so that BOTH users must exist; an EXISTS over
	// "id IN ($1, $2)" would already be true when only one of them does.
	var found int64
	err := s.db.QueryRow(ctx, `
		SELECT COUNT(DISTINCT id) FROM users WHERE id IN ($1, $2)`,
		follower, following).Scan(&found)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to check users: %v", err)
	}
	want := int64(2)
	if follower == following {
		want = 1
	}
	if found != want {
		return nil, status.Error(codes.NotFound, "one or both users not found")
	}

	result, err := s.db.Exec(ctx, `
		DELETE FROM subscriptions
		WHERE follower_id = $1 AND following_id = $2`,
		follower, following)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to unfollow: %v", err)
	}
	if result.RowsAffected() == 0 {
		return &proto.UnfollowResponse{
			Success: false,
			Message: "Subscription not found",
		}, nil
	}

	return &proto.UnfollowResponse{
		Success: true,
		Message: "Successfully unfollowed user",
	}, nil
}

// GetFollowers returns one page of the users following req.UserId, newest
// subscription first, together with the total follower count.
//
// It returns InvalidArgument when user_id is zero or limit/offset is negative,
// and Internal on database errors.
func (s *server) GetFollowers(ctx context.Context, req *proto.GetFollowersRequest) (*proto.GetFollowersResponse, error) {
	if req.UserId == 0 {
		return nil, status.Error(codes.InvalidArgument, "user_id is required")
	}
	if req.Limit < 0 || req.Offset < 0 {
		return nil, status.Error(codes.InvalidArgument, "limit and offset must not be negative")
	}

	// Total number of followers, independent of pagination.
	var totalCount int32
	err := s.db.QueryRow(ctx, `
		SELECT COUNT(*) FROM subscriptions WHERE following_id = $1`,
		req.UserId).Scan(&totalCount)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get followers count: %v", err)
	}

	// Requested page of followers.
	rows, err := s.db.Query(ctx, `
		SELECT u.id, u.username, u.avatar
		FROM subscriptions s
		JOIN users u ON s.follower_id = u.id
		WHERE s.following_id = $1
		ORDER BY s.created_at DESC
		LIMIT $2 OFFSET $3`,
		req.UserId, req.Limit, req.Offset)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get followers: %v", err)
	}
	defer rows.Close()

	var followers []*proto.GetFollowersResponse_Follower
	for rows.Next() {
		follower := &proto.GetFollowersResponse_Follower{}
		if err := rows.Scan(&follower.UserId, &follower.Username, &follower.Avatar); err != nil {
			return nil, status.Errorf(codes.Internal, "failed to scan follower: %v", err)
		}
		followers = append(followers, follower)
	}
	if err := rows.Err(); err != nil {
		return nil, status.Errorf(codes.Internal, "rows error: %v", err)
	}

	return &proto.GetFollowersResponse{
		Followers:  followers,
		TotalCount: totalCount,
	}, nil
}

// GetFollowing returns one page of the users that req.UserId follows, newest
// subscription first, together with the total count of such subscriptions.
//
// It returns InvalidArgument when user_id is zero or limit/offset is negative,
// and Internal on database errors.
func (s *server) GetFollowing(ctx context.Context, req *proto.GetFollowingRequest) (*proto.GetFollowingResponse, error) {
	if req.UserId == 0 {
		return nil, status.Error(codes.InvalidArgument, "user_id is required")
	}
	if req.Limit < 0 || req.Offset < 0 {
		return nil, status.Error(codes.InvalidArgument, "limit and offset must not be negative")
	}

	// Total number of subscriptions, independent of pagination.
	var totalCount int32
	err := s.db.QueryRow(ctx, `
		SELECT COUNT(*) FROM subscriptions WHERE follower_id = $1`,
		req.UserId).Scan(&totalCount)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get following count: %v", err)
	}

	// Requested page of followed users.
	rows, err := s.db.Query(ctx, `
		SELECT u.id, u.username, u.avatar
		FROM subscriptions s
		JOIN users u ON s.following_id = u.id
		WHERE s.follower_id = $1
		ORDER BY s.created_at DESC
		LIMIT $2 OFFSET $3`,
		req.UserId, req.Limit, req.Offset)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get following: %v", err)
	}
	defer rows.Close()

	var following []*proto.GetFollowingResponse_Following
	for rows.Next() {
		follow := &proto.GetFollowingResponse_Following{}
		if err := rows.Scan(&follow.UserId, &follow.Username, &follow.Avatar); err != nil {
			return nil, status.Errorf(codes.Internal, "failed to scan following: %v", err)
		}
		following = append(following, follow)
	}
	if err := rows.Err(); err != nil {
		return nil, status.Errorf(codes.Internal, "rows error: %v", err)
	}

	return &proto.GetFollowingResponse{
		Following:  following,
		TotalCount: totalCount,
	}, nil
}

// IsFollowing reports whether a subscription from req.FollowerId to
// req.FollowingId exists.
//
// It returns InvalidArgument when either id is zero and Internal on database
// errors.
func (s *server) IsFollowing(ctx context.Context, req *proto.IsFollowingRequest) (*proto.IsFollowingResponse, error) {
	if req.FollowerId == 0 || req.FollowingId == 0 {
		return nil, status.Error(codes.InvalidArgument, "follower_id and following_id are required")
	}

	var isFollowing bool
	err := s.db.QueryRow(ctx, `
		SELECT EXISTS(
			SELECT 1 FROM subscriptions
			WHERE follower_id = $1 AND following_id = $2
		)`,
		req.FollowerId, req.FollowingId).Scan(&isFollowing)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to check subscription: %v", err)
	}

	return &proto.IsFollowingResponse{IsFollowing: isFollowing}, nil
}

// GetFollowersCount returns the number of subscriptions whose following_id is
// req.UserId.
//
// It returns InvalidArgument when user_id is zero and Internal on database
// errors.
func (s *server) GetFollowersCount(ctx context.Context, req *proto.GetCountRequest) (*proto.GetCountResponse, error) {
	if req.UserId == 0 {
		return nil, status.Error(codes.InvalidArgument, "user_id is required")
	}

	var count int32
	err := s.db.QueryRow(ctx, `
		SELECT COUNT(*) FROM subscriptions WHERE following_id = $1`,
		req.UserId).Scan(&count)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get followers count: %v", err)
	}

	return &proto.GetCountResponse{Count: count}, nil
}

// GetFollowingCount returns the number of subscriptions whose follower_id is
// req.UserId.
//
// It returns InvalidArgument when user_id is zero and Internal on database
// errors.
func (s *server) GetFollowingCount(ctx context.Context, req *proto.GetCountRequest) (*proto.GetCountResponse, error) {
	if req.UserId == 0 {
		return nil, status.Error(codes.InvalidArgument, "user_id is required")
	}

	var count int32
	err := s.db.QueryRow(ctx, `
		SELECT COUNT(*) FROM subscriptions WHERE follower_id = $1`,
		req.UserId).Scan(&count)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get following count: %v", err)
	}

	return &proto.GetCountResponse{Count: count}, nil
}

// GetSubscriptionNotifications returns one page of follow notifications
// addressed to req.UserId (the followed user), newest first, plus the total
// and unread counts. When req.UnreadOnly is set only unread notifications are
// listed; the counts are not affected by that flag.
//
// Timestamps are formatted as RFC 3339; expires_at is created_at plus 7 days.
// It returns InvalidArgument when user_id is zero or limit/offset is negative,
// and Internal on database errors.
func (s *server) GetSubscriptionNotifications(ctx context.Context, req *proto.GetNotificationsRequest) (*proto.GetNotificationsResponse, error) {
	if req.UserId == 0 {
		return nil, status.Error(codes.InvalidArgument, "user_id is required")
	}
	if req.Limit < 0 || req.Offset < 0 {
		return nil, status.Error(codes.InvalidArgument, "limit and offset must not be negative")
	}

	// Total and unread notification counts.
	var totalCount, unreadCount int32
	err := s.db.QueryRow(ctx, `
		SELECT
			COUNT(*),
			COUNT(*) FILTER (WHERE NOT is_read)
		FROM subscription_notifications sn
		JOIN subscriptions s ON sn.subscription_id = s.id
		WHERE s.following_id = $1`,
		req.UserId).Scan(&totalCount, &unreadCount)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get notifications count: %v", err)
	}

	// Base query.
	query := `
		SELECT
			sn.id,
			sn.subscription_id,
			s.follower_id,
			u.username,
			u.avatar,
			sn.is_read,
			sn.created_at,
			sn.created_at + INTERVAL '7 days' as expires_at
		FROM subscription_notifications sn
		JOIN subscriptions s ON sn.subscription_id = s.id
		JOIN users u ON s.follower_id = u.id
		WHERE s.following_id = $1`

	// Optional unread-only filter; this is a fixed fragment, not user input.
	if req.UnreadOnly {
		query += " AND NOT sn.is_read"
	}

	// Pagination.
	query += " ORDER BY sn.created_at DESC LIMIT $2 OFFSET $3"

	rows, err := s.db.Query(ctx, query, req.UserId, req.Limit, req.Offset)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get notifications: %v", err)
	}
	defer rows.Close()

	var notifications []*proto.GetNotificationsResponse_Notification
	for rows.Next() {
		notif := &proto.GetNotificationsResponse_Notification{}
		var createdAt, expiresAt time.Time
		if err := rows.Scan(
			&notif.Id,
			&notif.SubscriptionId,
			&notif.FollowerId,
			&notif.FollowerUsername,
			&notif.FollowerAvatar,
			&notif.IsRead,
			&createdAt,
			&expiresAt,
		); err != nil {
			return nil, status.Errorf(codes.Internal, "failed to scan notification: %v", err)
		}

		notif.CreatedAt = createdAt.Format(time.RFC3339)
		notif.ExpiresAt = expiresAt.Format(time.RFC3339)
		notifications = append(notifications, notif)
	}
	if err := rows.Err(); err != nil {
		return nil, status.Errorf(codes.Internal, "rows error: %v", err)
	}

	return &proto.GetNotificationsResponse{
		Notifications: notifications,
		TotalCount:    totalCount,
		UnreadCount:   unreadCount,
	}, nil
}

// MarkNotificationAsRead sets is_read on notification req.NotificationId after
// checking that it is addressed to req.UserId.
//
// It returns InvalidArgument when either id is zero, PermissionDenied when no
// notification with that id belongs to the user, and Internal on database
// errors. When the update touches no row the response has Success=false.
func (s *server) MarkNotificationAsRead(ctx context.Context, req *proto.MarkNotificationReadRequest) (*proto.MarkNotificationReadResponse, error) {
	if req.NotificationId == 0 || req.UserId == 0 {
		return nil, status.Error(codes.InvalidArgument, "notification_id and user_id are required")
	}

	// Make sure the notification belongs to the user.
	var belongsToUser bool
	err := s.db.QueryRow(ctx, `
		SELECT EXISTS(
			SELECT 1 FROM subscription_notifications sn
			JOIN subscriptions s ON sn.subscription_id = s.id
			WHERE sn.id = $1 AND s.following_id = $2
		)`,
		req.NotificationId, req.UserId).Scan(&belongsToUser)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to check notification ownership: %v", err)
	}
	if !belongsToUser {
		return nil, status.Error(codes.PermissionDenied, "notification does not belong to user")
	}

	// Mark as read.
	// NOTE(review): the UPDATE does not filter on is_read, so re-marking an
	// already-read notification still affects one row and reports success;
	// confirm whether "already read" should be reported as Success=false.
	result, err := s.db.Exec(ctx, `
		UPDATE subscription_notifications
		SET is_read = true
		WHERE id = $1`,
		req.NotificationId)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to mark notification as read: %v", err)
	}
	if result.RowsAffected() == 0 {
		return &proto.MarkNotificationReadResponse{
			Success: false,
			Message: "Notification not found or already read",
		}, nil
	}

	return &proto.MarkNotificationReadResponse{
		Success: true,
		Message: "Notification marked as read",
	}, nil
}

// loggingUnaryInterceptor logs every unary call with its request and, on
// completion, its duration, response and error.
func loggingUnaryInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		logger.Printf("Unary call: %s, request: %+v", info.FullMethod, req)
		start := time.Now()
		// Named results let the deferred log see the handler's outcome.
		defer func() {
			logger.Printf("Unary call %s completed in %v, response: %+v, error: %v",
				info.FullMethod, time.Since(start), resp, err)
		}()
		return handler(ctx, req)
	}
}

// loggingStreamInterceptor logs the start and the duration of every stream
// call.
func loggingStreamInterceptor(logger *log.Logger) grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		logger.Printf("Stream call: %s", info.FullMethod)
		start := time.Now()
		defer func() {
			logger.Printf("Stream call %s completed in %v", info.FullMethod, time.Since(start))
		}()
		return handler(srv, ss)
	}
}

// run connects to the database, starts the gRPC server on listenAddr and
// blocks until Serve returns. Errors are returned instead of calling Fatalf
// so that the deferred pool.Close always runs.
func run(logger *log.Logger) error {
	logger.Println("Starting subscribe service")

	// Database connection setup.
	dbURL := os.Getenv(dbURLEnv)
	if dbURL == "" {
		dbURL = defaultDBURL
	}
	cfg, err := pgxpool.ParseConfig(dbURL)
	if err != nil {
		return fmt.Errorf("unable to parse database URL: %w", err)
	}
	// Log only host, port and database name; the full URL carries the password.
	logger.Printf("Connecting to database at %s:%d/%s",
		cfg.ConnConfig.Host, cfg.ConnConfig.Port, cfg.ConnConfig.Database)

	pool, err := pgxpool.ConnectConfig(context.Background(), cfg)
	if err != nil {
		return fmt.Errorf("unable to connect to database: %w", err)
	}
	defer func() {
		logger.Println("Closing database connection")
		pool.Close()
	}()

	// Build the gRPC server.
	grpcServer := grpc.NewServer(
		grpc.KeepaliveParams(keepalive.ServerParameters{
			MaxConnectionAge:      24 * time.Hour,
			MaxConnectionAgeGrace: 5 * time.Minute,
			Time:                  30 * time.Second,
			Timeout:               10 * time.Second,
		}),
		grpc.UnaryInterceptor(loggingUnaryInterceptor(logger)),
		grpc.StreamInterceptor(loggingStreamInterceptor(logger)),
	)

	proto.RegisterSubscribeServiceServer(grpcServer, NewServer(pool))

	// Start serving.
	logger.Printf("Starting gRPC server on port %s", listenAddr)
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	logger.Println("Server is ready to accept connections")
	if err := grpcServer.Serve(lis); err != nil {
		return fmt.Errorf("failed to serve: %w", err)
	}
	return nil
}

func main() {
	logger := log.New(os.Stdout, "MAIN: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
	if err := run(logger); err != nil {
		logger.Fatalf("%v", err)
	}
}