diff --git a/cmd/server/main.go b/cmd/server/main.go index 0f50089..5ffa2e2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -39,7 +39,7 @@ func main() { ) // Инициализация чата - chatHub := ws.NewChatHub() + chatHub := ws.NewHub() go chatHub.Run() // Репозитории @@ -72,7 +72,7 @@ func main() { postService := service.NewPostService(postRepo) commentService := service.NewCommentService(commentRepo, postRepo) likeService := service.NewLikeService(likeRepo, postRepo) - chatService := service.NewChatService(chatRepo, userRepo) + chatService := service.NewChatService(chatRepo, userRepo, chatHub) auditService := service.NewAuditService(auditRepo) recoveryService := service.NewRecoveryService(recoveryRepo, userRepo, sessionRepo, deviceRepo, mailService) sessionService := service.NewSessionService(sessionRepo, deviceRepo, userRepo, mailService) diff --git a/internal/http/graph/message_resolvers.go b/internal/http/graph/message_resolvers.go index fcbf38c..9208acd 100644 --- a/internal/http/graph/message_resolvers.go +++ b/internal/http/graph/message_resolvers.go @@ -203,18 +203,13 @@ func (r *subscriptionResolver) MessageReceived(ctx context.Context) (<-chan *dom } // Регистрируем клиента в хабе - r.Services.ChatHub.Register(client) + r.Services.ChatHub.RegisterClient(client) // Горутина для обработки отключения go func() { <-ctx.Done() - // Добавляем защиту от повторного закрытия - select { - case <-messageChan: // Если канал уже закрыт - default: - close(messageChan) - } - r.Services.ChatHub.Unregister(client) + r.Services.ChatHub.UnregisterClient(client) + close(messageChan) }() return messageChan, nil diff --git a/internal/http/handlers/chat.go b/internal/http/handlers/chat.go index 1f1ba61..cc5d722 100644 --- a/internal/http/handlers/chat.go +++ b/internal/http/handlers/chat.go @@ -3,30 +3,47 @@ package handlers import ( "context" "net/http" - "strconv" "tailly_back_v2/internal/domain" "tailly_back_v2/internal/service" "tailly_back_v2/internal/ws" + "tailly_back_v2/pkg/auth" - "github.com/go-chi/chi/v5" "github.com/gorilla/websocket" ) -type ChatHandler struct { - chatService service.ChatService - hub *ws.ChatHub -} - var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true // В production заменить на проверку origin }, } -func (h *ChatHandler) WebSocketConnection(w http.ResponseWriter, r *http.Request) { - userID, err := strconv.Atoi(chi.URLParam(r, "userID")) +type ChatHandler struct { + chatService service.ChatService + hub *ws.Hub + tokenAuth *auth.TokenAuth +} + +func NewChatHandler(chatService service.ChatService, hub *ws.Hub, tokenAuth *auth.TokenAuth) *ChatHandler { + return &ChatHandler{ + chatService: chatService, + hub: hub, + tokenAuth: tokenAuth, + } +} + +func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + // Аутентификация пользователя + token := r.URL.Query().Get("token") + if token == "" { + http.Error(w, "Token is required", http.StatusUnauthorized) + return + } + + userID, err := h.tokenAuth.ValidateAccessToken(token) if err != nil { - http.Error(w, "Invalid user ID", http.StatusBadRequest) + http.Error(w, "Invalid token", http.StatusUnauthorized) return } @@ -41,17 +58,17 @@ func (h *ChatHandler) WebSocketConnection(w http.ResponseWriter, r *http.Request Send: make(chan *domain.Message, 256), } - h.hub.Register(client) + h.hub.RegisterClient(client) // Горутина для чтения сообщений - go h.readPump(conn, client) + go h.readPump(conn, client, userID) // Горутина для записи сообщений go h.writePump(conn, client) } -func (h *ChatHandler) readPump(conn *websocket.Conn, client *ws.Client) { +func (h *ChatHandler) readPump(conn *websocket.Conn, client *ws.Client, userID int) { defer func() { - h.hub.Unregister(client) + h.hub.UnregisterClient(client) conn.Close() }() @@ -65,34 +82,27 @@ func (h *ChatHandler) readPump(conn *websocket.Conn, client *ws.Client) { break } - // Отправляем сообщение через сервис + // Используем context.Background() вместо r.Context() message, err := h.chatService.SendMessage( context.Background(), - client.UserID, + userID, msg.ChatID, msg.Content, ) if err != nil { - // Обработка ошибки (можно отправить ошибку обратно клиенту) continue } - // Отправляем сообщение получателю через хаб h.hub.Broadcast(message) } } func (h *ChatHandler) writePump(conn *websocket.Conn, client *ws.Client) { defer conn.Close() - for { - message, ok := <-client.Send - if !ok { - conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } + for message := range client.Send { if err := conn.WriteJSON(message); err != nil { - return + break } } } diff --git a/internal/http/server.go b/internal/http/server.go index 623ffe3..7f1eca5 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -12,8 +12,11 @@ import ( "os" "tailly_back_v2/internal/config" "tailly_back_v2/internal/http/graph" + "tailly_back_v2/internal/http/handlers" "tailly_back_v2/internal/http/middleware" + "tailly_back_v2/internal/repository" "tailly_back_v2/internal/service" + "tailly_back_v2/internal/ws" "tailly_back_v2/pkg/auth" ) @@ -50,6 +53,21 @@ func (s *Server) configureRouter() { "http://localhost:3000", // React dev server "https://tailly.ru", // Продакшен домен } + // Инициализация WebSocket хаба + hub := ws.NewHub() + go hub.Run() + + // Инициализация сервиса чата + chatService := service.NewChatService( + repository.NewChatRepository(s.db), + repository.NewUserRepository(s.db), + hub, + ) + + // Обновляем сервис чата в services + s.services.Chat = chatService + + // Добавляем обработчик WebSocket // Логирование logger := log.New(os.Stdout, "HTTP: ", log.LstdFlags) @@ -67,6 +85,8 @@ func (s *Server) configureRouter() { s.router.Handle("/", playground.Handler("GraphQL playground", "/query")) s.router.Handle("/query", srv) s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads")))) + chatHandler := handlers.NewChatHandler(chatService, hub, s.tokenAuth) + s.router.HandleFunc("/ws", chatHandler.HandleWebSocket) } func (s *Server) configureMetrics() { diff --git a/internal/service/chat_service.go b/internal/service/chat_service.go index fc29073..0e1d4e7 100644 --- a/internal/service/chat_service.go +++ b/internal/service/chat_service.go @@ -22,16 +22,10 @@ type ChatService interface { type chatService struct { chatRepo repository.ChatRepository userRepo repository.UserRepository - hub *ws.ChatHub + hub *ws.Hub } -func NewChatService( - chatRepo repository.ChatRepository, - userRepo repository.UserRepository, -) ChatService { - hub := ws.NewChatHub() - go hub.Run() // Запускаем хаб в отдельной горутине - +func NewChatService(chatRepo repository.ChatRepository, userRepo repository.UserRepository, hub *ws.Hub) ChatService { return &chatService{ chatRepo: chatRepo, userRepo: userRepo, @@ -40,18 +34,15 @@ func NewChatService( } func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, content string) (*domain.Message, error) { - // Проверяем существование чата chat, err := s.chatRepo.GetChatByID(ctx, chatID) if err != nil { return nil, err } - // Проверяем, что отправитель является участником чата if senderID != chat.User1ID && senderID != chat.User2ID { return nil, errors.New("user is not a participant of this chat") } - // Определяем получателя receiverID := chat.User1ID if senderID == chat.User1ID { receiverID = chat.User2ID @@ -60,7 +51,7 @@ func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, con message := &domain.Message{ ChatID: chatID, SenderID: senderID, - ReceiverID: receiverID, // Явно устанавливаем получателя + ReceiverID: receiverID, Content: content, Status: "sent", CreatedAt: time.Now(), @@ -70,7 +61,7 @@ func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, con return nil, err } - // Отправляем через WebSocket + // Отправляем сообщение через WebSocket if s.hub != nil { s.hub.Broadcast(message) } diff --git a/internal/service/services.go b/internal/service/services.go index 3a86d1a..1a1fd1b 100644 --- a/internal/service/services.go +++ b/internal/service/services.go @@ -16,10 +16,10 @@ type Services struct { Recovery RecoveryService Audit AuditService Chat ChatService - ChatHub *ws.ChatHub + ChatHub *ws.Hub } -func NewServices(authService AuthService, userService UserService, postService PostService, commentService CommentService, likeService LikeService, mailService MailService, auditService AuditService, recoveryService RecoveryService, sessionService SessionService, chatService ChatService, chatHub *ws.ChatHub) *Services { +func NewServices(authService AuthService, userService UserService, postService PostService, commentService CommentService, likeService LikeService, mailService MailService, auditService AuditService, recoveryService RecoveryService, sessionService SessionService, chatService ChatService, chatHub *ws.Hub) *Services { return &Services{ Auth: authService, User: userService, diff --git a/internal/ws/hub.go b/internal/ws/hub.go index caa14eb..15c5046 100644 --- a/internal/ws/hub.go +++ b/internal/ws/hub.go @@ -1,3 +1,4 @@ +// ws/hub.go package ws import ( @@ -8,80 +9,74 @@ import ( type Client struct { UserID int Send chan *domain.Message - once sync.Once // Для гарантии однократного закрытия } -func (c *Client) Close() { - c.once.Do(func() { - close(c.Send) - }) -} - -type ChatHub struct { +type Hub struct { clients map[int]*Client register chan *Client unregister chan *Client broadcast chan *domain.Message - mutex sync.RWMutex + mu sync.RWMutex } -func NewChatHub() *ChatHub { - return &ChatHub{ +func NewHub() *Hub { + return &Hub{ clients: make(map[int]*Client), register: make(chan *Client), unregister: make(chan *Client), - broadcast: make(chan *domain.Message), + broadcast: make(chan *domain.Message, 100), } } -// Register добавляет нового клиента в хаб -func (h *ChatHub) Register(client *Client) { - if h == nil || client == nil { - return - } +func (h *Hub) RegisterClient(client *Client) { h.register <- client } -func (h *ChatHub) Unregister(client *Client) { - if h == nil || client == nil { - return - } +func (h *Hub) UnregisterClient(client *Client) { h.unregister <- client } -func (h *ChatHub) Broadcast(message *domain.Message) { - if h == nil || message == nil { - return - } +func (h *Hub) Broadcast(message *domain.Message) { h.broadcast <- message } -func (h *ChatHub) Run() { +func (h *Hub) Run() { for { select { + case client := <-h.register: + h.mu.Lock() + h.clients[client.UserID] = client + h.mu.Unlock() + case client := <-h.unregister: - h.mutex.Lock() - if c, ok := h.clients[client.UserID]; ok && c == client { - client.Close() // Используем безопасное закрытие + h.mu.Lock() + if c, ok := h.clients[client.UserID]; ok { + close(c.Send) delete(h.clients, client.UserID) } - h.mutex.Unlock() + h.mu.Unlock() case message := <-h.broadcast: - h.mutex.RLock() + h.mu.RLock() + // Отправляем сообщение отправителю if sender, ok := h.clients[message.SenderID]; ok { select { - case sender.Send <- message: // Не блокируется, если канал закрыт + case sender.Send <- message: default: + close(sender.Send) + delete(h.clients, sender.UserID) } } + // Отправляем сообщение получателю if receiver, ok := h.clients[message.ReceiverID]; ok { select { case receiver.Send <- message: default: + close(receiver.Send) + delete(h.clients, receiver.UserID) } } - h.mutex.RUnlock() + h.mu.RUnlock() } } }