v0.0.17.3 Переработан websocket, добавлена обработка ping/pong
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
68577d1878
commit
ef8595afae
@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
"tailly_back_v2/internal/domain"
|
"tailly_back_v2/internal/domain"
|
||||||
"tailly_back_v2/internal/service"
|
"tailly_back_v2/internal/service"
|
||||||
"tailly_back_v2/internal/ws"
|
"tailly_back_v2/internal/ws"
|
||||||
@ -40,54 +39,8 @@ func NewChatHandler(chatService service.ChatService, hub *ws.Hub, tokenAuth *aut
|
|||||||
func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Printf("Incoming WebSocket headers: %+v", r.Header)
|
log.Printf("Incoming WebSocket headers: %+v", r.Header)
|
||||||
log.Printf("Cookies: %+v", r.Cookies())
|
log.Printf("Cookies: %+v", r.Cookies())
|
||||||
requestedProtocol := r.Header.Get("Sec-WebSocket-Protocol")
|
|
||||||
if requestedProtocol != "" && requestedProtocol != "graphql-transport-ws" {
|
|
||||||
http.Error(w, "Unsupported WebSocket protocol", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Printf("Requested protocols: %v", r.Header["Sec-WebSocket-Protocol"])
|
|
||||||
// 1. Проверяем куки
|
|
||||||
var token string
|
|
||||||
cookie, err := r.Cookie("accessToken")
|
|
||||||
if err == nil {
|
|
||||||
token = cookie.Value
|
|
||||||
log.Printf("WebSocket: токен из куки: %s", token)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Из заголовка Authorization
|
// Убираем все проверки токена здесь, так как будем проверять его в первом сообщении
|
||||||
if authHeader := r.Header.Get("Authorization"); authHeader != "" {
|
|
||||||
token = strings.TrimPrefix(authHeader, "Bearer ")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Из параметра URL
|
|
||||||
if token == "" {
|
|
||||||
token = r.URL.Query().Get("token")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Из куков
|
|
||||||
if token == "" {
|
|
||||||
if cookie, err := r.Cookie("accessToken"); err == nil {
|
|
||||||
token = cookie.Value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if token == "" {
|
|
||||||
log.Println("WebSocket: токен не найден")
|
|
||||||
http.Error(w, "Token is required", http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Валидация токена
|
|
||||||
userID, err := h.tokenAuth.ValidateAccessToken(token)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("WebSocket: ошибка валидации токена: %v", err)
|
|
||||||
http.Error(w, "Invalid token", http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("WebSocket: успешная авторизация, userID=%d", userID)
|
|
||||||
|
|
||||||
// 5. Обновление соединения
|
|
||||||
conn, err := upgrader.Upgrade(w, r, nil)
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("WebSocket upgrade error: %v", err)
|
log.Printf("WebSocket upgrade error: %v", err)
|
||||||
@ -96,32 +49,64 @@ func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
client := &ws.Client{
|
client := &ws.Client{
|
||||||
UserID: userID,
|
UserID: 0, // Пока не авторизован
|
||||||
Send: make(chan *domain.Message, 256),
|
Send: make(chan *domain.Message, 256),
|
||||||
}
|
}
|
||||||
|
|
||||||
h.hub.RegisterClient(client)
|
h.hub.RegisterClient(client)
|
||||||
|
|
||||||
// Добавляем контекст для управления жизненным циклом соединения
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Запускаем горутины с обработкой контекста
|
go h.readPump(ctx, conn, client)
|
||||||
go h.readPump(ctx, conn, client, userID)
|
|
||||||
go h.writePump(ctx, conn, client)
|
go h.writePump(ctx, conn, client)
|
||||||
|
|
||||||
// Ждем завершения
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client *ws.Client, userID int) {
|
func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client *ws.Client) {
|
||||||
ticker := time.NewTicker(30 * time.Second)
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// 1. Ожидаем первое сообщение с токеном
|
||||||
|
var authMsg struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Token string `json:"token"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := conn.ReadJSON(&authMsg); err != nil {
|
||||||
|
log.Printf("Failed to read auth message: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if authMsg.Type != "auth" || authMsg.Token == "" {
|
||||||
|
log.Println("First message must be auth with token")
|
||||||
|
conn.WriteJSON(map[string]string{"type": "error", "message": "Auth required"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Валидируем токен
|
||||||
|
userID, err := h.tokenAuth.ValidateAccessToken(authMsg.Token)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Token validation error: %v", err)
|
||||||
|
conn.WriteJSON(map[string]string{"type": "error", "message": "Invalid token"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
client.UserID = userID
|
||||||
|
log.Printf("WebSocket authenticated, userID=%d", userID)
|
||||||
|
|
||||||
|
// 3. Отправляем подтверждение авторизации
|
||||||
|
if err := conn.WriteJSON(map[string]string{"type": "auth_success"}); err != nil {
|
||||||
|
log.Printf("Failed to send auth confirmation: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Основной цикл обработки сообщений
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
// Отправляем ping
|
|
||||||
if err := conn.WriteJSON(map[string]string{"type": "ping"}); err != nil {
|
if err := conn.WriteJSON(map[string]string{"type": "ping"}); err != nil {
|
||||||
log.Printf("Ping error: %v", err)
|
log.Printf("Ping error: %v", err)
|
||||||
return
|
return
|
||||||
@ -144,23 +129,19 @@ func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.Type == "pong" {
|
switch msg.Type {
|
||||||
|
case "pong":
|
||||||
continue
|
continue
|
||||||
}
|
case "ping":
|
||||||
|
|
||||||
// Обработка ping/pong
|
|
||||||
if msg.Type == "ping" {
|
|
||||||
conn.WriteJSON(map[string]string{"type": "pong"})
|
conn.WriteJSON(map[string]string{"type": "pong"})
|
||||||
continue
|
continue
|
||||||
}
|
case "message":
|
||||||
|
if client.UserID == 0 {
|
||||||
if msg.Type != "message" {
|
conn.WriteJSON(map[string]string{"type": "error", "message": "Not authenticated"})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Проверяем receiverId
|
|
||||||
if msg.Payload.ReceiverID == 0 {
|
if msg.Payload.ReceiverID == 0 {
|
||||||
log.Printf("Invalid receiverId: 0")
|
|
||||||
conn.WriteJSON(map[string]interface{}{
|
conn.WriteJSON(map[string]interface{}{
|
||||||
"type": "error",
|
"type": "error",
|
||||||
"message": "Invalid receiver ID",
|
"message": "Invalid receiver ID",
|
||||||
@ -168,11 +149,7 @@ func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Логирование для отладки
|
chat, err := h.chatService.GetOrCreateChat(ctx, client.UserID, msg.Payload.ReceiverID)
|
||||||
log.Printf("Received message from %d to %d", userID, msg.Payload.ReceiverID)
|
|
||||||
|
|
||||||
// Создаем или находим чат
|
|
||||||
chat, err := h.chatService.GetOrCreateChat(ctx, userID, msg.Payload.ReceiverID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Chat error: %v", err)
|
log.Printf("Chat error: %v", err)
|
||||||
conn.WriteJSON(map[string]interface{}{
|
conn.WriteJSON(map[string]interface{}{
|
||||||
@ -183,10 +160,9 @@ func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Отправляем сообщение
|
|
||||||
message, err := h.chatService.SendMessage(
|
message, err := h.chatService.SendMessage(
|
||||||
ctx,
|
ctx,
|
||||||
userID,
|
client.UserID,
|
||||||
chat.ID,
|
chat.ID,
|
||||||
msg.Payload.Content,
|
msg.Payload.Content,
|
||||||
)
|
)
|
||||||
@ -195,8 +171,10 @@ func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Рассылаем сообщение
|
|
||||||
h.hub.Broadcast(message)
|
h.hub.Broadcast(message)
|
||||||
|
default:
|
||||||
|
conn.WriteJSON(map[string]string{"type": "error", "message": "Unknown message type"})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user