v0.0.16 Исправление работы мессенджера
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
ad9781a34b
commit
6ad760a85b
@ -72,7 +72,7 @@ func main() {
|
||||
postService := service.NewPostService(postRepo)
|
||||
commentService := service.NewCommentService(commentRepo, postRepo)
|
||||
likeService := service.NewLikeService(likeRepo, postRepo)
|
||||
chatService := service.NewChatService(chatRepo, userRepo, chatHub)
|
||||
chatService := service.NewChatService(chatRepo, userRepo)
|
||||
auditService := service.NewAuditService(auditRepo)
|
||||
recoveryService := service.NewRecoveryService(recoveryRepo, userRepo, sessionRepo, deviceRepo, mailService)
|
||||
sessionService := service.NewSessionService(sessionRepo, deviceRepo, userRepo, mailService)
|
||||
@ -89,6 +89,7 @@ func main() {
|
||||
Recovery: recoveryService,
|
||||
Session: sessionService,
|
||||
Mail: mailService,
|
||||
ChatHub: chatHub, // Добавляем хаб в Services
|
||||
}
|
||||
|
||||
// HTTP сервер - передаем db как дополнительный параметр
|
||||
|
||||
@ -18,3 +18,8 @@ type Message struct {
|
||||
Status string `json:"status"` // "sent", "delivered", "read"
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
}
|
||||
type ChatSession struct {
|
||||
User *User `json:"user"`
|
||||
LastMessage *Message `json:"lastMessage"`
|
||||
UnreadCount int `json:"unreadCount"`
|
||||
}
|
||||
|
||||
@ -99,6 +99,7 @@ type ComplexityRoot struct {
|
||||
Mutation struct {
|
||||
ChangePassword func(childComplexity int, oldPassword string, newPassword string) int
|
||||
ConfirmEmail func(childComplexity int, token string) int
|
||||
CreateChat func(childComplexity int, userID int) int
|
||||
CreateComment func(childComplexity int, postID int, content string) int
|
||||
CreatePost func(childComplexity int, title string, content string) int
|
||||
DeletePost func(childComplexity int, id int) int
|
||||
@ -209,6 +210,7 @@ type MutationResolver interface {
|
||||
RenameDevice(ctx context.Context, deviceID int, name string) (*domain.Device, error)
|
||||
RequestEmailConfirmation(ctx context.Context) (bool, error)
|
||||
ConfirmEmail(ctx context.Context, token string) (bool, error)
|
||||
CreateChat(ctx context.Context, userID int) (*ChatSession, error)
|
||||
ResendEmailConfirmation(ctx context.Context) (bool, error)
|
||||
DeletePost(ctx context.Context, id int) (bool, error)
|
||||
}
|
||||
@ -461,6 +463,18 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
|
||||
|
||||
return e.complexity.Mutation.ConfirmEmail(childComplexity, args["token"].(string)), true
|
||||
|
||||
case "Mutation.createChat":
|
||||
if e.complexity.Mutation.CreateChat == nil {
|
||||
break
|
||||
}
|
||||
|
||||
args, err := ec.field_Mutation_createChat_args(ctx, rawArgs)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
return e.complexity.Mutation.CreateChat(childComplexity, args["userId"].(int)), true
|
||||
|
||||
case "Mutation.createComment":
|
||||
if e.complexity.Mutation.CreateComment == nil {
|
||||
break
|
||||
@ -1112,6 +1126,17 @@ func (ec *executionContext) field_Mutation_confirmEmail_args(ctx context.Context
|
||||
return args, nil
|
||||
}
|
||||
|
||||
func (ec *executionContext) field_Mutation_createChat_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
|
||||
var err error
|
||||
args := map[string]any{}
|
||||
arg0, err := processArgField(ctx, rawArgs, "userId", ec.unmarshalNInt2int)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args["userId"] = arg0
|
||||
return args, nil
|
||||
}
|
||||
|
||||
func (ec *executionContext) field_Mutation_createComment_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
|
||||
var err error
|
||||
args := map[string]any{}
|
||||
@ -3539,6 +3564,69 @@ func (ec *executionContext) fieldContext_Mutation_confirmEmail(ctx context.Conte
|
||||
return fc, nil
|
||||
}
|
||||
|
||||
func (ec *executionContext) _Mutation_createChat(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
|
||||
fc, err := ec.fieldContext_Mutation_createChat(ctx, field)
|
||||
if err != nil {
|
||||
return graphql.Null
|
||||
}
|
||||
ctx = graphql.WithFieldContext(ctx, fc)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
ec.Error(ctx, ec.Recover(ctx, r))
|
||||
ret = graphql.Null
|
||||
}
|
||||
}()
|
||||
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
|
||||
ctx = rctx // use context from middleware stack in children
|
||||
return ec.resolvers.Mutation().CreateChat(rctx, fc.Args["userId"].(int))
|
||||
})
|
||||
if err != nil {
|
||||
ec.Error(ctx, err)
|
||||
return graphql.Null
|
||||
}
|
||||
if resTmp == nil {
|
||||
if !graphql.HasFieldError(ctx, fc) {
|
||||
ec.Errorf(ctx, "must not be null")
|
||||
}
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*ChatSession)
|
||||
fc.Result = res
|
||||
return ec.marshalNChatSession2ᚖtailly_back_v2ᚋinternalᚋhttpᚋgraphᚐChatSession(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_Mutation_createChat(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
fc = &graphql.FieldContext{
|
||||
Object: "Mutation",
|
||||
Field: field,
|
||||
IsMethod: true,
|
||||
IsResolver: true,
|
||||
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||
switch field.Name {
|
||||
case "user":
|
||||
return ec.fieldContext_ChatSession_user(ctx, field)
|
||||
case "lastMessage":
|
||||
return ec.fieldContext_ChatSession_lastMessage(ctx, field)
|
||||
case "unreadCount":
|
||||
return ec.fieldContext_ChatSession_unreadCount(ctx, field)
|
||||
}
|
||||
return nil, fmt.Errorf("no field named %q was found under type ChatSession", field.Name)
|
||||
},
|
||||
}
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = ec.Recover(ctx, r)
|
||||
ec.Error(ctx, err)
|
||||
}
|
||||
}()
|
||||
ctx = graphql.WithFieldContext(ctx, fc)
|
||||
if fc.Args, err = ec.field_Mutation_createChat_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
|
||||
ec.Error(ctx, err)
|
||||
return fc, err
|
||||
}
|
||||
return fc, nil
|
||||
}
|
||||
|
||||
func (ec *executionContext) _Mutation_resendEmailConfirmation(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
|
||||
fc, err := ec.fieldContext_Mutation_resendEmailConfirmation(ctx, field)
|
||||
if err != nil {
|
||||
@ -8510,6 +8598,13 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet)
|
||||
if out.Values[i] == graphql.Null {
|
||||
out.Invalids++
|
||||
}
|
||||
case "createChat":
|
||||
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
|
||||
return ec._Mutation_createChat(ctx, field)
|
||||
})
|
||||
if out.Values[i] == graphql.Null {
|
||||
out.Invalids++
|
||||
}
|
||||
case "resendEmailConfirmation":
|
||||
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
|
||||
return ec._Mutation_resendEmailConfirmation(ctx, field)
|
||||
@ -9890,6 +9985,10 @@ func (ec *executionContext) marshalNBoolean2bool(ctx context.Context, sel ast.Se
|
||||
return res
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNChatSession2tailly_back_v2ᚋinternalᚋhttpᚋgraphᚐChatSession(ctx context.Context, sel ast.SelectionSet, v ChatSession) graphql.Marshaler {
|
||||
return ec._ChatSession(ctx, sel, &v)
|
||||
}
|
||||
|
||||
func (ec *executionContext) marshalNChatSession2ᚕᚖtailly_back_v2ᚋinternalᚋhttpᚋgraphᚐChatSessionᚄ(ctx context.Context, sel ast.SelectionSet, v []*ChatSession) graphql.Marshaler {
|
||||
ret := make(graphql.Array, len(v))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
@ -38,6 +38,11 @@ func (r *queryResolver) GetUserChats(ctx context.Context) ([]*ChatSession, error
|
||||
return nil, errors.New("не авторизован")
|
||||
}
|
||||
|
||||
// Проверяем инициализацию сервиса
|
||||
if r.Services == nil || r.Services.Chat == nil {
|
||||
return nil, errors.New("chat service not initialized")
|
||||
}
|
||||
|
||||
// Получаем чаты пользователя
|
||||
chats, err := r.Services.Chat.GetUserChats(ctx, userID)
|
||||
if err != nil {
|
||||
@ -68,12 +73,17 @@ func (r *queryResolver) GetUserChats(ctx context.Context) ([]*ChatSession, error
|
||||
if len(messages) > 0 {
|
||||
lastMessage = messages[0]
|
||||
} else {
|
||||
// Если нет сообщений, возвращаем ошибку, так как в схеме lastMessage обязательное поле
|
||||
continue
|
||||
// Создаем пустое сообщение, если чат новый
|
||||
lastMessage = &domain.Message{
|
||||
ChatID: chat.ID,
|
||||
Content: "Чат создан",
|
||||
Status: "system",
|
||||
CreatedAt: chat.CreatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
// Получаем количество непрочитанных сообщений
|
||||
unreadCount, err := r.chatRepo.GetUnreadCount(ctx, chat.ID, userID)
|
||||
unreadCount, err := r.Services.Chat.GetUnreadCount(ctx, chat.ID, userID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ошибка получения количества непрочитанных: %v", err)
|
||||
}
|
||||
@ -128,17 +138,17 @@ func (r *mutationResolver) SendMessage(ctx context.Context, receiverID int, cont
|
||||
return nil, errors.New("не авторизован")
|
||||
}
|
||||
|
||||
// Проверяем, что не отправляем сообщение себе
|
||||
if senderID == receiverID {
|
||||
return nil, errors.New("cannot send message to yourself")
|
||||
}
|
||||
|
||||
chat, err := r.Services.Chat.GetOrCreateChat(ctx, senderID, receiverID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ошибка создания чата: %v", err)
|
||||
}
|
||||
|
||||
message, err := r.Services.Chat.SendMessage(ctx, senderID, chat.ID, content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ошибка отправки сообщения: %v", err)
|
||||
}
|
||||
|
||||
return message, nil
|
||||
return r.Services.Chat.SendMessage(ctx, senderID, chat.ID, content)
|
||||
}
|
||||
|
||||
// MarkAsRead - помечает сообщение как прочитанное
|
||||
@ -198,9 +208,49 @@ func (r *subscriptionResolver) MessageReceived(ctx context.Context) (<-chan *dom
|
||||
// Горутина для обработки отключения
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
r.Services.ChatHub.Unregister(client)
|
||||
// Добавляем защиту от повторного закрытия
|
||||
select {
|
||||
case <-messageChan: // Если канал уже закрыт
|
||||
default:
|
||||
close(messageChan)
|
||||
}
|
||||
r.Services.ChatHub.Unregister(client)
|
||||
}()
|
||||
|
||||
return messageChan, nil
|
||||
}
|
||||
|
||||
// CreateChat is the resolver for the createChat field.
|
||||
func (r *mutationResolver) CreateChat(ctx context.Context, userID int) (*ChatSession, error) {
|
||||
currentUserID, err := getUserIDFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.New("не авторизован")
|
||||
}
|
||||
|
||||
// Создаем или получаем существующий чат
|
||||
chat, err := r.Services.Chat.GetOrCreateChat(ctx, currentUserID, userID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ошибка создания чата: %v", err)
|
||||
}
|
||||
|
||||
// Получаем данные другого пользователя
|
||||
otherUser, err := r.Services.User.GetByID(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ошибка получения пользователя: %v", err)
|
||||
}
|
||||
|
||||
// Создаем пустое последнее сообщение (или можно вернуть nil, если схема позволяет)
|
||||
emptyMessage := &domain.Message{
|
||||
ChatID: chat.ID,
|
||||
SenderID: currentUserID,
|
||||
Content: "Чат создан",
|
||||
Status: "system",
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
return &ChatSession{
|
||||
User: otherUser,
|
||||
LastMessage: emptyMessage,
|
||||
UnreadCount: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -140,7 +140,7 @@ type Mutation {
|
||||
|
||||
# Подтверждение email по токену
|
||||
confirmEmail(token: String!): Boolean!
|
||||
|
||||
createChat(userId: Int!): ChatSession!
|
||||
# Повторная отправка подтверждения email
|
||||
resendEmailConfirmation: Boolean!
|
||||
deletePost(id: Int!): Boolean!
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"tailly_back_v2/internal/domain"
|
||||
"time"
|
||||
)
|
||||
@ -38,14 +39,19 @@ func NewChatRepository(db *sql.DB) ChatRepository {
|
||||
}
|
||||
|
||||
func (r *chatRepository) SaveMessage(ctx context.Context, message *domain.Message) error {
|
||||
if message.ReceiverID == 0 {
|
||||
return errors.New("receiver_id is required")
|
||||
}
|
||||
|
||||
query := `
|
||||
INSERT INTO messages (chat_id, sender_id, content, status, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
INSERT INTO messages (chat_id, sender_id, receiver_id, content, status, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id
|
||||
`
|
||||
err := r.db.QueryRowContext(ctx, query,
|
||||
message.ChatID,
|
||||
message.SenderID,
|
||||
message.ReceiverID,
|
||||
message.Content,
|
||||
message.Status,
|
||||
message.CreatedAt,
|
||||
@ -123,6 +129,16 @@ func (r *chatRepository) DeleteMessage(ctx context.Context, id int) error {
|
||||
}
|
||||
|
||||
func (r *chatRepository) CreateChat(ctx context.Context, user1ID, user2ID int) (*domain.Chat, error) {
|
||||
// Проверяем, что пользователи разные
|
||||
if user1ID == user2ID {
|
||||
return nil, errors.New("cannot create chat with yourself")
|
||||
}
|
||||
|
||||
// Упорядочиваем ID пользователей согласно CHECK constraint
|
||||
if user1ID > user2ID {
|
||||
user1ID, user2ID = user2ID, user1ID
|
||||
}
|
||||
|
||||
query := `
|
||||
INSERT INTO chats (user1_id, user2_id, created_at)
|
||||
VALUES ($1, $2, $3)
|
||||
@ -134,7 +150,10 @@ func (r *chatRepository) CreateChat(ctx context.Context, user1ID, user2ID int) (
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
err := r.db.QueryRowContext(ctx, query, user1ID, user2ID, chat.CreatedAt).Scan(&chat.ID)
|
||||
return chat, err
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create chat: %v", err)
|
||||
}
|
||||
return chat, nil
|
||||
}
|
||||
|
||||
func (r *chatRepository) GetChatByID(ctx context.Context, id int) (*domain.Chat, error) {
|
||||
@ -186,11 +205,15 @@ func (r *chatRepository) GetUserChats(ctx context.Context, userID int) ([]*domai
|
||||
}
|
||||
|
||||
func (r *chatRepository) GetChatByParticipants(ctx context.Context, user1ID, user2ID int) (*domain.Chat, error) {
|
||||
// Упорядочиваем ID пользователей согласно CHECK constraint
|
||||
if user1ID > user2ID {
|
||||
user1ID, user2ID = user2ID, user1ID
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT id, user1_id, user2_id, created_at
|
||||
FROM chats
|
||||
WHERE (user1_id = $1 AND user2_id = $2)
|
||||
OR (user1_id = $2 AND user2_id = $1)
|
||||
WHERE user1_id = $1 AND user2_id = $2
|
||||
LIMIT 1
|
||||
`
|
||||
chat := &domain.Chat{}
|
||||
|
||||
@ -16,6 +16,7 @@ type ChatService interface {
|
||||
DeleteMessage(ctx context.Context, messageID, userID int) error
|
||||
GetUserChats(ctx context.Context, userID int) ([]*domain.Chat, error)
|
||||
GetOrCreateChat(ctx context.Context, user1ID, user2ID int) (*domain.Chat, error)
|
||||
GetUnreadCount(ctx context.Context, chatID, userID int) (int, error)
|
||||
}
|
||||
|
||||
type chatService struct {
|
||||
@ -27,8 +28,10 @@ type chatService struct {
|
||||
func NewChatService(
|
||||
chatRepo repository.ChatRepository,
|
||||
userRepo repository.UserRepository,
|
||||
hub *ws.ChatHub,
|
||||
) ChatService {
|
||||
hub := ws.NewChatHub()
|
||||
go hub.Run() // Запускаем хаб в отдельной горутине
|
||||
|
||||
return &chatService{
|
||||
chatRepo: chatRepo,
|
||||
userRepo: userRepo,
|
||||
@ -48,9 +51,16 @@ func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, con
|
||||
return nil, errors.New("user is not a participant of this chat")
|
||||
}
|
||||
|
||||
// Определяем получателя
|
||||
receiverID := chat.User1ID
|
||||
if senderID == chat.User1ID {
|
||||
receiverID = chat.User2ID
|
||||
}
|
||||
|
||||
message := &domain.Message{
|
||||
ChatID: chatID,
|
||||
SenderID: senderID,
|
||||
ReceiverID: receiverID, // Явно устанавливаем получателя
|
||||
Content: content,
|
||||
Status: "sent",
|
||||
CreatedAt: time.Now(),
|
||||
@ -61,21 +71,10 @@ func (s *chatService) SendMessage(ctx context.Context, senderID, chatID int, con
|
||||
}
|
||||
|
||||
// Отправляем через WebSocket
|
||||
recipientID := chat.User1ID
|
||||
if senderID == chat.User1ID {
|
||||
recipientID = chat.User2ID
|
||||
if s.hub != nil {
|
||||
s.hub.Broadcast(message)
|
||||
}
|
||||
|
||||
s.hub.Broadcast(&domain.Message{
|
||||
ID: message.ID,
|
||||
ChatID: chatID,
|
||||
SenderID: senderID,
|
||||
ReceiverID: recipientID,
|
||||
Content: content,
|
||||
Status: "sent",
|
||||
CreatedAt: message.CreatedAt,
|
||||
})
|
||||
|
||||
return message, nil
|
||||
}
|
||||
|
||||
@ -128,3 +127,6 @@ func (s *chatService) GetOrCreateChat(ctx context.Context, user1ID, user2ID int)
|
||||
// Создаем новый чат
|
||||
return s.chatRepo.CreateChat(ctx, user1ID, user2ID)
|
||||
}
|
||||
func (s *chatService) GetUnreadCount(ctx context.Context, chatID, userID int) (int, error) {
|
||||
return s.chatRepo.GetUnreadCount(ctx, chatID, userID)
|
||||
}
|
||||
|
||||
@ -8,6 +8,13 @@ import (
|
||||
type Client struct {
|
||||
UserID int
|
||||
Send chan *domain.Message
|
||||
once sync.Once // Для гарантии однократного закрытия
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
c.once.Do(func() {
|
||||
close(c.Send)
|
||||
})
|
||||
}
|
||||
|
||||
type ChatHub struct {
|
||||
@ -29,43 +36,50 @@ func NewChatHub() *ChatHub {
|
||||
|
||||
// Register добавляет нового клиента в хаб
|
||||
func (h *ChatHub) Register(client *Client) {
|
||||
if h == nil || client == nil {
|
||||
return
|
||||
}
|
||||
h.register <- client
|
||||
}
|
||||
|
||||
// Unregister удаляет клиента из хаба
|
||||
func (h *ChatHub) Unregister(client *Client) {
|
||||
if h == nil || client == nil {
|
||||
return
|
||||
}
|
||||
h.unregister <- client
|
||||
}
|
||||
|
||||
// Broadcast отправляет сообщение всем клиентам
|
||||
func (h *ChatHub) Broadcast(message *domain.Message) {
|
||||
if h == nil || message == nil {
|
||||
return
|
||||
}
|
||||
h.broadcast <- message
|
||||
}
|
||||
|
||||
func (h *ChatHub) Run() {
|
||||
for {
|
||||
select {
|
||||
case client := <-h.register:
|
||||
h.mutex.Lock()
|
||||
h.clients[client.UserID] = client
|
||||
h.mutex.Unlock()
|
||||
|
||||
case client := <-h.unregister:
|
||||
h.mutex.Lock()
|
||||
if _, ok := h.clients[client.UserID]; ok {
|
||||
close(client.Send)
|
||||
if c, ok := h.clients[client.UserID]; ok && c == client {
|
||||
client.Close() // Используем безопасное закрытие
|
||||
delete(h.clients, client.UserID)
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
|
||||
case message := <-h.broadcast:
|
||||
h.mutex.RLock()
|
||||
// Отправляем отправителю и получателю
|
||||
if sender, ok := h.clients[message.SenderID]; ok {
|
||||
sender.Send <- message
|
||||
select {
|
||||
case sender.Send <- message: // Не блокируется, если канал закрыт
|
||||
default:
|
||||
}
|
||||
}
|
||||
if receiver, ok := h.clients[message.ReceiverID]; ok {
|
||||
receiver.Send <- message
|
||||
select {
|
||||
case receiver.Send <- message:
|
||||
default:
|
||||
}
|
||||
}
|
||||
h.mutex.RUnlock()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user