From 790f39f480ad302725d1c5128c61c555458853bd Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Sat, 31 Aug 2024 17:18:48 +0800 Subject: [PATCH] feat: add GetSeqMessage --- go.mod | 6 ++++- go.sum | 4 ++-- internal/msggateway/client.go | 2 ++ internal/msggateway/constant.go | 1 + internal/msggateway/message_handler.go | 20 ++++++++++++++++ internal/rpc/msg/sync_msg.go | 23 +++++++++++++++++++ pkg/rpcclient/msg.go | 4 ++++ pkg/util/conversationutil/conversationutil.go | 4 ++++ 8 files changed, 61 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ae30db056..cba81a1e9 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.9 + github.com/openimsdk/protocol v0.0.72-alpha.10 github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 @@ -194,3 +194,7 @@ require ( golang.org/x/crypto v0.21.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +//replace ( +// github.com/openimsdk/protocol => /Users/chao/Desktop/withchao/protocol +//) diff --git a/go.sum b/go.sum index 815b6badf..9fda15811 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A= -github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.10 h1:5nroDqaPFj44q6fZk98PRtwEt9VDi3Kg6nVIRCPZ/oE= +github.com/openimsdk/protocol v0.0.72-alpha.10/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index dcb15b70d..5a274f9e7 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -220,6 +220,8 @@ func (c *Client) handleMessage(message []byte) error { resp, messageErr = c.longConnServer.SendSignalMessage(ctx, binaryReq) case WSPullMsgBySeqList: resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq) + case WSPullMsg: + resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq) case WsLogoutMsg: resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq) case WsSetBackgroundStatus: diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index dc5ad7786..154be014e 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -39,6 +39,7 @@ const ( WSPullMsgBySeqList = 1002 WSSendMsg = 1003 WSSendSignalMsg = 1004 + WSPullMsg = 1005 WSPushMsg = 2001 WSKickOnlineMsg = 2002 WsLogoutMsg = 2003 diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 8a11e6ab3..dfb4ca18a 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -94,6 +94,7 @@ type MessageHandler interface { SendMessage(context context.Context, data *Req) ([]byte, error) SendSignalMessage(context context.Context, data *Req) ([]byte, error) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) + GetSeqMessage(context context.Context, data *Req) ([]byte, error) UserLogout(context context.Context, data *Req) ([]byte, error) SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error) } @@ -191,6 +192,25 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([ return c, nil } +func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) { + req := msg.GetSeqMessageReq{} + if err := proto.Unmarshal(data.Data, &req); err != nil { + return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq") + } + if err := g.validate.Struct(data); err != nil { + return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq") + } + resp, err := g.msgRpcClient.GetSeqMessage(context, &req) + if err != nil { + return nil, err + } + c, err := proto.Marshal(resp) + if err != nil { + return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "PullMessageBySeqsResp") + } + return c, nil +} + func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) { req := push.DelUserPushTokenReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index f5b5ebda5..6867c438c 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -86,6 +86,29 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag return resp, nil } +func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) { + conversations := make(map[string]*msg.ConversationMessage) + for _, conv := range req.Conversations { + if _, ok := conversations[conv.ConversationID]; !ok { + continue + } + _, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs) + if err != nil { + return nil, err + } + values := make(map[int64]*sdkws.MsgData) + for i, data := range msgs { + values[data.Seq] = msgs[i] + } + conversations[conv.ConversationID] = &msg.ConversationMessage{ + Msgs: values, + } + } + return &msg.GetSeqMessageResp{ + Conversations: conversations, + }, nil +} + func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { if err := authverify.CheckAccessV3(ctx, req.UserID, m.config.Share.IMAdminUserID); err != nil { return nil, err diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 715014800..5a06dac5d 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -221,6 +221,10 @@ func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws. return resp, nil } +func (m *MessageRpcClient) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) { + return m.Client.GetSeqMessage(ctx, req) +} + func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) { resp, err := m.Client.GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID}) if err != nil { diff --git a/pkg/util/conversationutil/conversationutil.go b/pkg/util/conversationutil/conversationutil.go index 5683d8df8..cff543892 100644 --- a/pkg/util/conversationutil/conversationutil.go +++ b/pkg/util/conversationutil/conversationutil.go @@ -19,6 +19,10 @@ func GenGroupConversationID(groupID string) string { return "sg_" + groupID } +func IsGroupConversationID(conversationID string) bool { + return strings.HasPrefix(conversationID, "sg_") +} + func GenConversationUniqueKeyForSingle(sendID, recvID string) string { l := []string{sendID, recvID} sort.Strings(l)