package graph import ( "context" "fmt" "log" "tailly_back_v2/internal/domain" "tailly_back_v2/proto" "time" ) // Subscription возвращает реализацию SubscriptionResolver func (r *Resolver) Subscription() SubscriptionResolver { return &subscriptionResolver{r} } type subscriptionResolver struct{ *Resolver } // CreateChat is the resolver for the createChat field. func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) { // Просто вызываем gRPC метод, вся логика уже там res, err := r.MessageClient.CreateChat(ctx, &proto.CreateChatRequest{ User1Id: int32(user1Id), User2Id: int32(user2Id), }) if err != nil { return nil, fmt.Errorf("failed to create chat: %w", err) } // Преобразуем proto-чат в domain-модель return protoChatToDomain(res.Chat), nil } // SendMessage реализация мутации для отправки сообщения func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content string) (*domain.Message, error) { // Получаем senderID из контекста senderID, err := getUserIDFromContext(ctx) if err != nil { return nil, fmt.Errorf("authentication required: %w", err) } res, err := r.MessageClient.SendMessage(ctx, &proto.SendMessageRequest{ ChatId: int32(chatID), SenderId: int32(senderID), // Теперь senderId берется из контекста Content: content, }) if err != nil { return nil, fmt.Errorf("failed to send message: %w", err) } return protoMessageToDomain(res.Message), nil } // UpdateMessageStatus реализация мутации для обновления статуса сообщения func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID int, status MessageStatus) (*domain.Message, error) { var statusStr string switch status { case MessageStatusSent: statusStr = "SENT" case MessageStatusDelivered: statusStr = "DELIVERED" case MessageStatusRead: statusStr = "READ" default: return nil, fmt.Errorf("unknown message status: %v", status) } res, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{ MessageId: int32(messageID), Status: statusStr, }) if err != nil { return nil, fmt.Errorf("failed to update message status: %w", err) } return protoMessageToDomain(res.Message), nil } // GetChat реализация запроса для получения чата func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) { res, err := r.MessageClient.GetChat(ctx, &proto.GetChatRequest{ User1Id: int32(user1Id), User2Id: int32(user2Id), }) if err != nil { return nil, fmt.Errorf("failed to get chat: %w", err) } return protoChatToDomain(res.Chat), nil } // GetChatMessages реализация запроса для получения сообщений чата func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit int, offset int) ([]*domain.Message, error) { res, err := r.MessageClient.GetChatMessages(ctx, &proto.GetChatMessagesRequest{ ChatId: int32(chatID), Limit: int32(limit), Offset: int32(offset), }) if err != nil { return nil, fmt.Errorf("failed to get chat messages: %w", err) } var messages []*domain.Message for _, msg := range res.Messages { messages = append(messages, protoMessageToDomain(msg)) } return messages, nil } // GetUserChats реализация запроса для получения чатов пользователя func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain.Chat, error) { res, err := r.MessageClient.GetUserChats(ctx, &proto.GetUserChatsRequest{ UserId: int32(userID), }) if err != nil { return nil, fmt.Errorf("failed to get user chats: %w", err) } var chats []*domain.Chat for _, chat := range res.Chats { chats = append(chats, protoChatToDomain(chat)) } return chats, nil } // MessageStream реализация подписки на новые сообщения func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) { messageChan := make(chan *domain.Message, 100) go func() { defer close(messageChan) defer log.Println("MessageStream: goroutine stopped") for { select { case <-ctx.Done(): log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err()) return default: err := r.runMessageStream(ctx, userID, messageChan) if err != nil { log.Printf("MessageStream error: %v, reconnecting...", err) } time.Sleep(2 * time.Second) // Задержка перед переподключением } } }() return messageChan, nil } func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error { log.Printf("Starting new stream for user %d", userID) _, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{ MessageId: 0, // 0 = все сообщения для пользователя Status: "DELIVERED", UserId: int32(userID), }) if err != nil { log.Printf("Failed to mark messages as delivered: %v", err) } streamCtx, cancel := context.WithCancel(ctx) defer cancel() stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{ UserId: int32(userID), }) if err != nil { return fmt.Errorf("failed to create stream: %w", err) } heartbeat := time.NewTicker(25 * time.Second) defer heartbeat.Stop() for { select { case <-heartbeat.C: // Отправляем ping для поддержания соединения if err := stream.Context().Err(); err != nil { return fmt.Errorf("connection lost: %w", err) } case <-ctx.Done(): return nil default: msg, err := stream.Recv() if err != nil { return fmt.Errorf("receive error: %w", err) } if msg.GetMessage() != nil { select { case messageChan <- protoMessageToDomain(msg.Message): log.Printf("Delivered message %d to user %d", msg.Message.Id, userID) case <-ctx.Done(): return nil } } } } } // Преобразование proto-сообщения в domain-модель func protoMessageToDomain(msg *proto.Message) *domain.Message { return &domain.Message{ ID: int(msg.Id), ChatID: int(msg.ChatId), SenderID: int(msg.SenderId), ReceiverID: int(msg.ReceiverId), Content: msg.Content, Status: msg.Status, CreatedAt: msg.CreatedAt.AsTime(), } } // Преобразование proto-чата в domain-модель func protoChatToDomain(chat *proto.Chat) *domain.Chat { gqlChat := &domain.Chat{ ID: int(chat.Id), User1ID: int(chat.User1Id), User2ID: int(chat.User2Id), CreatedAt: chat.CreatedAt.AsTime(), UpdatedAt: chat.UpdatedAt.AsTime(), } if chat.LastMessage != nil { gqlChat.LastMessage = protoMessageToDomain(chat.LastMessage) } return gqlChat } // Реализации резолверов для отдельных полей type chatResolver struct{ *Resolver } type messageResolver struct{ *Resolver } // Chat возвращает ChatResolver func (r *Resolver) Chat() ChatResolver { return &chatResolver{r} } // Message возвращает MessageResolver func (r *Resolver) Message() MessageResolver { return &messageResolver{r} } // CreatedAt для Chat func (r *chatResolver) CreatedAt(ctx context.Context, obj *domain.Chat) (string, error) { return obj.CreatedAt.Format(time.RFC3339), nil } // UpdatedAt для Chat func (r *chatResolver) UpdatedAt(ctx context.Context, obj *domain.Chat) (string, error) { return obj.UpdatedAt.Format(time.RFC3339), nil } // Status для Message func (r *messageResolver) Status(ctx context.Context, obj *domain.Message) (MessageStatus, error) { return MessageStatus(obj.Status), nil } // CreatedAt для Message func (r *messageResolver) CreatedAt(ctx context.Context, obj *domain.Message) (string, error) { return obj.CreatedAt.Format(time.RFC3339), nil }