v0.0.17.4
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-11 19:04:08 +03:00
parent c98667c5b5
commit 8c84a5d7d5
2 changed files with 73 additions and 16 deletions

View File

@ -42,17 +42,17 @@ func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
log.Printf("Incoming WebSocket headers: %+v", r.Header)
log.Printf("Cookies: %+v", r.Cookies())
// Убираем все проверки токена здесь, так как будем проверять его в первом сообщении
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
http.Error(w, "Could not upgrade to WebSocket", http.StatusBadRequest)
return
}
// Создаем клиента без userID (он будет установлен при аутентификации)
client := &ws.Client{
UserID: 0, // Пока не авторизован
Send: make(chan *domain.Message, 256),
LastSeen: time.Now(),
CloseChan: make(chan bool, 1),
}
h.hub.RegisterClient(client)
@ -60,10 +60,35 @@ func (h *ChatHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Горутина для чтения (теперь включает аутентификацию)
go h.readPump(ctx, conn, client)
go h.writePump(ctx, conn, client)
<-ctx.Done()
// Горутина для записи с обработкой закрытия
go func() {
defer conn.Close()
defer h.hub.UnregisterClient(client)
for {
select {
case <-ctx.Done():
return
case <-client.CloseChan:
return
case message, ok := <-client.Send:
if !ok {
return
}
// Добавляем таймаут на запись
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := conn.WriteJSON(message); err != nil {
log.Printf("WebSocket write error: %v", err)
return
}
conn.SetWriteDeadline(time.Time{}) // Сбрасываем таймаут
}
}
}()
}
func (h *ChatHandler) readPump(ctx context.Context, conn *websocket.Conn, client *ws.Client) {

View File

@ -4,11 +4,14 @@ import (
"log"
"sync"
"tailly_back_v2/internal/domain"
"time"
)
type Client struct {
UserID int
Send chan *domain.Message
LastSeen time.Time
CloseChan chan bool
}
type Hub struct {
@ -20,15 +23,39 @@ type Hub struct {
}
func NewHub() *Hub {
return &Hub{
hub := &Hub{
clients: make(map[int]*Client),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan *domain.Message, 100),
}
go hub.cleanupInactiveClients()
return hub
}
// Добавляем периодическую очистку неактивных клиентов
func (h *Hub) cleanupInactiveClients() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
for userID, client := range h.clients {
if time.Since(client.LastSeen) > 10*time.Minute {
log.Printf("Cleaning up inactive client: %d", userID)
close(client.Send)
delete(h.clients, userID)
}
}
h.mu.Unlock()
}
}
}
func (h *Hub) RegisterClient(client *Client) {
client.LastSeen = time.Now()
h.register <- client
}
@ -55,8 +82,15 @@ func (h *Hub) Run() {
select {
case client := <-h.register:
h.mu.Lock()
// Обновляем LastSeen при регистрации
client.LastSeen = time.Now()
// Закрываем предыдущее соединение если есть
if existing, ok := h.clients[client.UserID]; ok {
select {
case existing.CloseChan <- true: // Уведомляем о закрытии
default:
}
close(existing.Send)
}
h.clients[client.UserID] = client
@ -75,16 +109,14 @@ func (h *Hub) Run() {
// Отправляем всем клиентам, кто участвует в этом чате
for _, client := range h.clients {
if client.UserID == message.SenderID || client.UserID == message.ReceiverID {
client.LastSeen = time.Now() // Обновляем время активности
select {
case client.Send <- message:
// Сообщение успешно отправлено
default:
// Если канал полон, закрываем соединение
close(client.Send)
h.mu.RUnlock()
h.mu.Lock()
delete(h.clients, client.UserID)
h.mu.Unlock()
h.mu.RLock()
log.Printf("Client %d channel busy, skipping message", client.UserID)
// Не закрываем соединение, просто пропускаем сообщение
}
}
}