246 lines
7.5 KiB
Go
246 lines
7.5 KiB
Go
package graph
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"tailly_back_v2/internal/domain"
|
||
"tailly_back_v2/proto"
|
||
"time"
|
||
)
|
||
|
||
// Subscription возвращает реализацию SubscriptionResolver
|
||
func (r *Resolver) Subscription() SubscriptionResolver {
|
||
return &subscriptionResolver{r}
|
||
}
|
||
|
||
type subscriptionResolver struct{ *Resolver }
|
||
|
||
// CreateChat is the resolver for the createChat field.
|
||
func (r *mutationResolver) CreateChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) {
|
||
// Просто вызываем gRPC метод, вся логика уже там
|
||
res, err := r.MessageClient.CreateChat(ctx, &proto.CreateChatRequest{
|
||
User1Id: int32(user1Id),
|
||
User2Id: int32(user2Id),
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to create chat: %w", err)
|
||
}
|
||
|
||
// Преобразуем proto-чат в domain-модель
|
||
return protoChatToDomain(res.Chat), nil
|
||
}
|
||
|
||
// SendMessage реализация мутации для отправки сообщения
|
||
func (r *mutationResolver) SendMessage(ctx context.Context, chatID int, content string) (*domain.Message, error) {
|
||
// Получаем senderID из контекста
|
||
senderID, err := getUserIDFromContext(ctx)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("authentication required: %w", err)
|
||
}
|
||
|
||
res, err := r.MessageClient.SendMessage(ctx, &proto.SendMessageRequest{
|
||
ChatId: int32(chatID),
|
||
SenderId: int32(senderID), // Теперь senderId берется из контекста
|
||
Content: content,
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to send message: %w", err)
|
||
}
|
||
return protoMessageToDomain(res.Message), nil
|
||
}
|
||
|
||
// UpdateMessageStatus реализация мутации для обновления статуса сообщения
|
||
func (r *mutationResolver) UpdateMessageStatus(ctx context.Context, messageID int, status MessageStatus) (*domain.Message, error) {
|
||
var statusStr string
|
||
switch status {
|
||
case MessageStatusSent:
|
||
statusStr = "SENT"
|
||
case MessageStatusDelivered:
|
||
statusStr = "DELIVERED"
|
||
case MessageStatusRead:
|
||
statusStr = "READ"
|
||
default:
|
||
return nil, fmt.Errorf("unknown message status: %v", status)
|
||
}
|
||
|
||
res, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{
|
||
MessageId: int32(messageID),
|
||
Status: statusStr,
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to update message status: %w", err)
|
||
}
|
||
|
||
return protoMessageToDomain(res.Message), nil
|
||
}
|
||
|
||
// GetChat реализация запроса для получения чата
|
||
func (r *queryResolver) GetChat(ctx context.Context, user1Id int, user2Id int) (*domain.Chat, error) {
|
||
res, err := r.MessageClient.GetChat(ctx, &proto.GetChatRequest{
|
||
User1Id: int32(user1Id),
|
||
User2Id: int32(user2Id),
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to get chat: %w", err)
|
||
}
|
||
return protoChatToDomain(res.Chat), nil
|
||
}
|
||
|
||
// GetChatMessages реализация запроса для получения сообщений чата
|
||
func (r *queryResolver) GetChatMessages(ctx context.Context, chatID int, limit int, offset int) ([]*domain.Message, error) {
|
||
res, err := r.MessageClient.GetChatMessages(ctx, &proto.GetChatMessagesRequest{
|
||
ChatId: int32(chatID),
|
||
Limit: int32(limit),
|
||
Offset: int32(offset),
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to get chat messages: %w", err)
|
||
}
|
||
var messages []*domain.Message
|
||
for _, msg := range res.Messages {
|
||
messages = append(messages, protoMessageToDomain(msg))
|
||
}
|
||
return messages, nil
|
||
}
|
||
|
||
// GetUserChats реализация запроса для получения чатов пользователя
|
||
func (r *queryResolver) GetUserChats(ctx context.Context, userID int) ([]*domain.Chat, error) {
|
||
res, err := r.MessageClient.GetUserChats(ctx, &proto.GetUserChatsRequest{
|
||
UserId: int32(userID),
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to get user chats: %w", err)
|
||
}
|
||
var chats []*domain.Chat
|
||
for _, chat := range res.Chats {
|
||
chats = append(chats, protoChatToDomain(chat))
|
||
}
|
||
return chats, nil
|
||
}
|
||
|
||
// MessageStream реализация подписки на новые сообщения
|
||
func (r *subscriptionResolver) MessageStream(ctx context.Context, userID int) (<-chan *domain.Message, error) {
|
||
if userID == 0 {
|
||
return nil, fmt.Errorf("user not authenticated")
|
||
}
|
||
|
||
messageChan := make(chan *domain.Message, 10) // Буферизированный канал
|
||
go func() {
|
||
defer func() {
|
||
close(messageChan)
|
||
if r := recover(); r != nil {
|
||
log.Printf("Recovered from panic in MessageStream: %v", r)
|
||
}
|
||
}()
|
||
|
||
// Создаем новый контекст с таймаутом для переподключения
|
||
retryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||
defer cancel()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
stream, err := r.MessageClient.StreamMessages(retryCtx, &proto.StreamMessagesRequest{
|
||
UserId: int32(userID),
|
||
})
|
||
if err != nil {
|
||
log.Printf("Failed to create stream: %v", err)
|
||
time.Sleep(1 * time.Second)
|
||
continue
|
||
}
|
||
|
||
for {
|
||
msg, err := stream.Recv()
|
||
if err != nil {
|
||
log.Printf("Stream receive error: %v", err)
|
||
break // Выходим из внутреннего цикла для переподключения
|
||
}
|
||
|
||
// Полная проверка сообщения
|
||
if msg == nil || msg.Message == nil || msg.Message.Id == 0 {
|
||
continue
|
||
}
|
||
|
||
domainMsg := protoMessageToDomain(msg.Message)
|
||
if domainMsg.ID == 0 || domainMsg.Content == "" {
|
||
continue
|
||
}
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case messageChan <- domainMsg:
|
||
// Успешно отправили сообщение
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
return messageChan, nil
|
||
}
|
||
|
||
// Преобразование proto-сообщения в domain-модель
|
||
func protoMessageToDomain(msg *proto.Message) *domain.Message {
|
||
return &domain.Message{
|
||
ID: int(msg.Id),
|
||
ChatID: int(msg.ChatId),
|
||
SenderID: int(msg.SenderId),
|
||
ReceiverID: int(msg.ReceiverId),
|
||
Content: msg.Content,
|
||
Status: msg.Status,
|
||
CreatedAt: msg.CreatedAt.AsTime(),
|
||
}
|
||
}
|
||
|
||
// Преобразование proto-чата в domain-модель
|
||
func protoChatToDomain(chat *proto.Chat) *domain.Chat {
|
||
gqlChat := &domain.Chat{
|
||
ID: int(chat.Id),
|
||
User1ID: int(chat.User1Id),
|
||
User2ID: int(chat.User2Id),
|
||
CreatedAt: chat.CreatedAt.AsTime(),
|
||
UpdatedAt: chat.UpdatedAt.AsTime(),
|
||
}
|
||
|
||
if chat.LastMessage != nil {
|
||
gqlChat.LastMessage = protoMessageToDomain(chat.LastMessage)
|
||
}
|
||
|
||
return gqlChat
|
||
}
|
||
|
||
// Реализации резолверов для отдельных полей
|
||
|
||
type chatResolver struct{ *Resolver }
|
||
type messageResolver struct{ *Resolver }
|
||
|
||
// Chat возвращает ChatResolver
|
||
func (r *Resolver) Chat() ChatResolver { return &chatResolver{r} }
|
||
|
||
// Message возвращает MessageResolver
|
||
func (r *Resolver) Message() MessageResolver { return &messageResolver{r} }
|
||
|
||
// CreatedAt для Chat
|
||
func (r *chatResolver) CreatedAt(ctx context.Context, obj *domain.Chat) (string, error) {
|
||
return obj.CreatedAt.Format(time.RFC3339), nil
|
||
}
|
||
|
||
// UpdatedAt для Chat
|
||
func (r *chatResolver) UpdatedAt(ctx context.Context, obj *domain.Chat) (string, error) {
|
||
return obj.UpdatedAt.Format(time.RFC3339), nil
|
||
}
|
||
|
||
// Status для Message
|
||
func (r *messageResolver) Status(ctx context.Context, obj *domain.Message) (MessageStatus, error) {
|
||
return MessageStatus(obj.Status), nil
|
||
}
|
||
|
||
// CreatedAt для Message
|
||
func (r *messageResolver) CreatedAt(ctx context.Context, obj *domain.Message) (string, error) {
|
||
return obj.CreatedAt.Format(time.RFC3339), nil
|
||
}
|