This commit is contained in:
parent
3ea3ad8e57
commit
57a3f6ca7b
@ -13,9 +13,9 @@ steps:
|
||||
- rm -fr /home/tailly_messages
|
||||
- mkdir /home/tailly_messages
|
||||
- cd /home/tailly_messages
|
||||
- git clone https://admin:103e837d56b8d0bb029e9b5e03a20f568611c610@git.altomta.ru/admin/tailly_messages . || true
|
||||
- git clone https://admin:2bfa8b81e8787c9c0bb89e1a7bbd929b2d63aaf2@git.altomta.ru/admin/tailly_messages . || true
|
||||
- git pull
|
||||
- docker stop tailly_messages || true
|
||||
- docker rm tailly_messages || true
|
||||
- DOCKER_BUILDKIT=1 docker build -t tailly_messages .
|
||||
- docker run -d --name tailly_messages --network tailly_net -p 50052:50052 tailly_messages
|
||||
- docker run -d --name tailly_messages --network tailly_net -p 50051:50051 tailly_messages
|
||||
|
||||
@ -37,7 +37,7 @@ COPY --from=builder /app/server /usr/local/bin/server
|
||||
|
||||
# Настройки среды
|
||||
ENV GIN_MODE=release \
|
||||
PORT=50052
|
||||
PORT=50051
|
||||
|
||||
# Открываем порт
|
||||
EXPOSE $PORT
|
||||
|
||||
77
server.go
77
server.go
@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/segmentio/kafka-go"
|
||||
@ -137,18 +138,46 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Создаем MessageResponse для Kafka
|
||||
msgResponse := &proto.MessageResponse{Message: &message}
|
||||
|
||||
// Сериализуем сообщение в JSON для Kafka
|
||||
msgBytes, err := json.Marshal(protoMessageToMap(msgResponse.Message))
|
||||
if err != nil {
|
||||
log.Printf("Failed to marshal message: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Publish to Kafka for real-time delivery
|
||||
err = s.producer.WriteMessages(ctx, kafka.Message{
|
||||
Key: []byte(string(receiverId)), // Отправляем сообщение получателю
|
||||
Value: []byte(message.String()),
|
||||
Key: []byte(fmt.Sprintf("%d", receiverId)),
|
||||
Value: msgBytes,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Failed to publish message to Kafka: %v", err)
|
||||
}
|
||||
|
||||
return &proto.MessageResponse{Message: &message}, nil
|
||||
return msgResponse, nil
|
||||
}
|
||||
|
||||
func mustMarshal(msg protobuf.Message) []byte {
|
||||
data, err := protobuf.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to marshal message: %v", err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
func protoMessageToMap(msg *proto.Message) map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"id": msg.Id,
|
||||
"chatId": msg.ChatId,
|
||||
"senderId": msg.SenderId,
|
||||
"receiverId": msg.ReceiverId,
|
||||
"content": msg.Content,
|
||||
"status": msg.Status,
|
||||
"createdAt": msg.CreatedAt.AsTime().Format(time.RFC3339Nano),
|
||||
}
|
||||
}
|
||||
func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) {
|
||||
var chat proto.Chat
|
||||
var createdAt, updatedAt time.Time
|
||||
@ -300,31 +329,47 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
|
||||
}
|
||||
|
||||
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
|
||||
// Create Kafka reader for this user's messages
|
||||
log.Printf("Starting message stream for user %d", req.UserId)
|
||||
|
||||
reader := kafka.NewReader(kafka.ReaderConfig{
|
||||
Brokers: []string{"localhost:9092"},
|
||||
Topic: "user_messages",
|
||||
Partition: 0,
|
||||
MinBytes: 10e3, // 10KB
|
||||
MaxBytes: 10e6, // 10MB
|
||||
Brokers: []string{"89.104.69.222:9092"},
|
||||
Topic: "user_messages",
|
||||
GroupID: fmt.Sprintf("user-%d", req.UserId),
|
||||
MinBytes: 10e3,
|
||||
MaxBytes: 10e6,
|
||||
})
|
||||
defer reader.Close()
|
||||
|
||||
for {
|
||||
m, err := reader.ReadMessage(stream.Context())
|
||||
if err != nil {
|
||||
log.Printf("Stream error for user %d: %v", req.UserId, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if message is for this user
|
||||
if string(m.Key) == string(req.UserId) {
|
||||
var message proto.Message
|
||||
if err := protobuf.Unmarshal(m.Value, &message); err != nil {
|
||||
log.Printf("Failed to unmarshal message: %v", err)
|
||||
log.Printf("Received Kafka message: %s", string(m.Value))
|
||||
|
||||
if string(m.Key) == fmt.Sprintf("%d", req.UserId) {
|
||||
var msgData map[string]interface{}
|
||||
if err := json.Unmarshal(m.Value, &msgData); err != nil {
|
||||
log.Printf("Failed to unmarshal Kafka message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := stream.Send(&proto.MessageResponse{Message: &message}); err != nil {
|
||||
// Преобразуем обратно в proto.Message
|
||||
createdAt, _ := time.Parse(time.RFC3339Nano, msgData["createdAt"].(string))
|
||||
msg := &proto.Message{
|
||||
Id: msgData["id"].(int32),
|
||||
ChatId: msgData["chatId"].(int32),
|
||||
SenderId: msgData["senderId"].(int32),
|
||||
ReceiverId: msgData["receiverId"].(int32),
|
||||
Content: msgData["content"].(string),
|
||||
Status: msgData["status"].(string),
|
||||
CreatedAt: timestamppb.New(createdAt),
|
||||
}
|
||||
|
||||
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
|
||||
if err := stream.Send(&proto.MessageResponse{Message: msg}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -333,7 +378,7 @@ func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.M
|
||||
|
||||
func main() {
|
||||
// Initialize database connection
|
||||
pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:i0Oq%2675LA%26M612ceuy@79.174.89.104:15452/tailly_v2")
|
||||
pool, err := pgxpool.Connect(context.Background(), "postgres://tailly_v2:BBP%263XP956%26D8y6cYJ@79.174.89.104:15452/tailly_v2")
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to connect to database: %v", err)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user