tailly_back_v2/internal/http/graph/messages_resolvers.go
admin db403171da
All checks were successful
continuous-integration/drone/push Build is passing
v0.0.25 Добавлен gRPC сервис подписок/пидписчиков
2025-08-26 09:48:50 +03:00

260 lines
8.0 KiB
Go

package graph
import (
"context"
"fmt"
"log"
"tailly_back_v2/internal/domain"
"tailly_back_v2/proto"
"time"
)
// Subscription возвращает реализацию SubscriptionResolver
func (r *Resolver) Subscription() SubscriptionResolver {
return &subscriptionResolver{r}
}
type subscriptionResolver struct{ *Resolver }
// CreateChat is the resolver for the createChat field.
func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) {
// Просто вызываем gRPC метод, вся логика уже там
res, err := r.MessageClient.CreateChat(ctx, &proto.CreateChatRequest{
User1Id: int32(user1Id),
User2Id: int32(user2Id),
})
if err != nil {
return nil, fmt.Errorf("failed to create chat: %w", err)
}
// Преобразуем proto-чат в domain-модель
return protoChatToDomain(res.Chat), nil
}
// SendMessage реализация мутации для отправки сообщения
func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content string) (*domain.Message, error) {
// Получаем senderID из контекста
senderID, err := getUserIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("authentication required: %w", err)
}
res, err := r.MessageClient.SendMessage(ctx, &proto.SendMessageRequest{
ChatId: int32(chatID),
SenderId: int32(senderID), // Теперь senderId берется из контекста
Content: content,
})
if err != nil {
return nil, fmt.Errorf("failed to send message: %w", err)
}
return protoMessageToDomain(res.Message), nil
}
// UpdateMessageStatus реализация мутации для обновления статуса сообщения
func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID int, status MessageStatus) (*domain.Message, error) {
var statusStr string
switch status {
case MessageStatusSent:
statusStr = "SENT"
case MessageStatusDelivered:
statusStr = "DELIVERED"
case MessageStatusRead:
statusStr = "READ"
default:
return nil, fmt.Errorf("unknown message status: %v", status)
}
res, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{
MessageId: int32(messageID),
Status: statusStr,
})
if err != nil {
return nil, fmt.Errorf("failed to update message status: %w", err)
}
return protoMessageToDomain(res.Message), nil
}
// GetChat реализация запроса для получения чата
func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) {
res, err := r.MessageClient.GetChat(ctx, &proto.GetChatRequest{
User1Id: int32(user1Id),
User2Id: int32(user2Id),
})
if err != nil {
return nil, fmt.Errorf("failed to get chat: %w", err)
}
return protoChatToDomain(res.Chat), nil
}
// GetChatMessages реализация запроса для получения сообщений чата
func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit int, offset int) ([]*domain.Message, error) {
res, err := r.MessageClient.GetChatMessages(ctx, &proto.GetChatMessagesRequest{
ChatId: int32(chatID),
Limit: int32(limit),
Offset: int32(offset),
})
if err != nil {
return nil, fmt.Errorf("failed to get chat messages: %w", err)
}
var messages []*domain.Message
for _, msg := range res.Messages {
messages = append(messages, protoMessageToDomain(msg))
}
return messages, nil
}
// GetUserChats реализация запроса для получения чатов пользователя
func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain.Chat, error) {
res, err := r.MessageClient.GetUserChats(ctx, &proto.GetUserChatsRequest{
UserId: int32(userID),
})
if err != nil {
return nil, fmt.Errorf("failed to get user chats: %w", err)
}
var chats []*domain.Chat
for _, chat := range res.Chats {
chats = append(chats, protoChatToDomain(chat))
}
return chats, nil
}
// MessageStream реализация подписки на новые сообщения
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
messageChan := make(chan *domain.Message, 100)
go func() {
defer close(messageChan)
defer log.Println("MessageStream: goroutine stopped")
for {
select {
case <-ctx.Done():
log.Printf("MessageStream: context canceled (reason: %v)", ctx.Err())
return
default:
err := r.runMessageStream(ctx, userID, messageChan)
if err != nil {
log.Printf("MessageStream error: %v, reconnecting...", err)
}
time.Sleep(2 * time.Second) // Задержка перед переподключением
}
}
}()
return messageChan, nil
}
func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int, messageChan chan<- *domain.Message) error {
log.Printf("Starting new stream for user %d", userID)
_, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{
MessageId: 0, // 0 = все сообщения для пользователя
Status: "DELIVERED",
UserId: int32(userID),
})
if err != nil {
log.Printf("Failed to mark messages as delivered: %v", err)
}
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{
UserId: int32(userID),
})
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
heartbeat := time.NewTicker(25 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-heartbeat.C:
// Отправляем ping для поддержания соединения
if err := stream.Context().Err(); err != nil {
return fmt.Errorf("connection lost: %w", err)
}
case <-ctx.Done():
return nil
default:
msg, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive error: %w", err)
}
if msg.GetMessage() != nil {
select {
case messageChan <- protoMessageToDomain(msg.Message):
log.Printf("Delivered message %d to user %d", msg.Message.Id, userID)
case <-ctx.Done():
return nil
}
}
}
}
}
// Преобразование proto-сообщения в domain-модель
func protoMessageToDomain(msg *proto.Message) *domain.Message {
return &domain.Message{
ID: int(msg.Id),
ChatID: int(msg.ChatId),
SenderID: int(msg.SenderId),
ReceiverID: int(msg.ReceiverId),
Content: msg.Content,
Status: msg.Status,
CreatedAt: msg.CreatedAt.AsTime(),
}
}
// Преобразование proto-чата в domain-модель
func protoChatToDomain(chat *proto.Chat) *domain.Chat {
gqlChat := &domain.Chat{
ID: int(chat.Id),
User1ID: int(chat.User1Id),
User2ID: int(chat.User2Id),
CreatedAt: chat.CreatedAt.AsTime(),
UpdatedAt: chat.UpdatedAt.AsTime(),
}
if chat.LastMessage != nil {
gqlChat.LastMessage = protoMessageToDomain(chat.LastMessage)
}
return gqlChat
}
// Реализации резолверов для отдельных полей
type chatResolver struct{ *Resolver }
type messageResolver struct{ *Resolver }
// Chat возвращает ChatResolver
func (r *Resolver) Chat() ChatResolver { return &chatResolver{r} }
// Message возвращает MessageResolver
func (r *Resolver) Message() MessageResolver { return &messageResolver{r} }
// CreatedAt для Chat
func (r *chatResolver) CreatedAt(ctx context.Context, obj *domain.Chat) (string, error) {
return obj.CreatedAt.Format(time.RFC3339), nil
}
// UpdatedAt для Chat
func (r *chatResolver) UpdatedAt(ctx context.Context, obj *domain.Chat) (string, error) {
return obj.UpdatedAt.Format(time.RFC3339), nil
}
// Status для Message
func (r *messageResolver) Status(ctx context.Context, obj *domain.Message) (MessageStatus, error) {
return MessageStatus(obj.Status), nil
}
// CreatedAt для Message
func (r *messageResolver) CreatedAt(ctx context.Context, obj *domain.Message) (string, error) {
return obj.CreatedAt.Format(time.RFC3339), nil
}