225 lines
5.5 KiB
Go
225 lines
5.5 KiB
Go
package handlers
|
||
|
||
import (
|
||
"context"
|
||
"log"
|
||
"net/http"
|
||
"strings"
|
||
"tailly_back_v2/internal/domain"
|
||
"tailly_back_v2/internal/service"
|
||
"tailly_back_v2/internal/ws"
|
||
"tailly_back_v2/pkg/auth"
|
||
"time"
|
||
|
||
"github.com/gorilla/websocket"
|
||
)
|
||
|
||
var upgrader = websocket.Upgrader{
|
||
ReadBufferSize: 1024,
|
||
WriteBufferSize: 1024,
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true
|
||
},
|
||
Subprotocols: []string{"graphql-transport-ws"},
|
||
}
|
||
|
||
type ChatHandler struct {
|
||
chatService service.ChatService
|
||
hub *ws.Hub
|
||
tokenAuth *auth.TokenAuth
|
||
}
|
||
|
||
func NewChatHandler(chatService service.ChatService, hub *ws.Hub, tokenAuth *auth.TokenAuth) *ChatHandler {
|
||
return &ChatHandler{
|
||
chatService: chatService,
|
||
hub: hub,
|
||
tokenAuth: tokenAuth,
|
||
}
|
||
}
|
||
|
||
func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||
log.Printf("Incoming WebSocket headers: %+v", r.Header)
|
||
log.Printf("Cookies: %+v", r.Cookies())
|
||
requestedProtocol := r.Header.Get("Sec-WebSocket-Protocol")
|
||
if requestedProtocol != "" && requestedProtocol != "graphql-transport-ws" {
|
||
http.Error(w, "Unsupported WebSocket protocol", http.StatusBadRequest)
|
||
return
|
||
}
|
||
log.Printf("Requested protocols: %v", r.Header["Sec-WebSocket-Protocol"])
|
||
// 1. Проверяем куки
|
||
var token string
|
||
cookie, err := r.Cookie("accessToken")
|
||
if err == nil {
|
||
token = cookie.Value
|
||
log.Printf("WebSocket: токен из куки: %s", token)
|
||
}
|
||
|
||
// Из заголовка Authorization
|
||
if authHeader := r.Header.Get("Authorization"); authHeader != "" {
|
||
token = strings.TrimPrefix(authHeader, "Bearer ")
|
||
}
|
||
|
||
// Из параметра URL
|
||
if token == "" {
|
||
token = r.URL.Query().Get("token")
|
||
}
|
||
|
||
// Из куков
|
||
if token == "" {
|
||
if cookie, err := r.Cookie("accessToken"); err == nil {
|
||
token = cookie.Value
|
||
}
|
||
}
|
||
|
||
if token == "" {
|
||
log.Println("WebSocket: токен не найден")
|
||
http.Error(w, "Token is required", http.StatusUnauthorized)
|
||
return
|
||
}
|
||
|
||
// Валидация токена
|
||
userID, err := h.tokenAuth.ValidateAccessToken(token)
|
||
if err != nil {
|
||
log.Printf("WebSocket: ошибка валидации токена: %v", err)
|
||
http.Error(w, "Invalid token", http.StatusUnauthorized)
|
||
return
|
||
}
|
||
|
||
log.Printf("WebSocket: успешная авторизация, userID=%d", userID)
|
||
|
||
// 5. Обновление соединения
|
||
conn, err := upgrader.Upgrade(w, r, nil)
|
||
if err != nil {
|
||
log.Printf("WebSocket upgrade error: %v", err)
|
||
http.Error(w, "Could not upgrade to WebSocket", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
client := &ws.Client{
|
||
UserID: userID,
|
||
Send: make(chan *domain.Message, 256),
|
||
}
|
||
|
||
h.hub.RegisterClient(client)
|
||
|
||
// Добавляем контекст для управления жизненным циклом соединения
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
// Запускаем горутины с обработкой контекста
|
||
go h.readPump(ctx, conn, client, userID)
|
||
go h.writePump(ctx, conn, client)
|
||
|
||
// Ждем завершения
|
||
<-ctx.Done()
|
||
}
|
||
|
||
func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client *ws.Client, userID int) {
|
||
ticker := time.NewTicker(30 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
// Отправляем ping
|
||
if err := conn.WriteJSON(map[string]string{"type": "ping"}); err != nil {
|
||
log.Printf("Ping error: %v", err)
|
||
return
|
||
}
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
var msg struct {
|
||
Type string `json:"type"`
|
||
Payload struct {
|
||
ReceiverID int `json:"receiverId"`
|
||
Content string `json:"content"`
|
||
} `json:"payload"`
|
||
}
|
||
|
||
if err := conn.ReadJSON(&msg); err != nil {
|
||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||
log.Printf("WebSocket error: %v", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
if msg.Type == "pong" {
|
||
continue
|
||
}
|
||
|
||
// Обработка ping/pong
|
||
if msg.Type == "ping" {
|
||
conn.WriteJSON(map[string]string{"type": "pong"})
|
||
continue
|
||
}
|
||
|
||
if msg.Type != "message" {
|
||
continue
|
||
}
|
||
|
||
// Проверяем receiverId
|
||
if msg.Payload.ReceiverID == 0 {
|
||
log.Printf("Invalid receiverId: 0")
|
||
conn.WriteJSON(map[string]interface{}{
|
||
"type": "error",
|
||
"message": "Invalid receiver ID",
|
||
})
|
||
continue
|
||
}
|
||
|
||
// Логирование для отладки
|
||
log.Printf("Received message from %d to %d", userID, msg.Payload.ReceiverID)
|
||
|
||
// Создаем или находим чат
|
||
chat, err := h.chatService.GetOrCreateChat(ctx, userID, msg.Payload.ReceiverID)
|
||
if err != nil {
|
||
log.Printf("Chat error: %v", err)
|
||
conn.WriteJSON(map[string]interface{}{
|
||
"type": "error",
|
||
"message": "Chat error",
|
||
"details": err.Error(),
|
||
})
|
||
continue
|
||
}
|
||
|
||
// Отправляем сообщение
|
||
message, err := h.chatService.SendMessage(
|
||
ctx,
|
||
userID,
|
||
chat.ID,
|
||
msg.Payload.Content,
|
||
)
|
||
if err != nil {
|
||
log.Printf("Message send error: %v", err)
|
||
continue
|
||
}
|
||
|
||
// Рассылаем сообщение
|
||
h.hub.Broadcast(message)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (h *ChatHandler) writePump(ctx context.Context, conn *websocket.Conn, client *ws.Client) {
|
||
defer conn.Close()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case message, ok := <-client.Send:
|
||
if !ok {
|
||
// Канал закрыт
|
||
conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||
return
|
||
}
|
||
|
||
if err := conn.WriteJSON(message); err != nil {
|
||
log.Printf("WebSocket write error: %v", err)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|