v0.0.21 правки в messageStream и обновление статуса сообщения
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
9eabb910cd
commit
aa839737b6
@ -151,36 +151,49 @@ func (r *subscriptionResolver) runMessageStream(ctx context.Context, userID int,
|
||||
_, err := r.MessageClient.UpdateMessageStatus(ctx, &proto.UpdateMessageStatusRequest{
|
||||
MessageId: 0, // 0 = все сообщения для пользователя
|
||||
Status: "DELIVERED",
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Failed to mark messages as delivered: %v", err)
|
||||
}
|
||||
|
||||
stream, err := r.MessageClient.StreamMessages(ctx, &proto.StreamMessagesRequest{
|
||||
streamCtx, cancel := context.WithTimeout(context.Background(), time.Hour)
|
||||
defer cancel()
|
||||
|
||||
stream, err := r.MessageClient.StreamMessages(streamCtx, &proto.StreamMessagesRequest{
|
||||
UserId: int32(userID),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream creation failed: %w", err)
|
||||
}
|
||||
defer log.Println("Stream closed")
|
||||
|
||||
// Heartbeat для поддержания соединения
|
||||
heartbeat := time.NewTicker(15 * time.Second)
|
||||
defer heartbeat.Stop()
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream receive failed: %w", err)
|
||||
}
|
||||
|
||||
if msg == nil || msg.Message == nil {
|
||||
continue // Пропускаем heartbeat
|
||||
}
|
||||
|
||||
log.Printf("Received message: %+v", msg.Message)
|
||||
|
||||
select {
|
||||
case <-heartbeat.C:
|
||||
// Пустой heartbeat для поддержания соединения
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
log.Printf("Message forwarded to channel: %d", msg.Message.Id)
|
||||
default:
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream receive failed: %w", err)
|
||||
}
|
||||
|
||||
if msg.GetMessage() == nil {
|
||||
continue // Пропускаем heartbeat
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case messageChan <- protoMessageToDomain(msg.Message):
|
||||
log.Printf("Message delivered to channel: %d", msg.Message.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,7 +88,7 @@ func (s *Server) configureRouter() {
|
||||
|
||||
srv.AddTransport(&transport.Websocket{
|
||||
Upgrader: wsUpgrader,
|
||||
KeepAlivePingInterval: 10 * time.Second,
|
||||
KeepAlivePingInterval: 30 * time.Second,
|
||||
})
|
||||
|
||||
// Раздельные эндпоинты:
|
||||
|
||||
@ -294,6 +294,7 @@ type UpdateMessageStatusRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
MessageId int32 `protobuf:"varint,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
|
||||
Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
|
||||
UserId int32 `protobuf:"varint,3,opt,name=userId,proto3" json:"userId,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@ -342,6 +343,13 @@ func (x *UpdateMessageStatusRequest) GetStatus() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *UpdateMessageStatusRequest) GetUserId() int32 {
|
||||
if x != nil {
|
||||
return x.UserId
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type StreamMessagesRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
UserId int32 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
|
||||
@ -758,11 +766,12 @@ const file_messages_proto_rawDesc = "" +
|
||||
"\x05limit\x18\x02 \x01(\x05R\x05limit\x12\x16\n" +
|
||||
"\x06offset\x18\x03 \x01(\x05R\x06offset\".\n" +
|
||||
"\x13GetUserChatsRequest\x12\x17\n" +
|
||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"S\n" +
|
||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"k\n" +
|
||||
"\x1aUpdateMessageStatusRequest\x12\x1d\n" +
|
||||
"\n" +
|
||||
"message_id\x18\x01 \x01(\x05R\tmessageId\x12\x16\n" +
|
||||
"\x06status\x18\x02 \x01(\tR\x06status\"0\n" +
|
||||
"\x06status\x18\x02 \x01(\tR\x06status\x12\x16\n" +
|
||||
"\x06userId\x18\x03 \x01(\x05R\x06userId\"0\n" +
|
||||
"\x15StreamMessagesRequest\x12\x17\n" +
|
||||
"\auser_id\x18\x01 \x01(\x05R\x06userId\"\xdd\x01\n" +
|
||||
"\aMessage\x12\x0e\n" +
|
||||
|
||||
@ -45,6 +45,7 @@ message GetUserChatsRequest {
|
||||
message UpdateMessageStatusRequest {
|
||||
int32 message_id = 1;
|
||||
string status = 2;
|
||||
int32 userId = 3;
|
||||
}
|
||||
|
||||
message StreamMessagesRequest {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user