feat: add GetSeqMessage

pull/2582/head
withchao 1 year ago
parent 36ff8d4fa5
commit 790f39f480

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.9 github.com/openimsdk/protocol v0.0.72-alpha.10
github.com/openimsdk/tools v0.0.49-alpha.55 github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
@ -194,3 +194,7 @@ require (
golang.org/x/crypto v0.21.0 // indirect golang.org/x/crypto v0.21.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
//replace (
// github.com/openimsdk/protocol => /Users/chao/Desktop/withchao/protocol
//)

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A= github.com/openimsdk/protocol v0.0.72-alpha.10 h1:5nroDqaPFj44q6fZk98PRtwEt9VDi3Kg6nVIRCPZ/oE=
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.72-alpha.10/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -220,6 +220,8 @@ func (c *Client) handleMessage(message []byte) error {
resp, messageErr = c.longConnServer.SendSignalMessage(ctx, binaryReq) resp, messageErr = c.longConnServer.SendSignalMessage(ctx, binaryReq)
case WSPullMsgBySeqList: case WSPullMsgBySeqList:
resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq) resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq)
case WSPullMsg:
resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq)
case WsLogoutMsg: case WsLogoutMsg:
resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq) resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
case WsSetBackgroundStatus: case WsSetBackgroundStatus:

@ -39,6 +39,7 @@ const (
WSPullMsgBySeqList = 1002 WSPullMsgBySeqList = 1002
WSSendMsg = 1003 WSSendMsg = 1003
WSSendSignalMsg = 1004 WSSendSignalMsg = 1004
WSPullMsg = 1005
WSPushMsg = 2001 WSPushMsg = 2001
WSKickOnlineMsg = 2002 WSKickOnlineMsg = 2002
WsLogoutMsg = 2003 WsLogoutMsg = 2003

@ -94,6 +94,7 @@ type MessageHandler interface {
SendMessage(context context.Context, data *Req) ([]byte, error) SendMessage(context context.Context, data *Req) ([]byte, error)
SendSignalMessage(context context.Context, data *Req) ([]byte, error) SendSignalMessage(context context.Context, data *Req) ([]byte, error)
PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error)
GetSeqMessage(context context.Context, data *Req) ([]byte, error)
UserLogout(context context.Context, data *Req) ([]byte, error) UserLogout(context context.Context, data *Req) ([]byte, error)
SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error) SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error)
} }
@ -191,6 +192,25 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
return c, nil return c, nil
} }
func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) {
req := msg.GetSeqMessageReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq")
}
resp, err := g.msgRpcClient.GetSeqMessage(context, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "PullMessageBySeqsResp")
}
return c, nil
}
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) { func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{} req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil { if err := proto.Unmarshal(data.Data, &req); err != nil {

@ -86,6 +86,29 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
return resp, nil return resp, nil
} }
func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) {
conversations := make(map[string]*msg.ConversationMessage)
for _, conv := range req.Conversations {
if _, ok := conversations[conv.ConversationID]; !ok {
continue
}
_, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs)
if err != nil {
return nil, err
}
values := make(map[int64]*sdkws.MsgData)
for i, data := range msgs {
values[data.Seq] = msgs[i]
}
conversations[conv.ConversationID] = &msg.ConversationMessage{
Msgs: values,
}
}
return &msg.GetSeqMessageResp{
Conversations: conversations,
}, nil
}
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
if err := authverify.CheckAccessV3(ctx, req.UserID, m.config.Share.IMAdminUserID); err != nil { if err := authverify.CheckAccessV3(ctx, req.UserID, m.config.Share.IMAdminUserID); err != nil {
return nil, err return nil, err

@ -221,6 +221,10 @@ func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.
return resp, nil return resp, nil
} }
func (m *MessageRpcClient) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) {
return m.Client.GetSeqMessage(ctx, req)
}
func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) { func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) {
resp, err := m.Client.GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID}) resp, err := m.Client.GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID})
if err != nil { if err != nil {

@ -19,6 +19,10 @@ func GenGroupConversationID(groupID string) string {
return "sg_" + groupID return "sg_" + groupID
} }
func IsGroupConversationID(conversationID string) bool {
return strings.HasPrefix(conversationID, "sg_")
}
func GenConversationUniqueKeyForSingle(sendID, recvID string) string { func GenConversationUniqueKeyForSingle(sendID, recvID string) string {
l := []string{sendID, recvID} l := []string{sendID, recvID}
sort.Strings(l) sort.Strings(l)

Loading…
Cancel
Save