feat: join group notification and get seq

pull/2582/head
withchao 1 year ago
parent 4c142821aa
commit bf3cfa13c0

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.11
github.com/openimsdk/protocol v0.0.72-alpha.12
github.com/openimsdk/tools v0.0.50-alpha.11
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.11 h1:SlSG80C3Y0iOXlsbnh7ZqE9imMoBRy9i+9ebwsbSqfM=
github.com/openimsdk/protocol v0.0.72-alpha.11/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.12 h1:GXUtSFXlh1AeOmMjN1CsRfRZMTQYBWZ8mTuRoB7KxLQ=
github.com/openimsdk/protocol v0.0.72-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -195,10 +195,10 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) {
req := msg.GetSeqMessageReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "GetSeqMessage")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq")
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "GetSeqMessage")
}
resp, err := g.msgRpcClient.GetSeqMessage(context, &req)
if err != nil {
@ -206,7 +206,7 @@ func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte,
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "PullMessageBySeqsResp")
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "GetSeqMessage")
}
return c, nil
}

@ -16,27 +16,28 @@ package group
import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"go.mongodb.org/mongo-driver/mongo"
)
// GroupApplicationReceiver
@ -246,7 +247,7 @@ func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID
member, err := g.db.TakeGroupMember(ctx, groupID, userID)
if err == nil {
*opUser = g.groupMemberDB2PB(member, 0)
} else if !errs.ErrRecordNotFound.Is(err) {
} else if !(errors.Is(err, mongo.ErrNoDocuments) || errs.ErrRecordNotFound.Is(err)) {
return err
}
}

@ -16,16 +16,15 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
)
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
@ -87,26 +86,32 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
}
func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) {
conversations := make(map[string]*msg.ConversationMessage)
resp := &msg.GetSeqMessageResp{
Msgs: make(map[string]*sdkws.PullMsgs),
NotificationMsgs: make(map[string]*sdkws.PullMsgs),
}
for _, conv := range req.Conversations {
if _, ok := conversations[conv.ConversationID]; !ok {
continue
}
_, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs)
if err != nil {
return nil, err
}
values := make(map[int64]*sdkws.MsgData)
for i, data := range msgs {
values[data.Seq] = msgs[i]
}
conversations[conv.ConversationID] = &msg.ConversationMessage{
Msgs: values,
var pullMsgs *sdkws.PullMsgs
if ok := false; conversationutil.IsNotificationConversationID(conv.ConversationID) {
pullMsgs, ok = resp.NotificationMsgs[conv.ConversationID]
if !ok {
pullMsgs = &sdkws.PullMsgs{}
resp.NotificationMsgs[conv.ConversationID] = pullMsgs
}
} else {
pullMsgs, ok = resp.Msgs[conv.ConversationID]
if !ok {
pullMsgs = &sdkws.PullMsgs{}
resp.NotificationMsgs[conv.ConversationID] = pullMsgs
}
}
pullMsgs.Msgs = append(pullMsgs.Msgs, msgs...)
}
return &msg.GetSeqMessageResp{
Conversations: conversations,
}, nil
return resp, nil
}
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {

@ -23,6 +23,10 @@ func IsGroupConversationID(conversationID string) bool {
return strings.HasPrefix(conversationID, "sg_")
}
func IsNotificationConversationID(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_")
}
func GenConversationUniqueKeyForSingle(sendID, recvID string) string {
l := []string{sendID, recvID}
sort.Strings(l)

Loading…
Cancel
Save