202 lines
5.3 KiB
Go
202 lines
5.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"tailly_messages/proto"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/test/bufconn"
|
|
)
|
|
|
|
const bufSize = 1024 * 1024
|
|
|
|
func setupTestServer(t *testing.T) (*grpc.Server, *bufconn.Listener, *server) {
|
|
// Setup in-memory connection
|
|
lis := bufconn.Listen(bufSize)
|
|
s := grpc.NewServer()
|
|
|
|
// Используем реальные подключения из server.go
|
|
dbURL := "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2"
|
|
pool, err := pgxpool.Connect(context.Background(), dbURL)
|
|
require.NoError(t, err, "Failed to connect to test database")
|
|
|
|
rabbitURL := "amqp://tailly_rabbitmq:o2p2S80MPbl27LUU@89.104.69.222:5673/"
|
|
rabbitConn, err := amqp.DialConfig(rabbitURL, amqp.Config{
|
|
Heartbeat: 10 * time.Second,
|
|
Locale: "en_US",
|
|
})
|
|
require.NoError(t, err, "Failed to connect to RabbitMQ")
|
|
|
|
testServer := NewServer(pool, rabbitConn)
|
|
proto.RegisterMessageServiceServer(s, testServer)
|
|
|
|
go func() {
|
|
if err := s.Serve(lis); err != nil {
|
|
t.Logf("Server exited with error: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Очищаем только тестовые данные после завершения тестов
|
|
t.Cleanup(func() {
|
|
// Удаляем только те данные, которые создали в тестах
|
|
_, _ = pool.Exec(context.Background(), "DELETE FROM messages WHERE content = 'test message'")
|
|
_, _ = pool.Exec(context.Background(), "DELETE FROM chats WHERE user1_id = 1 AND user2_id = 2")
|
|
})
|
|
|
|
return s, lis, testServer
|
|
}
|
|
|
|
func TestCreateChat(t *testing.T) {
|
|
s, lis, _ := setupTestServer(t)
|
|
defer s.Stop()
|
|
|
|
ctx := context.Background()
|
|
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
|
return lis.Dial()
|
|
}), grpc.WithInsecure())
|
|
require.NoError(t, err)
|
|
defer conn.Close()
|
|
|
|
client := proto.NewMessageServiceClient(conn)
|
|
|
|
req := &proto.CreateChatRequest{
|
|
User1Id: 1,
|
|
User2Id: 2,
|
|
}
|
|
|
|
resp, err := client.CreateChat(ctx, req)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp)
|
|
require.NotNil(t, resp.GetChat())
|
|
assert.Equal(t, int32(1), resp.GetChat().GetUser1Id())
|
|
assert.Equal(t, int32(2), resp.GetChat().GetUser2Id())
|
|
}
|
|
|
|
func TestSendAndStreamMessages(t *testing.T) {
|
|
s, lis, _ := setupTestServer(t)
|
|
defer s.Stop()
|
|
|
|
ctx := context.Background()
|
|
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
|
return lis.Dial()
|
|
}), grpc.WithInsecure())
|
|
require.NoError(t, err)
|
|
defer conn.Close()
|
|
|
|
client := proto.NewMessageServiceClient(conn)
|
|
|
|
// Сначала создаем чат
|
|
chatResp, err := client.CreateChat(ctx, &proto.CreateChatRequest{
|
|
User1Id: 1,
|
|
User2Id: 2,
|
|
})
|
|
require.NoError(t, err)
|
|
chatId := chatResp.GetChat().GetId()
|
|
|
|
// Запускаем стрим
|
|
streamReq := &proto.StreamMessagesRequest{UserId: 2}
|
|
stream, err := client.StreamMessages(ctx, streamReq)
|
|
require.NoError(t, err)
|
|
|
|
// Отправляем сообщение
|
|
sendReq := &proto.SendMessageRequest{
|
|
ChatId: chatId,
|
|
SenderId: 1,
|
|
Content: "test message",
|
|
}
|
|
_, err = client.SendMessage(ctx, sendReq)
|
|
require.NoError(t, err)
|
|
|
|
// Пытаемся получить сообщение через стрим
|
|
done := make(chan bool)
|
|
go func() {
|
|
resp, err := stream.Recv()
|
|
assert.NoError(t, err)
|
|
if resp.GetMessage() != nil { // Игнорируем heartbeat сообщения
|
|
assert.Equal(t, "test message", resp.GetMessage().GetContent())
|
|
done <- true
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// Успех
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Timeout waiting for message")
|
|
}
|
|
}
|
|
func TestMultipleMessages(t *testing.T) {
|
|
s, lis, _ := setupTestServer(t)
|
|
defer s.Stop()
|
|
|
|
ctx := context.Background()
|
|
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
|
return lis.Dial()
|
|
}), grpc.WithInsecure())
|
|
require.NoError(t, err)
|
|
defer conn.Close()
|
|
|
|
client := proto.NewMessageServiceClient(conn)
|
|
|
|
// Создаем чат
|
|
chatResp, err := client.CreateChat(ctx, &proto.CreateChatRequest{
|
|
User1Id: 1,
|
|
User2Id: 2,
|
|
})
|
|
require.NoError(t, err)
|
|
chatId := chatResp.GetChat().GetId()
|
|
|
|
// Запускаем стрим
|
|
streamReq := &proto.StreamMessagesRequest{UserId: 2}
|
|
stream, err := client.StreamMessages(ctx, streamReq)
|
|
require.NoError(t, err)
|
|
|
|
// Отправляем 5 сообщений
|
|
for i := 0; i < 5; i++ {
|
|
sendReq := &proto.SendMessageRequest{
|
|
ChatId: chatId,
|
|
SenderId: 1,
|
|
Content: fmt.Sprintf("test message %d", i+1),
|
|
}
|
|
_, err = client.SendMessage(ctx, sendReq)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Проверяем получение всех сообщений
|
|
received := 0
|
|
done := make(chan bool)
|
|
go func() {
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
t.Logf("Stream error: %v", err)
|
|
break
|
|
}
|
|
if resp.GetMessage() != nil {
|
|
t.Logf("Received message: %s", resp.GetMessage().GetContent())
|
|
received++
|
|
if received >= 5 {
|
|
done <- true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// Успех
|
|
assert.Equal(t, 5, received)
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("Timeout waiting for messages")
|
|
}
|
|
}
|