v0.0.17 Переработан websocket
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-10 18:19:20 +03:00
parent 6ad760a85b
commit dc22989752
7 changed files with 94 additions and 83 deletions

View File

@ -39,7 +39,7 @@ func main() {
) )
// Инициализация чата // Инициализация чата
chatHub := ws.NewChatHub() chatHub := ws.NewHub()
go chatHub.Run() go chatHub.Run()
// Репозитории // Репозитории
@ -72,7 +72,7 @@ func main() {
postService := service.NewPostService(postRepo) postService := service.NewPostService(postRepo)
commentService := service.NewCommentService(commentRepo, postRepo) commentService := service.NewCommentService(commentRepo, postRepo)
likeService := service.NewLikeService(likeRepo, postRepo) likeService := service.NewLikeService(likeRepo, postRepo)
chatService := service.NewChatService(chatRepo, userRepo) chatService := service.NewChatService(chatRepo, userRepo, chatHub)
auditService := service.NewAuditService(auditRepo) auditService := service.NewAuditService(auditRepo)
recoveryService := service.NewRecoveryService(recoveryRepo, userRepo, sessionRepo, deviceRepo, mailService) recoveryService := service.NewRecoveryService(recoveryRepo, userRepo, sessionRepo, deviceRepo, mailService)
sessionService := service.NewSessionService(sessionRepo, deviceRepo, userRepo, mailService) sessionService := service.NewSessionService(sessionRepo, deviceRepo, userRepo, mailService)

View File

@ -203,18 +203,13 @@ func (r *subscriptionResolver) MessageReceived(ctx context.Context) (<-chan *dom
} }
// Регистрируем клиента в хабе // Регистрируем клиента в хабе
r.Services.ChatHub.Register(client) r.Services.ChatHub.RegisterClient(client)
// Горутина для обработки отключения // Горутина для обработки отключения
go func() { go func() {
<-ctx.Done() <-ctx.Done()
// Добавляем защиту от повторного закрытия r.Services.ChatHub.UnregisterClient(client)
select { close(messageChan)
case <-messageChan: // Если канал уже закрыт
default:
close(messageChan)
}
r.Services.ChatHub.Unregister(client)
}() }()
return messageChan, nil return messageChan, nil

View File

