admin 52ce331f37
All checks were successful
continuous-integration/drone/push Build is passing
v.0.2 Добавлено создание уведомления при подписке
2025-09-16 22:33:43 +03:00

426 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"log"
"net"
"os"
"tailly_subscribers/proto"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
type server struct {
proto.UnimplementedSubscribeServiceServer
db *pgxpool.Pool
logger *log.Logger
}
func NewServer(db *pgxpool.Pool) *server {
return &server{
db: db,
logger: log.New(os.Stdout, "SUBSCRIBE_SERVICE: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile),
}
}
func (s *server) FollowUser(ctx context.Context, req *proto.FollowRequest) (*proto.FollowResponse, error) {
follower, following := req.FollowerId, req.FollowingId
if follower == following {
return nil, status.Error(codes.InvalidArgument, "cannot follow yourself")
}
var subscriptionID int32
err := s.db.QueryRow(ctx, `
INSERT INTO subscriptions (follower_id, following_id)
VALUES ($1, $2)
ON CONFLICT (follower_id, following_id) DO NOTHING
RETURNING id`,
follower, following).Scan(&subscriptionID)
if err != nil {
if err == pgx.ErrNoRows {
return &proto.FollowResponse{
Success: false,
Message: "Already following this user",
}, nil
}
return nil, status.Errorf(codes.Internal, "failed to follow: %v", err)
}
var notificationID int32
err = s.db.QueryRow(ctx, `
INSERT INTO subscription_notifications (subscription_id)
VALUES ($1)
RETURNING id`,
subscriptionID).Scan(&notificationID)
if err != nil {
s.logger.Printf("Warning: failed to create notification for subscription %d: %v", subscriptionID, err)
} else {
s.logger.Printf("Created notification %d for subscription %d", notificationID, subscriptionID)
}
return &proto.FollowResponse{
Success: true,
Message: "Successfully followed user",
SubscriptionId: subscriptionID,
}, nil
}
func (s *server) UnfollowUser(ctx context.Context, req *proto.UnfollowRequest) (*proto.UnfollowResponse, error) {
follower, following := req.FollowerId, req.FollowingId
if follower == 0 || following == 0 {
return nil, status.Errorf(codes.InvalidArgument, "Переданы пустые id follower или following, %d, %d", follower, following)
}
var userExists bool
err := s.db.QueryRow(ctx, `
SELECT EXISTS(SELECT 1 FROM users WHERE id IN ($1, $2))`,
follower, following).Scan(&userExists)
if !userExists {
return nil, status.Errorf(codes.NotFound, "one or both users not found, %v", err)
}
result, err := s.db.Exec(ctx, `
DELETE FROM subscriptions
WHERE follower_id = $1 AND following_id = $2`,
follower, following)
if result.RowsAffected() == 0 {
return &proto.UnfollowResponse{
Success: false,
Message: "Subscription not found",
}, nil
}
return &proto.UnfollowResponse{
Success: true,
Message: "Successfully unfollowed user",
}, nil
}
func (s *server) GetFollowers(ctx context.Context, req *proto.GetFollowersRequest) (*proto.GetFollowersResponse, error) {
if req.UserId == 0 {
return nil, status.Error(codes.InvalidArgument, "user_id is required")
}
// Получаем общее количество подписчиков
var totalCount int32
err := s.db.QueryRow(ctx, `
SELECT COUNT(*) FROM subscriptions WHERE following_id = $1`,
req.UserId).Scan(&totalCount)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get followers count: %v", err)
}
// Получаем список подписчиков с пагинацией
rows, err := s.db.Query(ctx, `
SELECT u.id, u.username, u.avatar
FROM subscriptions s
JOIN users u ON s.follower_id = u.id
WHERE s.following_id = $1
ORDER BY s.created_at DESC
LIMIT $2 OFFSET $3`,
req.UserId, req.Limit, req.Offset)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get followers: %v", err)
}
defer rows.Close()
var followers []*proto.GetFollowersResponse_Follower
for rows.Next() {
var follower proto.GetFollowersResponse_Follower
if err := rows.Scan(&follower.UserId, &follower.Username, &follower.Avatar); err != nil {
return nil, status.Errorf(codes.Internal, "failed to scan follower: %v", err)
}
followers = append(followers, &follower)
}
if err := rows.Err(); err != nil {
return nil, status.Errorf(codes.Internal, "rows error: %v", err)
}
return &proto.GetFollowersResponse{
Followers: followers,
TotalCount: totalCount,
}, nil
}
func (s *server) GetFollowing(ctx context.Context, req *proto.GetFollowingRequest) (*proto.GetFollowingResponse, error) {
if req.UserId == 0 {
return nil, status.Error(codes.InvalidArgument, "user_id is required")
}
// Получаем общее количество подписок
var totalCount int32
err := s.db.QueryRow(ctx, `
SELECT COUNT(*) FROM subscriptions WHERE follower_id = $1`,
req.UserId).Scan(&totalCount)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get following count: %v", err)
}
// Получаем список подписок с пагинацией
rows, err := s.db.Query(ctx, `
SELECT u.id, u.username, u.avatar
FROM subscriptions s
JOIN users u ON s.following_id = u.id
WHERE s.follower_id = $1
ORDER BY s.created_at DESC
LIMIT $2 OFFSET $3`,
req.UserId, req.Limit, req.Offset)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get following: %v", err)
}
defer rows.Close()
var following []*proto.GetFollowingResponse_Following
for rows.Next() {
var follow proto.GetFollowingResponse_Following
if err := rows.Scan(&follow.UserId, &follow.Username, &follow.Avatar); err != nil {
return nil, status.Errorf(codes.Internal, "failed to scan following: %v", err)
}
following = append(following, &follow)
}
if err := rows.Err(); err != nil {
return nil, status.Errorf(codes.Internal, "rows error: %v", err)
}
return &proto.GetFollowingResponse{
Following: following,
TotalCount: totalCount,
}, nil
}
func (s *server) IsFollowing(ctx context.Context, req *proto.IsFollowingRequest) (*proto.IsFollowingResponse, error) {
if req.FollowerId == 0 || req.FollowingId == 0 {
return nil, status.Error(codes.InvalidArgument, "follower_id and following_id are required")
}
var isFollowing bool
err := s.db.QueryRow(ctx, `
SELECT EXISTS(
SELECT 1 FROM subscriptions
WHERE follower_id = $1 AND following_id = $2
)`,
req.FollowerId, req.FollowingId).Scan(&isFollowing)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check subscription: %v", err)
}
return &proto.IsFollowingResponse{IsFollowing: isFollowing}, nil
}
func (s *server) GetFollowersCount(ctx context.Context, req *proto.GetCountRequest) (*proto.GetCountResponse, error) {
if req.UserId == 0 {
return nil, status.Error(codes.InvalidArgument, "user_id is required")
}
var count int32
err := s.db.QueryRow(ctx, `
SELECT COUNT(*) FROM subscriptions WHERE following_id = $1`,
req.UserId).Scan(&count)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get followers count: %v", err)
}
return &proto.GetCountResponse{Count: count}, nil
}
func (s *server) GetFollowingCount(ctx context.Context, req *proto.GetCountRequest) (*proto.GetCountResponse, error) {
if req.UserId == 0 {
return nil, status.Error(codes.InvalidArgument, "user_id is required")
}
var count int32
err := s.db.QueryRow(ctx, `
SELECT COUNT(*) FROM subscriptions WHERE follower_id = $1`,
req.UserId).Scan(&count)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get following count: %v", err)
}
return &proto.GetCountResponse{Count: count}, nil
}
func (s *server) GetSubscriptionNotifications(ctx context.Context, req *proto.GetNotificationsRequest) (*proto.GetNotificationsResponse, error) {
if req.UserId == 0 {
return nil, status.Error(codes.InvalidArgument, "user_id is required")
}
// Получаем общее количество уведомлений
var totalCount, unreadCount int32
err := s.db.QueryRow(ctx, `
SELECT
COUNT(*),
COUNT(*) FILTER (WHERE NOT is_read)
FROM subscription_notifications sn
JOIN subscriptions s ON sn.subscription_id = s.id
WHERE s.following_id = $1`,
req.UserId).Scan(&totalCount, &unreadCount)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get notifications count: %v", err)
}
// Базовый запрос
query := `
SELECT
sn.id, sn.subscription_id, s.follower_id,
u.username, u.avatar, sn.is_read,
sn.created_at, sn.created_at + INTERVAL '7 days' as expires_at
FROM subscription_notifications sn
JOIN subscriptions s ON sn.subscription_id = s.id
JOIN users u ON s.follower_id = u.id
WHERE s.following_id = $1`
// Добавляем условие для непрочитанных
if req.UnreadOnly {
query += " AND NOT sn.is_read"
}
// Добавляем пагинацию
query += " ORDER BY sn.created_at DESC LIMIT $2 OFFSET $3"
rows, err := s.db.Query(ctx, query, req.UserId, req.Limit, req.Offset)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get notifications: %v", err)
}
defer rows.Close()
var notifications []*proto.GetNotificationsResponse_Notification
for rows.Next() {
var notif proto.GetNotificationsResponse_Notification
var createdAt, expiresAt time.Time
if err := rows.Scan(
&notif.Id, &notif.SubscriptionId, &notif.FollowerId,
&notif.FollowerUsername, &notif.FollowerAvatar, &notif.IsRead,
&createdAt, &expiresAt,
); err != nil {
return nil, status.Errorf(codes.Internal, "failed to scan notification: %v", err)
}
notif.CreatedAt = createdAt.Format(time.RFC3339)
notif.ExpiresAt = expiresAt.Format(time.RFC3339)
notifications = append(notifications, &notif)
}
if err := rows.Err(); err != nil {
return nil, status.Errorf(codes.Internal, "rows error: %v", err)
}
return &proto.GetNotificationsResponse{
Notifications: notifications,
TotalCount: totalCount,
UnreadCount: unreadCount,
}, nil
}
func (s *server) MarkNotificationAsRead(ctx context.Context, req *proto.MarkNotificationReadRequest) (*proto.MarkNotificationReadResponse, error) {
if req.NotificationId == 0 || req.UserId == 0 {
return nil, status.Error(codes.InvalidArgument, "notification_id and user_id are required")
}
// Проверяем, что уведомление принадлежит пользователю
var belongsToUser bool
err := s.db.QueryRow(ctx, `
SELECT EXISTS(
SELECT 1 FROM subscription_notifications sn
JOIN subscriptions s ON sn.subscription_id = s.id
WHERE sn.id = $1 AND s.following_id = $2
)`,
req.NotificationId, req.UserId).Scan(&belongsToUser)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to check notification ownership: %v", err)
}
if !belongsToUser {
return nil, status.Error(codes.PermissionDenied, "notification does not belong to user")
}
// Помечаем как прочитанное
result, err := s.db.Exec(ctx, `
UPDATE subscription_notifications
SET is_read = true
WHERE id = $1`,
req.NotificationId)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to mark notification as read: %v", err)
}
if result.RowsAffected() == 0 {
return &proto.MarkNotificationReadResponse{
Success: false,
Message: "Notification not found or already read",
}, nil
}
return &proto.MarkNotificationReadResponse{
Success: true,
Message: "Notification marked as read",
}, nil
}
func main() {
logger := log.New(os.Stdout, "MAIN: ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
logger.Println("Starting subscribe service")
// Инициализация подключения к БД
dbURL := "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2"
logger.Printf("Connecting to database at %s", dbURL)
pool, err := pgxpool.Connect(context.Background(), dbURL)
if err != nil {
logger.Fatalf("Unable to connect to database: %v", err)
}
defer func() {
logger.Println("Closing database connection")
pool.Close()
}()
// Создаем gRPC сервер
grpcServer := grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: 24 * time.Hour,
MaxConnectionAgeGrace: 5 * time.Minute,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
}),
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
logger.Printf("Unary call: %s, request: %+v", info.FullMethod, req)
start := time.Now()
defer func() {
logger.Printf("Unary call %s completed in %v, response: %+v, error: %v",
info.FullMethod, time.Since(start), resp, err)
}()
return handler(ctx, req)
}),
grpc.StreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
logger.Printf("Stream call: %s", info.FullMethod)
start := time.Now()
defer func() {
logger.Printf("Stream call %s completed in %v", info.FullMethod, time.Since(start))
}()
return handler(srv, ss)
}),
)
proto.RegisterSubscribeServiceServer(grpcServer, NewServer(pool))
// Запускаем сервер
port := ":50053"
logger.Printf("Starting gRPC server on port %s", port)
lis, err := net.Listen("tcp", port)
if err != nil {
logger.Fatalf("failed to listen: %v", err)
}
logger.Println("Server is ready to accept connections")
if err := grpcServer.Serve(lis); err != nil {
logger.Fatalf("failed to serve: %v", err)
}
}