откат до версии 0.2.1
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
madipo2611 2025-08-14 16:22:00 +03:00
parent d9cc22d3f4
commit a4e2c42647

View File

@ -3,7 +3,6 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/segmentio/kafka-go"
@ -138,46 +137,18 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
return nil, err
}
// Создаем MessageResponse для Kafka
msgResponse := &proto.MessageResponse{Message: &message}
// Сериализуем сообщение в JSON для Kafka
msgBytes, err := json.Marshal(protoMessageToMap(msgResponse.Message))
if err != nil {
log.Printf("Failed to marshal message: %v", err)
return nil, err
}
// Publish to Kafka for real-time delivery
err = s.producer.WriteMessages(ctx, kafka.Message{
Key: []byte(fmt.Sprintf("%d", receiverId)),
Value: msgBytes,
Key: []byte(string(receiverId)), // Отправляем сообщение получателю
Value: []byte(message.String()),
})
if err != nil {
log.Printf("Failed to publish message to Kafka: %v", err)
}
return msgResponse, nil
return &proto.MessageResponse{Message: &message}, nil
}
func mustMarshal(msg protobuf.Message) []byte {
data, err := protobuf.Marshal(msg)
if err != nil {
log.Fatalf("failed to marshal message: %v", err)
}
return data
}
func protoMessageToMap(msg *proto.Message) map[string]interface{} {
return map[string]interface{}{
"id": msg.Id,
"chatId": msg.ChatId,
"senderId": msg.SenderId,
"receiverId": msg.ReceiverId,
"content": msg.Content,
"status": msg.Status,
"createdAt": msg.CreatedAt.AsTime().Format(time.RFC3339Nano),
}
}
func (s *server) GetChat(ctx context.Context, req *proto.GetChatRequest) (*proto.ChatResponse, error) {
var chat proto.Chat
var createdAt, updatedAt time.Time
@ -329,47 +300,31 @@ func (s *server) UpdateMessageStatus(ctx context.Context, req *proto.UpdateMessa
}
func (s *server) StreamMessages(req *proto.StreamMessagesRequest, stream proto.MessageService_StreamMessagesServer) error {
log.Printf("Starting message stream for user %d", req.UserId)
// Create Kafka reader for this user's messages
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "user_messages",
GroupID: fmt.Sprintf("user-%d", req.UserId),
MinBytes: 10e3,
MaxBytes: 10e6,
Brokers: []string{"localhost:9092"},
Topic: "user_messages",
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer reader.Close()
for {
m, err := reader.ReadMessage(stream.Context())
if err != nil {
log.Printf("Stream error for user %d: %v", req.UserId, err)
return err
}
log.Printf("Received Kafka message: %s", string(m.Value))
if string(m.Key) == fmt.Sprintf("%d", req.UserId) {
var msgData map[string]interface{}
if err := json.Unmarshal(m.Value, &msgData); err != nil {
log.Printf("Failed to unmarshal Kafka message: %v", err)
// Check if message is for this user
if string(m.Key) == string(req.UserId) {
var message proto.Message
if err := protobuf.Unmarshal(m.Value, &message); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
continue
}
// Преобразуем обратно в proto.Message
createdAt, _ := time.Parse(time.RFC3339Nano, msgData["createdAt"].(string))
msg := &proto.Message{
Id: msgData["id"].(int32),
ChatId: msgData["chatId"].(int32),
SenderId: msgData["senderId"].(int32),
ReceiverId: msgData["receiverId"].(int32),
Content: msgData["content"].(string),
Status: msgData["status"].(string),
CreatedAt: timestamppb.New(createdAt),
}
log.Printf("Sending message to user %d: %+v", req.UserId, msg)
if err := stream.Send(&proto.MessageResponse{Message: msg}); err != nil {
if err := stream.Send(&proto.MessageResponse{Message: &message}); err != nil {
return err
}
}
@ -386,7 +341,7 @@ func main() {
// Initialize Kafka producer
producer := &kafka.Writer{
Addr: kafka.TCP("kafka:9092"),
Addr: kafka.TCP("89.104.69.222:9092"),
Topic: "user_messages",
Balancer: &kafka.Hash{},
}