@ -3,30 +3,47 @@ package handlers
import ( import (
"context" "context"
"net/http" "net/http"
"strconv"
"tailly_back_v2/internal/domain" "tailly_back_v2/internal/domain"
"tailly_back_v2/internal/service" "tailly_back_v2/internal/service"
"tailly_back_v2/internal/ws" "tailly_back_v2/internal/ws"
"tailly_back_v2/pkg/auth"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type ChatHandler struct {
chatService service.ChatService
hub *ws.ChatHub
}
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { CheckOrigin: func(r *http.Request) bool {
return true // В production заменить на проверку origin return true // В production заменить на проверку origin
}, },
} }
func (h *ChatHandler) WebSocketConnection(w http.ResponseWriter, r *http.Request) { type ChatHandler struct {
userID, err := strconv.Atoi(chi.URLParam(r, "userID")) chatService service.ChatService
hub *ws.Hub
tokenAuth *auth.TokenAuth
}
func NewChatHandler(chatService service.ChatService, hub *ws.Hub, tokenAuth *auth.TokenAuth) *ChatHandler {
return &ChatHandler{
chatService: chatService,
hub: hub,
tokenAuth: tokenAuth,
}
}
func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
// Аутентификация пользователя
token := r.URL.Query().Get("token")
if token == "" {
http.Error(w, "Token is required", http.StatusUnauthorized)
return
}
userID, err := h.tokenAuth.ValidateAccessToken(token)
if err != nil { if err != nil {
http.Error(w, "Invalid user ID", http.StatusBadRequest) http.Error(w, "Invalid token", http.StatusUnauthorized)
return return
} }
@ -41,17 +58,17 @@ func (h *ChatHandler) WebSocketConnection(w http.ResponseWriter, r *http.Request
Send: make(chan *domain.Message, 256), Send: make(chan *domain.Message, 256),
} }
h.hub.Register(client) h.hub.RegisterClient(client)
// Горутина для чтения сообщений // Горутина для чтения сообщений
go h.readPump(conn, client) go h.readPump(conn, client, userID)
// Горутина для записи сообщений // Горутина для записи сообщений
go h.writePump(conn, client) go h.writePump(conn, client)
} }
func (h *ChatHandler) readPump(conn *websocket.Conn, client *ws.Client) { func (h *ChatHandler) readPump(conn *websocket.Conn, client *ws.Client, userID int) {
defer func() { defer func() {
h.hub.Unregister(client) h.hub.UnregisterClient(client)
conn.Close() conn.Close()
}() }()
@ -65,34 +82,27 @@ func (h *ChatHandler) readPump(conn *websocket.Conn, client *ws.Client) {
break break
} }
// Отправляем сообщение через сервис // Используем context.Background() вместо r.Context()
message, err := h.chatService.SendMessage( message, err := h.chatService.SendMessage(
context.Background(), context.Background(),
client.UserID, userID,
msg.ChatID, msg.ChatID,
msg.Content, msg.Content,
) )
if err != nil { if err != nil {
// Обработка ошибки (можно отправить ошибку обратно клиенту)
continue continue
} }
// Отправляем сообщение получателю через хаб
h.hub.Broadcast(message) h.hub.Broadcast(message)
} }
} }
func (h *ChatHandler) writePump(conn *websocket.Conn, client *ws.Client) { func (h *ChatHandler) writePump(conn *websocket.Conn, client *ws.Client) {
defer conn.Close() defer conn.Close()
for {
message, ok := <-client.Send
if !ok {
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
for message := range client.Send {
if err := conn.WriteJSON(message); err != nil { if err := conn.WriteJSON(message); err != nil {
return break
} }
} }
} }

View File

@ -12,8 +12,11 @@ import (
"os" "os"
"tailly_back_v2/internal/config" "tailly_back_v2/internal/config"
"tailly_back_v2/internal/http/graph" "tailly_back_v2/internal/http/graph"
"tailly_back_v2/internal/http/handlers"
"tailly_back_v2/internal/http/middleware" "tailly_back_v2/internal/http/middleware"
"tailly_back_v2/internal/repository"
"tailly_back_v2/internal/service" "tailly_back_v2/internal/service"
"tailly_back_v2/internal/ws"
"tailly_back_v2/pkg/auth" "tailly_back_v2/pkg/auth"
) )
@ -50,6 +53,21 @@ func (s *Server) configureRouter() {
"http://localhost:3000", // React dev server "http://localhost:3000", // React dev server
"https://tailly.ru", // Продакшен домен "https://tailly.ru", // Продакшен домен
} }
// Инициализация WebSocket хаба
hub := ws.NewHub()
go hub.Run()
// Инициализация сервиса чата
chatService := service.NewChatService(
repository.NewChatRepository(s.db),
repository.NewUserRepository(s.db),
hub,
)
// Обновляем сервис чата в services
s.services.Chat = chatService
// Добавляем обработчик WebSocket
// Логирование // Логирование
logger := log.New(os.Stdout, "HTTP: ", log.LstdFlags) logger := log.New(os.Stdout, "HTTP: ", log.LstdFlags)
@ -67,6 +85,8 @@ func (s *Server) configureRouter() {
s.router.Handle("/", playground.Handler("GraphQL playground", "/query")) s.router.Handle("/", playground.Handler("GraphQL playground", "/query"))
s.router.Handle("/query", srv) s.router.Handle("/query", srv)
s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads")))) s.router.Handle("/uploads/*", http.StripPrefix("/uploads/", http.FileServer(http.Dir("./uploads"))))
chatHandler := handlers.NewChatHandler(chatService, hub, s.tokenAuth)
s.router.HandleFunc("/ws", chatHandler.HandleWebSocket)
} }
func (s *Server) configureMetrics() { func (s *Server) configureMetrics() {

View File

@ -22,16 +22,10 @@ type ChatService interface {
type chatService struct { type chatService struct {
chatRepo repository.ChatRepository chatRepo repository.ChatRepository
userRepo repository.UserRepository userRepo repository.UserRepository
hub *ws.ChatHub hub *ws.Hub
} }
func NewChatService( func NewChatService(chatRepo repository.ChatRepository, userRepo repository.UserRepository, hub *ws.Hub) ChatService {
chatRepo repository.ChatRepository,
userRepo repository.UserRepository,
) ChatService {
hub := ws.NewChatHub()
go hub.Run() // Запускаем хаб в отдельной горутине
return &chatService{ return &chatService{
chatRepo: chatRepo, chatRepo: chatRepo,
userRepo: userRepo, userRepo: userRepo,
@ -40,18 +34,15 @@ func NewChatService(
} }
func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, content string) (*domain.Message, error) { func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, content string) (*domain.Message, error) {
// Проверяем существование чата
chat, err := s.chatRepo.GetChatByID(ctx, chatID) chat, err := s.chatRepo.GetChatByID(ctx, chatID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Проверяем, что отправитель является участником чата
if senderID != chat.User1ID && senderID != chat.User2ID { if senderID != chat.User1ID && senderID != chat.User2ID {
return nil, errors.New("user is not a participant of this chat") return nil, errors.New("user is not a participant of this chat")
} }
// Определяем получателя
receiverID := chat.User1ID receiverID := chat.User1ID
if senderID == chat.User1ID { if senderID == chat.User1ID {
receiverID = chat.User2ID receiverID = chat.User2ID
@ -60,7 +51,7 @@ func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, con
message := &domain.Message{ message := &domain.Message{
ChatID: chatID, ChatID: chatID,
SenderID: senderID, SenderID: senderID,
ReceiverID: receiverID, // Явно устанавливаем получателя ReceiverID: receiverID,
Content: content, Content: content,
Status: "sent", Status: "sent",
CreatedAt: time.Now(), CreatedAt: time.Now(),
@ -70,7 +61,7 @@ func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, con
return nil, err return nil, err
} }
// Отправляем через WebSocket // Отправляем сообщение через WebSocket
if s.hub != nil { if s.hub != nil {
s.hub.Broadcast(message) s.hub.Broadcast(message)
} }

View File

@ -16,10 +16,10 @@ type Services struct {
Recovery RecoveryService Recovery RecoveryService
Audit AuditService Audit AuditService
Chat ChatService Chat ChatService
ChatHub *ws.ChatHub ChatHub *ws.Hub
} }
func NewServices(authService AuthService, userService UserService, postService PostService, commentService CommentService, likeService LikeService, mailService MailService, auditService AuditService, recoveryService RecoveryService, sessionService SessionService, chatService ChatService, chatHub *ws.ChatHub) *Services { func NewServices(authService AuthService, userService UserService, postService PostService, commentService CommentService, likeService LikeService, mailService MailService, auditService AuditService, recoveryService RecoveryService, sessionService SessionService, chatService ChatService, chatHub *ws.Hub) *Services {
return &Services{ return &Services{
Auth: authService, Auth: authService,
User: userService, User: userService,

View File

@ -1,3 +1,4 @@
// ws/hub.go
package ws package ws
import ( import (
@ -8,80 +9,74 @@ import (
type Client struct { type Client struct {
UserID int UserID int
Send chan *domain.Message Send chan *domain.Message
once sync.Once // Для гарантии однократного закрытия
} }
func (c *Client) Close() { type Hub struct {
c.once.Do(func() {
close(c.Send)
})
}
type ChatHub struct {
clients map[int]*Client clients map[int]*Client
register chan *Client register chan *Client
unregister chan *Client unregister chan *Client
broadcast chan *domain.Message broadcast chan *domain.Message
mutex sync.RWMutex mu sync.RWMutex
} }
func NewChatHub() *ChatHub { func NewHub() *Hub {
return &ChatHub{ return &Hub{
clients: make(map[int]*Client), clients: make(map[int]*Client),
register: make(chan *Client), register: make(chan *Client),
unregister: make(chan *Client), unregister: make(chan *Client),
broadcast: make(chan *domain.Message), broadcast: make(chan *domain.Message, 100),
} }
} }
// Register добавляет нового клиента в хаб func (h *Hub) RegisterClient(client *Client) {
func (h *ChatHub) Register(client *Client) {
if h == nil || client == nil {
return
}
h.register <- client h.register <- client
} }
func (h *ChatHub) Unregister(client *Client) { func (h *Hub) UnregisterClient(client *Client) {
if h == nil || client == nil {
return
}
h.unregister <- client h.unregister <- client
} }
func (h *ChatHub) Broadcast(message *domain.Message) { func (h *Hub) Broadcast(message *domain.Message) {
if h == nil || message == nil {
return
}
h.broadcast <- message h.broadcast <- message
} }
func (h *ChatHub) Run() { func (h *Hub) Run() {
for { for {
select { select {
case client := <-h.register:
h.mu.Lock()
h.clients[client.UserID] = client
h.mu.Unlock()
case client := <-h.unregister: case client := <-h.unregister:
h.mutex.Lock() h.mu.Lock()
if c, ok := h.clients[client.UserID]; ok && c == client { if c, ok := h.clients[client.UserID]; ok {
client.Close() // Используем безопасное закрытие close(c.Send)
delete(h.clients, client.UserID) delete(h.clients, client.UserID)
} }
h.mutex.Unlock() h.mu.Unlock()
case message := <-h.broadcast: case message := <-h.broadcast:
h.mutex.RLock() h.mu.RLock()
// Отправляем сообщение отправителю
if sender, ok := h.clients[message.SenderID]; ok { if sender, ok := h.clients[message.SenderID]; ok {
select { select {
case sender.Send <- message: // Не блокируется, если канал закрыт case sender.Send <- message:
default: default:
close(sender.Send)
delete(h.clients, sender.UserID)
} }
} }
// Отправляем сообщение получателю
if receiver, ok := h.clients[message.ReceiverID]; ok { if receiver, ok := h.clients[message.ReceiverID]; ok {
select { select {
case receiver.Send <- message: case receiver.Send <- message:
default: default:
close(receiver.Send)
delete(h.clients, receiver.UserID)
} }
} }
h.mutex.RUnlock() h.mu.RUnlock()
} }
} }
} }