From bf3cfa13c0c331c10cde8bc0e5326b321064894f Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 4 Sep 2024 10:47:06 +0800 Subject: [PATCH] feat: join group notification and get seq --- go.mod | 2 +- go.sum | 4 +- internal/msggateway/message_handler.go | 6 +-- internal/rpc/group/notification.go | 15 +++---- internal/rpc/msg/sync_msg.go | 39 +++++++++++-------- pkg/util/conversationutil/conversationutil.go | 4 ++ 6 files changed, 40 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 0bf4680a5..76166fa4f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.11 + github.com/openimsdk/protocol v0.0.72-alpha.12 github.com/openimsdk/tools v0.0.50-alpha.11 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 49f85ad7c..bfd625d49 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.11 h1:SlSG80C3Y0iOXlsbnh7ZqE9imMoBRy9i+9ebwsbSqfM= -github.com/openimsdk/protocol v0.0.72-alpha.11/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.12 h1:GXUtSFXlh1AeOmMjN1CsRfRZMTQYBWZ8mTuRoB7KxLQ= +github.com/openimsdk/protocol v0.0.72-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc= github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index dfb4ca18a..2f9620ce1 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -195,10 +195,10 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([ func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) { req := msg.GetSeqMessageReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { - return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq") + return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "GetSeqMessage") } if err := g.validate.Struct(data); err != nil { - return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq") + return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "GetSeqMessage") } resp, err := g.msgRpcClient.GetSeqMessage(context, &req) if err != nil { @@ -206,7 +206,7 @@ func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, } c, err := proto.Marshal(resp) if err != nil { - return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "PullMessageBySeqsResp") + return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "GetSeqMessage") } return c, nil } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index f7eb6a1cf..64e922fe2 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -16,27 +16,28 @@ package group import ( "context" + "errors" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" - "github.com/openimsdk/protocol/msg" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/stringutil" + "go.mongodb.org/mongo-driver/mongo" ) // GroupApplicationReceiver @@ -246,7 +247,7 @@ func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID member, err := g.db.TakeGroupMember(ctx, groupID, userID) if err == nil { *opUser = g.groupMemberDB2PB(member, 0) - } else if !errs.ErrRecordNotFound.Is(err) { + } else if !(errors.Is(err, mongo.ErrNoDocuments) || errs.ErrRecordNotFound.Is(err)) { return err } } diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 6867c438c..0b37b25c2 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -16,16 +16,15 @@ package msg import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/timeutil" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/timeutil" ) func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { @@ -87,26 +86,32 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag } func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) { - conversations := make(map[string]*msg.ConversationMessage) + resp := &msg.GetSeqMessageResp{ + Msgs: make(map[string]*sdkws.PullMsgs), + NotificationMsgs: make(map[string]*sdkws.PullMsgs), + } for _, conv := range req.Conversations { - if _, ok := conversations[conv.ConversationID]; !ok { - continue - } _, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs) if err != nil { return nil, err } - values := make(map[int64]*sdkws.MsgData) - for i, data := range msgs { - values[data.Seq] = msgs[i] - } - conversations[conv.ConversationID] = &msg.ConversationMessage{ - Msgs: values, + var pullMsgs *sdkws.PullMsgs + if ok := false; conversationutil.IsNotificationConversationID(conv.ConversationID) { + pullMsgs, ok = resp.NotificationMsgs[conv.ConversationID] + if !ok { + pullMsgs = &sdkws.PullMsgs{} + resp.NotificationMsgs[conv.ConversationID] = pullMsgs + } + } else { + pullMsgs, ok = resp.Msgs[conv.ConversationID] + if !ok { + pullMsgs = &sdkws.PullMsgs{} + resp.NotificationMsgs[conv.ConversationID] = pullMsgs + } } + pullMsgs.Msgs = append(pullMsgs.Msgs, msgs...) } - return &msg.GetSeqMessageResp{ - Conversations: conversations, - }, nil + return resp, nil } func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { diff --git a/pkg/util/conversationutil/conversationutil.go b/pkg/util/conversationutil/conversationutil.go index cff543892..f0a44ab1e 100644 --- a/pkg/util/conversationutil/conversationutil.go +++ b/pkg/util/conversationutil/conversationutil.go @@ -23,6 +23,10 @@ func IsGroupConversationID(conversationID string) bool { return strings.HasPrefix(conversationID, "sg_") } +func IsNotificationConversationID(conversationID string) bool { + return strings.HasPrefix(conversationID, "n_") +} + func GenConversationUniqueKeyForSingle(sendID, recvID string) string { l := []string{sendID, recvID} sort.Strings(l)