v0.0.2.1 Текущий почти рабочий вариант messages
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
595f5a333e
commit
f3ab1f4bf6
@ -12,6 +12,7 @@ CREATE TABLE messages (
|
||||
id SERIAL PRIMARY KEY,
|
||||
chat_id INTEGER NOT NULL REFERENCES chats(id) ON DELETE CASCADE,
|
||||
sender_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
receiver_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
content TEXT NOT NULL,
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'sent' CHECK (status IN ('sent', 'delivered', 'read')),
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||
|
||||
@ -391,9 +391,10 @@ type Message struct {
|
||||
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
|
||||
ChatId int32 `protobuf:"varint,2,opt,name=chat_id,json=chatId,proto3" json:"chat_id,omitempty"`
|
||||
SenderId int32 `protobuf:"varint,3,opt,name=sender_id,json=senderId,proto3" json:"sender_id,omitempty"`
|
||||
Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"`
|
||||
Status string `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"`
|
||||
CreatedAt *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
|
||||
ReceiverId int32 `protobuf:"varint,4,opt,name=receiver_id,json=receiverId,proto3" json:"receiver_id,omitempty"`
|
||||
Content string `protobuf:"bytes,5,opt,name=content,proto3" json:"content,omitempty"`
|
||||
Status string `protobuf:"bytes,6,opt,name=status,proto3" json:"status,omitempty"`
|
||||
CreatedAt *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@ -449,6 +450,13 @@ func (x *Message) GetSenderId() int32 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Message) GetReceiverId() int32 {
|
||||
if x != nil {
|
||||
return x.ReceiverId
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Message) GetContent() string {
|
||||
if x != nil {
|
||||
return x.Content
|
||||
@ -756,15 +764,17 @@ const file_messages_proto_rawDesc = "" +
|
||||
"message_id\x18\x01 \x01(\x05R\tmessageId\x12\x16\n" +
|
||||
"\x06status\x18\x02 \x01(\tR\x06status\"0\n" +
|
||||
"\x15StreamMessagesRequest\x12\x17\n" +
|
||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"\xbc\x01\n" +
|
||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"\xdd\x01\n" +
|
||||
"\aMessage\x12\x0e\n" +
|
||||
"\x02id\x18\x01 \x01(\x05R\x02id\x12\x17\n" +
|
||||
"\achat_id\x18\x02 \x01(\x05R\x06chatId\x12\x1b\n" +
|
||||
"\tsender_id\x18\x03 \x01(\x05R\bsenderId\x12\x18\n" +
|
||||
"\acontent\x18\x04 \x01(\tR\acontent\x12\x16\n" +
|
||||
"\x06status\x18\x05 \x01(\tR\x06status\x129\n" +
|
||||
"\tsender_id\x18\x03 \x01(\x05R\bsenderId\x12\x1f\n" +
|
||||
"\vreceiver_id\x18\x04 \x01(\x05R\n" +
|
||||
"receiverId\x12\x18\n" +
|
||||
"\acontent\x18\x05 \x01(\tR\acontent\x12\x16\n" +
|
||||
"\x06status\x18\x06 \x01(\tR\x06status\x129\n" +
|
||||
"\n" +
|
||||
"created_at\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\tcreatedAt\"\xf5\x01\n" +
|
||||
"created_at\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\tcreatedAt\"\xf5\x01\n" +
|
||||
"\x04Chat\x12\x0e\n" +
|
||||
"\x02id\x18\x01 \x01(\x05R\x02id\x12\x19\n" +
|
||||
"\buser1_id\x18\x02 \x01(\x05R\auser1Id\x12\x19\n" +
|
||||
|
||||
@ -55,9 +55,10 @@ message Message {
|
||||
int32 id = 1;
|
||||
int32 chat_id = 2;
|
||||
int32 sender_id = 3;
|
||||
string content = 4;
|
||||
string status = 5;
|
||||
google.protobuf.Timestamp created_at = 6;
|
||||
int32 receiver_id = 4;
|
||||
string content = 5;
|
||||
string status = 6;
|
||||
google.protobuf.Timestamp created_at = 7;
|
||||
}
|
||||
|
||||
message Chat {
|
||||
|
||||
40
server.go
40
server.go
@ -96,30 +96,32 @@ func (s *server) CreateChat(ctx context.Context, req *proto.CreateChatRequest) (
|
||||
}
|
||||
|
||||
func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest) (*proto.MessageResponse, error) {
|
||||
|
||||
// Проверяем, что отправитель является участником чата
|
||||
var isParticipant bool
|
||||
err := s.db.QueryRow(ctx, `
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM chats
|
||||
WHERE id = $1 AND (user1_id = $2 OR user2_id = $2)
|
||||
)`, req.ChatId, req.SenderId).Scan(&isParticipant)
|
||||
// Получаем информацию о чате, чтобы определить получателя
|
||||
var user1Id, user2Id int32
|
||||
err := s.db.QueryRow(ctx, "SELECT user1_id, user2_id FROM chats WHERE id = $1", req.ChatId).Scan(&user1Id, &user2Id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check chat participation: %v", err)
|
||||
return nil, fmt.Errorf("failed to get chat info: %v", err)
|
||||
}
|
||||
if !isParticipant {
|
||||
return nil, fmt.Errorf("user %d is not a participant of chat %d", req.SenderId, req.ChatId)
|
||||
|
||||
// Определяем получателя
|
||||
var receiverId int32
|
||||
if req.SenderId == user1Id {
|
||||
receiverId = user2Id
|
||||
} else if req.SenderId == user2Id {
|
||||
receiverId = user1Id
|
||||
} else {
|
||||
return nil, fmt.Errorf("sender %d is not a participant of chat %d", req.SenderId, req.ChatId)
|
||||
}
|
||||
|
||||
var message proto.Message
|
||||
var createdAt time.Time
|
||||
|
||||
err = s.db.QueryRow(ctx, `
|
||||
INSERT INTO messages (chat_id, sender_id, content)
|
||||
VALUES ($1, $2, $3)
|
||||
RETURNING id, chat_id, sender_id, content, status, created_at
|
||||
`, req.ChatId, req.SenderId, req.Content).Scan(
|
||||
&message.Id, &message.ChatId, &message.SenderId, &message.Content, &message.Status, &createdAt,
|
||||
INSERT INTO messages (chat_id, sender_id, receiver_id, content)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id, chat_id, sender_id, receiver_id, content, status, created_at
|
||||
`, req.ChatId, req.SenderId, receiverId, req.Content).Scan(
|
||||
&message.Id, &message.ChatId, &message.SenderId, &message.ReceiverId, &message.Content, &message.Status, &createdAt,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -129,15 +131,15 @@ func (s *server) SendMessage(ctx context.Context, req *proto.SendMessageRequest)
|
||||
|
||||
// Update chat's updated_at
|
||||
_, err = s.db.Exec(ctx, `
|
||||
UPDATE chats SET updated_at = NOW() WHERE id = $1
|
||||
`, req.ChatId)
|
||||
UPDATE chats SET updated_at = NOW() WHERE id = $1
|
||||
`, req.ChatId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Publish to Kafka for real-time delivery
|
||||
err = s.producer.WriteMessages(ctx, kafka.Message{
|
||||
Key: []byte(string(req.ChatId)),
|
||||
Key: []byte(string(receiverId)), // Отправляем сообщение получателю
|
||||
Value: []byte(message.String()),
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user