feat: supports getting messages based on session ID and seq (#2582)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* update gomake version

* update gomake version

* fix: seq conversion bug

* fix: redis pipe exec

* fix: ImportFriends

* fix: A large number of logs keysAndValues ​​length is not even

* feat: mark read aggregate write

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* merge

* merge

* read seq is written to mongo

* read seq is written to mongo

* fix: invitation to join group notification

* fix: friend op_user_id

* feat: optimizing asynchronous context

* feat: optimizing memamq size

* feat: add GetSeqMessage

* feat: GroupApplicationAgreeMemberEnterNotification

* feat: GroupApplicationAgreeMemberEnterNotification

* feat: go.mod

* feat: go.mod

* feat: join group notification and get seq

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
pull/2611/head
chao 4 weeks ago committed by GitHub
parent 38a8802107
commit 2477a6658c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.9
github.com/openimsdk/protocol v0.0.72-alpha.12
github.com/openimsdk/tools v0.0.50-alpha.11
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A=
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.12 h1:GXUtSFXlh1AeOmMjN1CsRfRZMTQYBWZ8mTuRoB7KxLQ=
github.com/openimsdk/protocol v0.0.72-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -220,6 +220,8 @@ func (c *Client) handleMessage(message []byte) error {
resp, messageErr = c.longConnServer.SendSignalMessage(ctx, binaryReq)
case WSPullMsgBySeqList:
resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq)
case WSPullMsg:
resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq)
case WsLogoutMsg:
resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
case WsSetBackgroundStatus:

@ -39,6 +39,7 @@ const (
WSPullMsgBySeqList = 1002
WSSendMsg = 1003
WSSendSignalMsg = 1004
WSPullMsg = 1005
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WsLogoutMsg = 2003

@ -94,6 +94,7 @@ type MessageHandler interface {
SendMessage(context context.Context, data *Req) ([]byte, error)
SendSignalMessage(context context.Context, data *Req) ([]byte, error)
PullMessageBySeqList(context context.Context, data *Req) ([]byte, error)
GetSeqMessage(context context.Context, data *Req) ([]byte, error)
UserLogout(context context.Context, data *Req) ([]byte, error)
SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error)
}
@ -191,6 +192,25 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
return c, nil
}
func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) {
req := msg.GetSeqMessageReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "GetSeqMessage")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "GetSeqMessage")
}
resp, err := g.msgRpcClient.GetSeqMessage(context, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "GetSeqMessage")
}
return c, nil
}
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {

@ -833,7 +833,7 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
if member == nil {
log.ZDebug(ctx, "GroupApplicationResponse", "member is nil")
} else {
if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil {
if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, groupRequest.InviterUserID, req.FromUserID); err != nil {
return nil, err
}
}

@ -16,27 +16,28 @@ package group
import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"go.mongodb.org/mongo-driver/mongo"
)
// GroupApplicationReceiver
@ -227,10 +228,13 @@ func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, ap
} */
func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) {
return g.fillOpUserByUserID(ctx, mcontext.GetOpUserID(ctx), opUser, groupID)
}
func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error {
if opUser == nil {
return errs.ErrInternalServer.WrapMsg("**sdkws.GroupMemberFullInfo is nil")
}
userID := mcontext.GetOpUserID(ctx)
if groupID != "" {
if authverify.IsManagerUserID(userID, g.config.Share.IMAdminUserID) {
*opUser = &sdkws.GroupMemberFullInfo{
@ -243,7 +247,7 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
member, err := g.db.TakeGroupMember(ctx, groupID, userID)
if err == nil {
*opUser = g.groupMemberDB2PB(member, 0)
} else if !errs.ErrRecordNotFound.Is(err) {
} else if !(errors.Is(err, mongo.ErrNoDocuments) || errs.ErrRecordNotFound.Is(err)) {
return err
}
}
@ -509,7 +513,7 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error {
func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, invitedOpUserID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {
@ -546,15 +550,32 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g
return err
}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
tips := &sdkws.MemberInvitedTips{
Group: group,
InvitedUserList: users,
}
opUserID := mcontext.GetOpUserID(ctx)
if err = g.fillOpUserByUserID(ctx, opUserID, &tips.OpUser, tips.Group.GroupID); err != nil {
return nil
}
switch {
case invitedOpUserID == "":
case invitedOpUserID == opUserID:
tips.InviterUser = tips.OpUser
default:
if err = g.fillOpUserByUserID(ctx, invitedOpUserID, &tips.InviterUser, tips.Group.GroupID); err != nil {
return err
}
}
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
return nil
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error {
return g.GroupApplicationAgreeMemberEnterNotification(ctx, groupID, "", entrantUserID...)
}
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {
var err error
defer func() {

@ -16,16 +16,15 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
)
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
@ -86,6 +85,35 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
return resp, nil
}
func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) {
resp := &msg.GetSeqMessageResp{
Msgs: make(map[string]*sdkws.PullMsgs),
NotificationMsgs: make(map[string]*sdkws.PullMsgs),
}
for _, conv := range req.Conversations {
_, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs)
if err != nil {
return nil, err
}
var pullMsgs *sdkws.PullMsgs
if ok := false; conversationutil.IsNotificationConversationID(conv.ConversationID) {
pullMsgs, ok = resp.NotificationMsgs[conv.ConversationID]
if !ok {
pullMsgs = &sdkws.PullMsgs{}
resp.NotificationMsgs[conv.ConversationID] = pullMsgs
}
} else {
pullMsgs, ok = resp.Msgs[conv.ConversationID]
if !ok {
pullMsgs = &sdkws.PullMsgs{}
resp.NotificationMsgs[conv.ConversationID] = pullMsgs
}
}
pullMsgs.Msgs = append(pullMsgs.Msgs, msgs...)
}
return resp, nil
}
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
if err := authverify.CheckAccessV3(ctx, req.UserID, m.config.Share.IMAdminUserID); err != nil {
return nil, err

@ -221,6 +221,10 @@ func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.
return resp, nil
}
func (m *MessageRpcClient) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) {
return m.Client.GetSeqMessage(ctx, req)
}
func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) {
resp, err := m.Client.GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID})
if err != nil {

@ -19,6 +19,14 @@ func GenGroupConversationID(groupID string) string {
return "sg_" + groupID
}
func IsGroupConversationID(conversationID string) bool {
return strings.HasPrefix(conversationID, "sg_")
}
func IsNotificationConversationID(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_")
}
func GenConversationUniqueKeyForSingle(sendID, recvID string) string {
l := []string{sendID, recvID}
sort.Strings(l)

Loading…
Cancel
Save