426 lines
14 KiB
Go
426 lines
14 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"log"
|
||
"net"
|
||
"os"
|
||
"tailly_subscribers/proto"
|
||
"time"
|
||
|
||
"github.com/jackc/pgx/v4"
|
||
"github.com/jackc/pgx/v4/pgxpool"
|
||
"google.golang.org/grpc"
|
||
"google.golang.org/grpc/codes"
|
||
"google.golang.org/grpc/keepalive"
|
||
"google.golang.org/grpc/status"
|
||
)
|
||
|
||
type server struct {
|
||
proto.UnimplementedSubscribeServiceServer
|
||
db *pgxpool.Pool
|
||
logger *log.Logger
|
||
}
|
||
|
||
func NewServer(db *pgxpool.Pool) *server {
|
||
return &server{
|
||
db: db,
|
||
logger: log.New(os.Stdout, "SUBSCRIBE_SERVICE: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile),
|
||
}
|
||
}
|
||
|
||
func (s *server) FollowUser(ctx context.Context, req *proto.FollowRequest) (*proto.FollowResponse, error) {
|
||
follower, following := req.FollowerId, req.FollowingId
|
||
if follower == following {
|
||
return nil, status.Error(codes.InvalidArgument, "cannot follow yourself")
|
||
}
|
||
|
||
var subscriptionID int32
|
||
err := s.db.QueryRow(ctx, `
|
||
INSERT INTO subscriptions (follower_id, following_id)
|
||
VALUES ($1, $2)
|
||
ON CONFLICT (follower_id, following_id) DO NOTHING
|
||
RETURNING id`,
|
||
follower, following).Scan(&subscriptionID)
|
||
|
||
if err != nil {
|
||
if err == pgx.ErrNoRows {
|
||
return &proto.FollowResponse{
|
||
Success: false,
|
||
Message: "Already following this user",
|
||
}, nil
|
||
}
|
||
return nil, status.Errorf(codes.Internal, "failed to follow: %v", err)
|
||
}
|
||
|
||
var notificationID int32
|
||
err = s.db.QueryRow(ctx, `
|
||
INSERT INTO subscription_notifications (subscription_id)
|
||
VALUES ($1)
|
||
RETURNING id`,
|
||
subscriptionID).Scan(¬ificationID)
|
||
|
||
if err != nil {
|
||
s.logger.Printf("Warning: failed to create notification for subscription %d: %v", subscriptionID, err)
|
||
} else {
|
||
s.logger.Printf("Created notification %d for subscription %d", notificationID, subscriptionID)
|
||
}
|
||
|
||
return &proto.FollowResponse{
|
||
Success: true,
|
||
Message: "Successfully followed user",
|
||
SubscriptionId: subscriptionID,
|
||
}, nil
|
||
}
|
||
|
||
func (s *server) UnfollowUser(ctx context.Context, req *proto.UnfollowRequest) (*proto.UnfollowResponse, error) {
|
||
follower, following := req.FollowerId, req.FollowingId
|
||
if follower == 0 || following == 0 {
|
||
return nil, status.Errorf(codes.InvalidArgument, "Переданы пустые id follower или following, %d, %d", follower, following)
|
||
}
|
||
|
||
var userExists bool
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT EXISTS(SELECT 1 FROM users WHERE id IN ($1, $2))`,
|
||
follower, following).Scan(&userExists)
|
||
|
||
if !userExists {
|
||
return nil, status.Errorf(codes.NotFound, "one or both users not found, %v", err)
|
||
}
|
||
|
||
result, err := s.db.Exec(ctx, `
|
||
DELETE FROM subscriptions
|
||
WHERE follower_id = $1 AND following_id = $2`,
|
||
follower, following)
|
||
|
||
if result.RowsAffected() == 0 {
|
||
return &proto.UnfollowResponse{
|
||
Success: false,
|
||
Message: "Subscription not found",
|
||
}, nil
|
||
}
|
||
|
||
return &proto.UnfollowResponse{
|
||
Success: true,
|
||
Message: "Successfully unfollowed user",
|
||
}, nil
|
||
}
|
||
|
||
func (s *server) GetFollowers(ctx context.Context, req *proto.GetFollowersRequest) (*proto.GetFollowersResponse, error) {
|
||
if req.UserId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "user_id is required")
|
||
}
|
||
|
||
// Получаем общее количество подписчиков
|
||
var totalCount int32
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT COUNT(*) FROM subscriptions WHERE following_id = $1`,
|
||
req.UserId).Scan(&totalCount)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get followers count: %v", err)
|
||
}
|
||
|
||
// Получаем список подписчиков с пагинацией
|
||
rows, err := s.db.Query(ctx, `
|
||
SELECT u.id, u.username, u.avatar
|
||
FROM subscriptions s
|
||
JOIN users u ON s.follower_id = u.id
|
||
WHERE s.following_id = $1
|
||
ORDER BY s.created_at DESC
|
||
LIMIT $2 OFFSET $3`,
|
||
req.UserId, req.Limit, req.Offset)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get followers: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var followers []*proto.GetFollowersResponse_Follower
|
||
for rows.Next() {
|
||
var follower proto.GetFollowersResponse_Follower
|
||
if err := rows.Scan(&follower.UserId, &follower.Username, &follower.Avatar); err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to scan follower: %v", err)
|
||
}
|
||
followers = append(followers, &follower)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return nil, status.Errorf(codes.Internal, "rows error: %v", err)
|
||
}
|
||
|
||
return &proto.GetFollowersResponse{
|
||
Followers: followers,
|
||
TotalCount: totalCount,
|
||
}, nil
|
||
}
|
||
|
||
func (s *server) GetFollowing(ctx context.Context, req *proto.GetFollowingRequest) (*proto.GetFollowingResponse, error) {
|
||
if req.UserId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "user_id is required")
|
||
}
|
||
|
||
// Получаем общее количество подписок
|
||
var totalCount int32
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT COUNT(*) FROM subscriptions WHERE follower_id = $1`,
|
||
req.UserId).Scan(&totalCount)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get following count: %v", err)
|
||
}
|
||
|
||
// Получаем список подписок с пагинацией
|
||
rows, err := s.db.Query(ctx, `
|
||
SELECT u.id, u.username, u.avatar
|
||
FROM subscriptions s
|
||
JOIN users u ON s.following_id = u.id
|
||
WHERE s.follower_id = $1
|
||
ORDER BY s.created_at DESC
|
||
LIMIT $2 OFFSET $3`,
|
||
req.UserId, req.Limit, req.Offset)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get following: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var following []*proto.GetFollowingResponse_Following
|
||
for rows.Next() {
|
||
var follow proto.GetFollowingResponse_Following
|
||
if err := rows.Scan(&follow.UserId, &follow.Username, &follow.Avatar); err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to scan following: %v", err)
|
||
}
|
||
following = append(following, &follow)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return nil, status.Errorf(codes.Internal, "rows error: %v", err)
|
||
}
|
||
|
||
return &proto.GetFollowingResponse{
|
||
Following: following,
|
||
TotalCount: totalCount,
|
||
}, nil
|
||
}
|
||
|
||
func (s *server) IsFollowing(ctx context.Context, req *proto.IsFollowingRequest) (*proto.IsFollowingResponse, error) {
|
||
if req.FollowerId == 0 || req.FollowingId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "follower_id and following_id are required")
|
||
}
|
||
|
||
var isFollowing bool
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT EXISTS(
|
||
SELECT 1 FROM subscriptions
|
||
WHERE follower_id = $1 AND following_id = $2
|
||
)`,
|
||
req.FollowerId, req.FollowingId).Scan(&isFollowing)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to check subscription: %v", err)
|
||
}
|
||
|
||
return &proto.IsFollowingResponse{IsFollowing: isFollowing}, nil
|
||
}
|
||
|
||
func (s *server) GetFollowersCount(ctx context.Context, req *proto.GetCountRequest) (*proto.GetCountResponse, error) {
|
||
if req.UserId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "user_id is required")
|
||
}
|
||
|
||
var count int32
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT COUNT(*) FROM subscriptions WHERE following_id = $1`,
|
||
req.UserId).Scan(&count)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get followers count: %v", err)
|
||
}
|
||
|
||
return &proto.GetCountResponse{Count: count}, nil
|
||
}
|
||
|
||
func (s *server) GetFollowingCount(ctx context.Context, req *proto.GetCountRequest) (*proto.GetCountResponse, error) {
|
||
if req.UserId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "user_id is required")
|
||
}
|
||
|
||
var count int32
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT COUNT(*) FROM subscriptions WHERE follower_id = $1`,
|
||
req.UserId).Scan(&count)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get following count: %v", err)
|
||
}
|
||
|
||
return &proto.GetCountResponse{Count: count}, nil
|
||
}
|
||
|
||
func (s *server) GetSubscriptionNotifications(ctx context.Context, req *proto.GetNotificationsRequest) (*proto.GetNotificationsResponse, error) {
|
||
if req.UserId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "user_id is required")
|
||
}
|
||
|
||
// Получаем общее количество уведомлений
|
||
var totalCount, unreadCount int32
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT
|
||
COUNT(*),
|
||
COUNT(*) FILTER (WHERE NOT is_read)
|
||
FROM subscription_notifications sn
|
||
JOIN subscriptions s ON sn.subscription_id = s.id
|
||
WHERE s.following_id = $1`,
|
||
req.UserId).Scan(&totalCount, &unreadCount)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get notifications count: %v", err)
|
||
}
|
||
|
||
// Базовый запрос
|
||
query := `
|
||
SELECT
|
||
sn.id, sn.subscription_id, s.follower_id,
|
||
u.username, u.avatar, sn.is_read,
|
||
sn.created_at, sn.created_at + INTERVAL '7 days' as expires_at
|
||
FROM subscription_notifications sn
|
||
JOIN subscriptions s ON sn.subscription_id = s.id
|
||
JOIN users u ON s.follower_id = u.id
|
||
WHERE s.following_id = $1`
|
||
|
||
// Добавляем условие для непрочитанных
|
||
if req.UnreadOnly {
|
||
query += " AND NOT sn.is_read"
|
||
}
|
||
|
||
// Добавляем пагинацию
|
||
query += " ORDER BY sn.created_at DESC LIMIT $2 OFFSET $3"
|
||
|
||
rows, err := s.db.Query(ctx, query, req.UserId, req.Limit, req.Offset)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to get notifications: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var notifications []*proto.GetNotificationsResponse_Notification
|
||
for rows.Next() {
|
||
var notif proto.GetNotificationsResponse_Notification
|
||
var createdAt, expiresAt time.Time
|
||
if err := rows.Scan(
|
||
¬if.Id, ¬if.SubscriptionId, ¬if.FollowerId,
|
||
¬if.FollowerUsername, ¬if.FollowerAvatar, ¬if.IsRead,
|
||
&createdAt, &expiresAt,
|
||
); err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to scan notification: %v", err)
|
||
}
|
||
notif.CreatedAt = createdAt.Format(time.RFC3339)
|
||
notif.ExpiresAt = expiresAt.Format(time.RFC3339)
|
||
notifications = append(notifications, ¬if)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return nil, status.Errorf(codes.Internal, "rows error: %v", err)
|
||
}
|
||
|
||
return &proto.GetNotificationsResponse{
|
||
Notifications: notifications,
|
||
TotalCount: totalCount,
|
||
UnreadCount: unreadCount,
|
||
}, nil
|
||
}
|
||
|
||
func (s *server) MarkNotificationAsRead(ctx context.Context, req *proto.MarkNotificationReadRequest) (*proto.MarkNotificationReadResponse, error) {
|
||
if req.NotificationId == 0 || req.UserId == 0 {
|
||
return nil, status.Error(codes.InvalidArgument, "notification_id and user_id are required")
|
||
}
|
||
|
||
// Проверяем, что уведомление принадлежит пользователю
|
||
var belongsToUser bool
|
||
err := s.db.QueryRow(ctx, `
|
||
SELECT EXISTS(
|
||
SELECT 1 FROM subscription_notifications sn
|
||
JOIN subscriptions s ON sn.subscription_id = s.id
|
||
WHERE sn.id = $1 AND s.following_id = $2
|
||
)`,
|
||
req.NotificationId, req.UserId).Scan(&belongsToUser)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to check notification ownership: %v", err)
|
||
}
|
||
|
||
if !belongsToUser {
|
||
return nil, status.Error(codes.PermissionDenied, "notification does not belong to user")
|
||
}
|
||
|
||
// Помечаем как прочитанное
|
||
result, err := s.db.Exec(ctx, `
|
||
UPDATE subscription_notifications
|
||
SET is_read = true
|
||
WHERE id = $1`,
|
||
req.NotificationId)
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.Internal, "failed to mark notification as read: %v", err)
|
||
}
|
||
|
||
if result.RowsAffected() == 0 {
|
||
return &proto.MarkNotificationReadResponse{
|
||
Success: false,
|
||
Message: "Notification not found or already read",
|
||
}, nil
|
||
}
|
||
|
||
return &proto.MarkNotificationReadResponse{
|
||
Success: true,
|
||
Message: "Notification marked as read",
|
||
}, nil
|
||
}
|
||
|
||
func main() {
|
||
logger := log.New(os.Stdout, "MAIN: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
|
||
logger.Println("Starting subscribe service")
|
||
|
||
// Инициализация подключения к БД
|
||
dbURL := "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2"
|
||
logger.Printf("Connecting to database at %s", dbURL)
|
||
pool, err := pgxpool.Connect(context.Background(), dbURL)
|
||
if err != nil {
|
||
logger.Fatalf("Unable to connect to database: %v", err)
|
||
}
|
||
defer func() {
|
||
logger.Println("Closing database connection")
|
||
pool.Close()
|
||
}()
|
||
// Создаем gRPC сервер
|
||
grpcServer := grpc.NewServer(
|
||
grpc.KeepaliveParams(keepalive.ServerParameters{
|
||
MaxConnectionAge: 24 * time.Hour,
|
||
MaxConnectionAgeGrace: 5 * time.Minute,
|
||
Time: 30 * time.Second,
|
||
Timeout: 10 * time.Second,
|
||
}),
|
||
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||
logger.Printf("Unary call: %s, request: %+v", info.FullMethod, req)
|
||
start := time.Now()
|
||
defer func() {
|
||
logger.Printf("Unary call %s completed in %v, response: %+v, error: %v",
|
||
info.FullMethod, time.Since(start), resp, err)
|
||
}()
|
||
return handler(ctx, req)
|
||
}),
|
||
grpc.StreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||
logger.Printf("Stream call: %s", info.FullMethod)
|
||
start := time.Now()
|
||
defer func() {
|
||
logger.Printf("Stream call %s completed in %v", info.FullMethod, time.Since(start))
|
||
}()
|
||
return handler(srv, ss)
|
||
}),
|
||
)
|
||
proto.RegisterSubscribeServiceServer(grpcServer, NewServer(pool))
|
||
|
||
// Запускаем сервер
|
||
port := ":50053"
|
||
logger.Printf("Starting gRPC server on port %s", port)
|
||
lis, err := net.Listen("tcp", port)
|
||
if err != nil {
|
||
logger.Fatalf("failed to listen: %v", err)
|
||
}
|
||
|
||
logger.Println("Server is ready to accept connections")
|
||
if err := grpcServer.Serve(lis); err != nil {
|
||
logger.Fatalf("failed to serve: %v", err)
|
||
}
|
||
}
|