merge: list -> fix-resp

pull/2379/head
icey-yu 1 year ago
commit 996fd4ea7c

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.12 github.com/openimsdk/protocol v0.0.69-alpha.17
github.com/openimsdk/tools v0.0.49-alpha.28 github.com/openimsdk/tools v0.0.49-alpha.28
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
@ -176,3 +176,5 @@ require (
golang.org/x/crypto v0.21.0 // indirect golang.org/x/crypto v0.21.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol

@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.12 h1:3ZdwmD1y9vcduIC8o2EZS8Ds/fByqcuEFo+NkcBzgRo= github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M=
github.com/openimsdk/protocol v0.0.69-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.28 h1:1CfdFxvKzyOIvgNMVMq4ZB2upAJ0evLbbigOhWQzhu8= github.com/openimsdk/tools v0.0.49-alpha.28 h1:1CfdFxvKzyOIvgNMVMq4ZB2upAJ0evLbbigOhWQzhu8=
github.com/openimsdk/tools v0.0.49-alpha.28/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/openimsdk/tools v0.0.49-alpha.28/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -180,7 +180,7 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
} }
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string var pushToUserIDs []string
if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
&pushToUserIDs); err != nil { &pushToUserIDs); err != nil {

@ -11,6 +11,22 @@ import (
"github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/relation"
) )
func (s *friendServer) NotificationUserInfoUpdate(ctx context.Context, req *relation.NotificationUserInfoUpdateReq) (*relation.NotificationUserInfoUpdateResp, error) {
userIDs, err := s.db.FindFriendUserIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
for _, userID := range userIDs {
if err := s.db.OwnerIncrVersion(ctx, userID, []string{req.UserID}, model.VersionStateUpdate); err != nil {
return nil, err
}
}
for _, userID := range userIDs {
s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserID, userID)
}
return &relation.NotificationUserInfoUpdateResp{}, nil
}
func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.GetFullFriendUserIDsReq) (*relation.GetFullFriendUserIDsResp, error) { func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.GetFullFriendUserIDsReq) (*relation.GetFullFriendUserIDsResp, error) {
vl, err := s.db.FindMaxFriendVersionCache(ctx, req.UserID) vl, err := s.db.FindMaxFriendVersionCache(ctx, req.UserID)
if err != nil { if err != nil {

@ -17,17 +17,18 @@ package group
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"math/rand"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache"
"math/big"
"math/rand"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
@ -132,13 +133,17 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro
} }
groupIDs = append(groupIDs, member.GroupID) groupIDs = append(groupIDs, member.GroupID)
} }
for _, groupID := range groupIDs {
if err := s.db.MemberGroupIncrVersion(ctx, groupID, []string{req.UserID}, model.VersionStateUpdate); err != nil {
return nil, err
}
}
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID) s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID)
} }
if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil { if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil {
return nil, err return nil, err
} }
return &pbgroup.NotificationUserInfoUpdateResp{}, nil return &pbgroup.NotificationUserInfoUpdateResp{}, nil
} }
@ -527,6 +532,14 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if datautil.Contain(opUserID, req.KickedUserIDs...) { if datautil.Contain(opUserID, req.KickedUserIDs...) {
return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs") return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs")
} }
owner, err := s.db.TakeGroupOwner(ctx, req.GroupID)
if err != nil {
return nil, err
}
if datautil.Contain(owner.UserID, req.KickedUserIDs...) {
return nil, errs.ErrArgs.WrapMsg("ownerUID can not Kick")
}
members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID)) members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID))
if err != nil { if err != nil {
return nil, err return nil, err

@ -324,6 +324,7 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context,
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName()) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName())
} }
@ -337,6 +338,7 @@ func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Conte
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips)
} }
@ -350,6 +352,7 @@ func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx conte
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName()) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName())
} }
@ -474,11 +477,16 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.
} }
opUserID := mcontext.GetOpUserID(ctx) opUserID := mcontext.GetOpUserID(ctx)
var member map[string]*sdkws.GroupMemberFullInfo var member map[string]*sdkws.GroupMemberFullInfo
member, err = g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID}) member, err = g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID, req.OldOwnerUserID})
if err != nil { if err != nil {
return return
} }
tips := &sdkws.GroupOwnerTransferredTips{Group: group, OpUser: member[opUserID], NewGroupOwner: member[req.NewOwnerUserID]} tips := &sdkws.GroupOwnerTransferredTips{
Group: group,
OpUser: member[opUserID],
NewGroupOwner: member[req.NewOwnerUserID],
OldGroupOwnerInfo: member[req.OldOwnerUserID],
}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
@ -636,6 +644,7 @@ func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, gr
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, groupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips)
} }
@ -663,6 +672,7 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, groupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips)
} }

@ -4,10 +4,13 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group" pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"slices"
) )
func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
@ -53,12 +56,34 @@ func (s *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF
} }
func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) {
group, err := s.db.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, servererrs.ErrDismissedAlready.Wrap()
}
var hasGroupUpdate bool
opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
Ctx: ctx, Ctx: ctx,
VersionKey: req.GroupID, VersionKey: req.GroupID,
VersionID: req.VersionID, VersionID: req.VersionID,
VersionNumber: req.Version, VersionNumber: req.Version,
Version: s.db.FindMemberIncrVersion, Version: func(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) {
vl, err := s.db.FindMemberIncrVersion(ctx, groupID, version, limit)
if err != nil {
return nil, err
}
vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool {
if elem.EID == "" {
vl.LogLen--
hasGroupUpdate = true
return true
}
return false
})
return vl, nil
},
CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache,
Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) {
return s.getGroupMembersInfo(ctx, req.GroupID, ids) return s.getGroupMembersInfo(ctx, req.GroupID, ids)
@ -75,7 +100,22 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
} }
}, },
} }
return opt.Build() resp, err := opt.Build()
if err != nil {
return nil, err
}
if resp.Full || hasGroupUpdate {
count, err := s.db.FindGroupMemberNum(ctx, group.GroupID)
if err != nil {
return nil, err
}
owner, err := s.db.TakeGroupOwner(ctx, group.GroupID)
if err != nil {
return nil, err
}
resp.Group = s.groupDB2PB(group, owner.UserID, count)
}
return resp, nil
} }
func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) {

@ -16,10 +16,7 @@ package user
import ( import (
"context" "context"
"math/rand" "errors"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
@ -27,7 +24,13 @@ import (
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/protocol/group"
friendpb "github.com/openimsdk/protocol/relation"
"github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/db/redisutil"
"math/rand"
"strings"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/convert"
@ -120,7 +123,8 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
// deprecated: // deprecated:
// UpdateUserInfo //UpdateUserInfo
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) { func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
resp = &pbuser.UpdateUserInfoResp{} resp = &pbuser.UpdateUserInfoResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID) err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
@ -131,31 +135,33 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err := s.webhookBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfo, req); err != nil { if err := s.webhookBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfo, req); err != nil {
return nil, err return nil, err
} }
data := convert.UserPb2DBMap(req.UserInfo) data := convert.UserPb2DBMap(req.UserInfo)
if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { oldUser, err := s.db.GetUserByID(ctx, req.UserInfo.UserID)
return nil, err
}
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" { if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { return nil, err
return nil, err
}
}
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
} }
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
//friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
//if err != nil {
// return nil, err
//}
//if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" {
// if err = s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID,oldUser); err != nil {
// return nil, err
// }
//}
//for _, friendID := range friends {
// s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
//}
s.webhookAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfo, req) s.webhookAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfo, req)
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { if err = s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
} }
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) { func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) {
resp = &pbuser.UpdateUserInfoExResp{} resp = &pbuser.UpdateUserInfoExResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID) err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
@ -165,30 +171,33 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
if err = s.webhookBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfoEx, req); err != nil { if err = s.webhookBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfoEx, req); err != nil {
return nil, err return nil, err
} }
oldUser, err := s.db.GetUserByID(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
data := convert.UserPb2DBMapEx(req.UserInfo) data := convert.UserPb2DBMapEx(req.UserInfo)
if err = s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { if err = s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err return nil, err
} }
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) //friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil { //if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil {
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { // if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
return nil, err // return nil, err
} // }
} //}
for _, friendID := range friends { //for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) // s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
} //}
s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req) s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req)
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
} }
func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.SetGlobalRecvMessageOptReq) (resp *pbuser.SetGlobalRecvMessageOptResp, err error) { func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.SetGlobalRecvMessageOptReq) (resp *pbuser.SetGlobalRecvMessageOptResp, err error) {
resp = &pbuser.SetGlobalRecvMessageOptResp{} resp = &pbuser.SetGlobalRecvMessageOptResp{}
if _, err := s.db.FindWithError(ctx, []string{req.UserID}); err != nil { if _, err := s.db.FindWithError(ctx, []string{req.UserID}); err != nil {
@ -247,6 +256,7 @@ func (s *userServer) GetPaginationUsers(ctx context.Context, req *pbuser.GetPagi
return &pbuser.GetPaginationUsersResp{Total: int32(total), Users: convert.UsersDB2Pb(users)}, err return &pbuser.GetPaginationUsersResp{Total: int32(total), Users: convert.UsersDB2Pb(users)}, err
} }
} }
func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterReq) (resp *pbuser.UserRegisterResp, err error) { func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterReq) (resp *pbuser.UserRegisterResp, err error) {
@ -343,8 +353,7 @@ func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbus
// GetUserStatus Get the online status of the user. // GetUserStatus Get the online status of the user.
func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp,
err error, err error) {
) {
onlineStatusList, err := s.db.GetUserStatus(ctx, req.UserIDs) onlineStatusList, err := s.db.GetUserStatus(ctx, req.UserIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -354,8 +363,7 @@ func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatu
// SetUserStatus Synchronize user's online status. // SetUserStatus Synchronize user's online status.
func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp,
err error, err error) {
) {
err = s.db.SetUserStatus(ctx, req.UserID, req.Status, req.PlatformID) err = s.db.SetUserStatus(ctx, req.UserID, req.Status, req.PlatformID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -379,8 +387,7 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu
// GetSubscribeUsersStatus Get the online status of subscribers. // GetSubscribeUsersStatus Get the online status of subscribers.
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, func (s *userServer) GetSubscribeUsersStatus(ctx context.Context,
req *pbuser.GetSubscribeUsersStatusReq, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
) (*pbuser.GetSubscribeUsersStatusResp, error) {
userList, err := s.db.GetAllSubscribeList(ctx, req.UserID) userList, err := s.db.GetAllSubscribeList(ctx, req.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -469,6 +476,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P
} }
func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) { func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) {
err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID) err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -687,6 +695,40 @@ func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pag
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: notificationAccounts} return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: notificationAccounts}
} }
func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID string, oldUser *tablerelation.User) error {
user, err := s.db.GetUserByID(ctx, userID)
if err != nil {
return err
}
if user.Nickname == oldUser.Nickname && user.FaceURL == oldUser.FaceURL {
return nil
}
oldUserInfo := convert.UserDB2Pb(oldUser)
newUserInfo := convert.UserDB2Pb(user)
var wg sync.WaitGroup
var es [2]error
wg.Add(len(es))
go func() {
defer wg.Done()
_, es[0] = s.groupRpcClient.Client.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{
UserID: userID,
OldUserInfo: oldUserInfo,
NewUserInfo: newUserInfo,
})
}()
go func() {
defer wg.Done()
_, es[1] = s.friendRpcClient.Client.NotificationUserInfoUpdate(ctx, &friendpb.NotificationUserInfoUpdateReq{
UserID: userID,
OldUserInfo: oldUserInfo,
NewUserInfo: newUserInfo,
})
}()
wg.Wait()
return errors.Join(es[:]...)
}
func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (*pbuser.SortQueryResp, error) { func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (*pbuser.SortQueryResp, error) {
users, err := s.db.SortQuery(ctx, req.UserIDName, req.Asc) users, err := s.db.SortQuery(ctx, req.UserIDName, req.Asc)
if err != nil { if err != nil {

@ -446,7 +446,7 @@ func (g *GroupCacheRedis) DelMaxJoinGroupVersion(userIDs ...string) cache.GroupC
func (g *GroupCacheRedis) FindMaxGroupMemberVersion(ctx context.Context, groupID string) (*model.VersionLog, error) { func (g *GroupCacheRedis) FindMaxGroupMemberVersion(ctx context.Context, groupID string) (*model.VersionLog, error) {
return getCache(ctx, g.rcClient, g.getGroupMemberMaxVersionKey(groupID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) { return getCache(ctx, g.rcClient, g.getGroupMemberMaxVersionKey(groupID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) {
return g.groupMemberDB.FindJoinIncrVersion(ctx, groupID, 0, 0) return g.groupMemberDB.FindMemberIncrVersion(ctx, groupID, 0, 0)
}) })
} }

@ -86,7 +86,7 @@ type FriendDatabase interface {
FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error)
//SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) OwnerIncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error
} }
type friendDatabase struct { type friendDatabase struct {
@ -376,3 +376,10 @@ func (f *friendDatabase) FindFriendUserID(ctx context.Context, friendUserID stri
//func (f *friendDatabase) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) { //func (f *friendDatabase) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) {
// return f.friend.SearchFriend(ctx, ownerUserID, keyword, pagination) // return f.friend.SearchFriend(ctx, ownerUserID, keyword, pagination)
//} //}
func (f *friendDatabase) OwnerIncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error {
if err := f.friend.IncrVersion(ctx, ownerUserID, friendUserIDs, state); err != nil {
return err
}
return f.cache.DelMaxFriendVersion(ownerUserID).ChainExecDel(ctx)
}

@ -109,6 +109,7 @@ type GroupDatabase interface {
FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error)
FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error
//FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) //FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error)
//FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) //FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error)
@ -243,17 +244,10 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma
if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil { if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil {
return err return err
} }
userIDs, err := g.cache.GetGroupMemberIDs(ctx, groupID) if err := g.groupMemberDB.MemberGroupIncrVersion(ctx, groupID, []string{""}, model.VersionStateUpdate); err != nil {
if err != nil {
return err return err
} }
for _, userID := range userIDs { return g.cache.CloneGroupCache().DelGroupsInfo(groupID).DelMaxGroupMemberVersion(groupID).ChainExecDel(ctx)
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate); err != nil {
return err
}
}
return g.cache.CloneGroupCache().DelGroupsInfo(groupID).DelMaxJoinGroupVersion(userIDs...).ChainExecDel(ctx)
}) })
} }
@ -263,11 +257,11 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete
if err := g.groupDB.UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil { if err := g.groupDB.UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil {
return err return err
} }
userIDs, err := g.cache.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return err
}
if deleteMember { if deleteMember {
userIDs, err := g.cache.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return err
}
if err := g.groupMemberDB.Delete(ctx, groupID, nil); err != nil { if err := g.groupMemberDB.Delete(ctx, groupID, nil); err != nil {
return err return err
} }
@ -277,13 +271,18 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete
DelGroupMembersHash(groupID). DelGroupMembersHash(groupID).
DelGroupAllRoleLevel(groupID). DelGroupAllRoleLevel(groupID).
DelGroupMembersInfo(groupID, userIDs...). DelGroupMembersInfo(groupID, userIDs...).
DelMaxGroupMemberVersion(groupID) DelMaxGroupMemberVersion(groupID).
} DelMaxJoinGroupVersion(userIDs...)
c = c.DelMaxJoinGroupVersion(userIDs...) for _, userID := range userIDs {
if len(userIDs) > 0 { if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateDelete); err != nil {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, model.VersionStateDelete); err != nil { return err
}
}
} else {
if err := g.groupMemberDB.MemberGroupIncrVersion(ctx, groupID, []string{""}, model.VersionStateUpdate); err != nil {
return err return err
} }
c = c.DelMaxGroupMemberVersion(groupID)
} }
return c.DelGroupsInfo(groupID).ChainExecDel(ctx) return c.DelGroupsInfo(groupID).ChainExecDel(ctx)
}) })
@ -401,22 +400,23 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error { func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel); err != nil { if err := g.groupMemberDB.UpdateUserRoleLevels(ctx, groupID, oldOwnerUserID, roleLevel, newOwnerUserID, constant.GroupOwner); err != nil {
return err
}
if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, newOwnerUserID, constant.GroupOwner); err != nil {
return err return err
} }
c := g.cache.CloneGroupCache() c := g.cache.CloneGroupCache()
return c.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID). return c.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).
DelGroupAllRoleLevel(groupID). DelGroupAllRoleLevel(groupID).
DelGroupMembersHash(groupID). DelGroupMembersHash(groupID).
DelJoinedGroupID(oldOwnerUserID, newOwnerUserID). DelMaxGroupMemberVersion(groupID).
DelGroupMemberIDs(groupID).
ChainExecDel(ctx) ChainExecDel(ctx)
}) })
} }
func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error { func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error {
if len(data) == 0 {
return nil
}
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil {
return err return err
@ -424,9 +424,9 @@ func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, u
c := g.cache.CloneGroupCache() c := g.cache.CloneGroupCache()
c = c.DelGroupMembersInfo(groupID, userID) c = c.DelGroupMembersInfo(groupID, userID)
if g.groupMemberDB.IsUpdateRoleLevel(data) { if g.groupMemberDB.IsUpdateRoleLevel(data) {
c = c.DelGroupAllRoleLevel(groupID) c = c.DelGroupAllRoleLevel(groupID).DelGroupMemberIDs(groupID)
} }
c = c.DelMaxGroupMemberVersion(groupID).DelMaxJoinGroupVersion(userID) c = c.DelMaxGroupMemberVersion(groupID)
return c.ChainExecDel(ctx) return c.ChainExecDel(ctx)
}) })
} }
@ -439,9 +439,9 @@ func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*common.B
return err return err
} }
if g.groupMemberDB.IsUpdateRoleLevel(item.Map) { if g.groupMemberDB.IsUpdateRoleLevel(item.Map) {
c = c.DelGroupAllRoleLevel(item.GroupID) c = c.DelGroupAllRoleLevel(item.GroupID).DelGroupMemberIDs(item.GroupID)
} }
c = c.DelGroupMembersInfo(item.GroupID, item.UserID).DelGroupMembersHash(item.GroupID) c = c.DelGroupMembersInfo(item.GroupID, item.UserID).DelMaxGroupMemberVersion(item.GroupID).DelGroupMembersHash(item.GroupID)
} }
return c.ChainExecDel(ctx) return c.ChainExecDel(ctx)
}) })
@ -501,14 +501,6 @@ func (g *groupDatabase) FindJoinIncrVersion(ctx context.Context, userID string,
return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, version, limit) return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, version, limit)
} }
//func (g *groupDatabase) FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) {
// return g.cache.FindSortGroupMemberUserIDs(ctx, groupID)
//}
//
//func (g *groupDatabase) FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) {
// return g.cache.FindSortJoinGroupIDs(ctx, userID)
//}
func (g *groupDatabase) FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error) { func (g *groupDatabase) FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error) {
return g.cache.FindMaxGroupMemberVersion(ctx, groupID) return g.cache.FindMaxGroupMemberVersion(ctx, groupID)
} }
@ -524,3 +516,10 @@ func (g *groupDatabase) SearchJoinGroup(ctx context.Context, userID string, keyw
} }
return g.groupDB.SearchJoin(ctx, groupIDs, keyword, pagination) return g.groupDB.SearchJoin(ctx, groupIDs, keyword, pagination)
} }
func (g *groupDatabase) MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error {
if err := g.groupMemberDB.MemberGroupIncrVersion(ctx, groupID, userIDs, state); err != nil {
return err
}
return g.cache.DelMaxGroupMemberVersion(groupID).ChainExecDel(ctx)
}

@ -55,4 +55,6 @@ type Friend interface {
//SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) //SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error)
FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error)
IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error
} }

@ -25,6 +25,7 @@ type GroupMember interface {
Delete(ctx context.Context, groupID string, userIDs []string) (err error) Delete(ctx context.Context, groupID string, userIDs []string) (err error)
Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error)
UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error
UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error
FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error)
Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error) Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error)
TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error) TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error)
@ -35,6 +36,7 @@ type GroupMember interface {
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
IsUpdateRoleLevel(data map[string]any) bool IsUpdateRoleLevel(data map[string]any) bool
JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error
MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error
FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error)
FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
} }

@ -18,6 +18,8 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
@ -53,11 +55,19 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) {
} }
func (f *FriendMgo) friendSort() any { func (f *FriendMgo) friendSort() any {
return bson.D{{"is_pinned", -1}, {"create_time", 1}} return bson.D{{"is_pinned", -1}, {"_id", 1}}
} }
// Create inserts multiple friend records. // Create inserts multiple friend records.
func (f *FriendMgo) Create(ctx context.Context, friends []*model.Friend) error { func (f *FriendMgo) Create(ctx context.Context, friends []*model.Friend) error {
for i, friend := range friends {
if friend.ID.IsZero() {
friends[i].ID = primitive.NewObjectID()
}
if friend.CreateTime.IsZero() {
friends[i].CreateTime = time.Now()
}
}
return mongoutil.IncrVersion(func() error { return mongoutil.IncrVersion(func() error {
return mongoutil.InsertMany(ctx, f.coll, friends) return mongoutil.InsertMany(ctx, f.coll, friends)
}, func() error { }, func() error {
@ -108,13 +118,43 @@ func (f *FriendMgo) UpdateRemark(ctx context.Context, ownerUserID, friendUserID,
return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark}) return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark})
} }
func (f *FriendMgo) fillTime(friends ...*model.Friend) {
for i, friend := range friends {
if friend.CreateTime.IsZero() {
friends[i].CreateTime = friend.ID.Timestamp()
}
}
}
func (f *FriendMgo) findOne(ctx context.Context, filter any) (*model.Friend, error) {
friend, err := mongoutil.FindOne[*model.Friend](ctx, f.coll, filter)
if err != nil {
return nil, err
}
f.fillTime(friend)
return friend, nil
}
func (f *FriendMgo) find(ctx context.Context, filter any) ([]*model.Friend, error) {
friends, err := mongoutil.Find[*model.Friend](ctx, f.coll, filter)
if err != nil {
return nil, err
}
f.fillTime(friends...)
return friends, nil
}
func (f *FriendMgo) findPage(ctx context.Context, filter any, pagination pagination.Pagination, opts ...*options.FindOptions) (int64, []*model.Friend, error) {
return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opts...)
}
// Take retrieves a single friend document. Returns an error if not found. // Take retrieves a single friend document. Returns an error if not found.
func (f *FriendMgo) Take(ctx context.Context, ownerUserID, friendUserID string) (*model.Friend, error) { func (f *FriendMgo) Take(ctx context.Context, ownerUserID, friendUserID string) (*model.Friend, error) {
filter := bson.M{ filter := bson.M{
"owner_user_id": ownerUserID, "owner_user_id": ownerUserID,
"friend_user_id": friendUserID, "friend_user_id": friendUserID,
} }
return mongoutil.FindOne[*model.Friend](ctx, f.coll, filter) return f.findOne(ctx, filter)
} }
// FindUserState finds the friendship status between two users. // FindUserState finds the friendship status between two users.
@ -125,7 +165,7 @@ func (f *FriendMgo) FindUserState(ctx context.Context, userID1, userID2 string)
{"owner_user_id": userID2, "friend_user_id": userID1}, {"owner_user_id": userID2, "friend_user_id": userID1},
}, },
} }
return mongoutil.Find[*model.Friend](ctx, f.coll, filter) return f.find(ctx, filter)
} }
// FindFriends retrieves a list of friends for a given owner. Missing friends do not cause an error. // FindFriends retrieves a list of friends for a given owner. Missing friends do not cause an error.
@ -134,7 +174,7 @@ func (f *FriendMgo) FindFriends(ctx context.Context, ownerUserID string, friendU
"owner_user_id": ownerUserID, "owner_user_id": ownerUserID,
"friend_user_id": bson.M{"$in": friendUserIDs}, "friend_user_id": bson.M{"$in": friendUserIDs},
} }
return mongoutil.Find[*model.Friend](ctx, f.coll, filter) return f.find(ctx, filter)
} }
// FindReversalFriends finds users who have added the specified user as a friend. // FindReversalFriends finds users who have added the specified user as a friend.
@ -143,14 +183,14 @@ func (f *FriendMgo) FindReversalFriends(ctx context.Context, friendUserID string
"owner_user_id": bson.M{"$in": ownerUserIDs}, "owner_user_id": bson.M{"$in": ownerUserIDs},
"friend_user_id": friendUserID, "friend_user_id": friendUserID,
} }
return mongoutil.Find[*model.Friend](ctx, f.coll, filter) return f.find(ctx, filter)
} }
// FindOwnerFriends retrieves a paginated list of friends for a given owner. // FindOwnerFriends retrieves a paginated list of friends for a given owner.
func (f *FriendMgo) FindOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) { func (f *FriendMgo) FindOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) {
filter := bson.M{"owner_user_id": ownerUserID} filter := bson.M{"owner_user_id": ownerUserID}
opt := options.Find().SetSort(f.friendSort()) opt := options.Find().SetSort(f.friendSort())
return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opt) return f.findPage(ctx, filter, pagination, opt)
} }
func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) { func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) {
@ -162,7 +202,8 @@ func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID stri
// FindInWhoseFriends finds users who have added the specified user as a friend, with pagination. // FindInWhoseFriends finds users who have added the specified user as a friend, with pagination.
func (f *FriendMgo) FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) { func (f *FriendMgo) FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) {
filter := bson.M{"friend_user_id": friendUserID} filter := bson.M{"friend_user_id": friendUserID}
return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination) opt := options.Find().SetSort(f.friendSort())
return f.findPage(ctx, filter, pagination, opt)
} }
// FindFriendUserIDs retrieves a list of friend user IDs for a given owner. // FindFriendUserIDs retrieves a list of friend user IDs for a given owner.
@ -204,17 +245,6 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) (
return mongoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}).SetSort(f.friendSort())) return mongoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}).SetSort(f.friendSort()))
} }
//func (f *FriendMgo) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) { func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error {
// filter := bson.M{ return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state)
// "owner_user_id": ownerUserID, }
// }
// if keyword != "" {
// filter["$or"] = []bson.M{
// {"remark": bson.M{"$regex": keyword, "$options": "i"}},
// {"nickname": bson.M{"$regex": keyword, "$options": "i"}},
// {"friend_user_id": bson.M{"$regex": keyword, "$options": "i"}},
// }
// }
// opt := options.Find().SetSort(f.friendSort())
// return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opt)
//}

@ -18,6 +18,7 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
@ -114,11 +115,26 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s
func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error { func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error {
return mongoutil.IncrVersion(func() error { return mongoutil.IncrVersion(func() error {
return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel}) return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
}, func() error { }, func() error {
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
})
}
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
return mongoutil.IncrVersion(func() error {
if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": firstUserID},
bson.M{"$set": bson.M{"role_level": firstUserRoleLevel}}, true); err != nil {
return err
}
if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": secondUserID},
bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
return err
}
return nil
}, func() error { }, func() error {
return g.join.IncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate) return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate)
}) })
} }
@ -184,10 +200,16 @@ func (g *GroupMemberMgo) JoinGroupIncrVersion(ctx context.Context, userID string
return g.join.IncrVersion(ctx, userID, groupIDs, state) return g.join.IncrVersion(ctx, userID, groupIDs, state)
} }
func (g *GroupMemberMgo) MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error {
return g.member.IncrVersion(ctx, groupID, userIDs, state)
}
func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) {
log.ZDebug(ctx, "find member incr version", "groupID", groupID, "version", version)
return g.member.FindChangeLog(ctx, groupID, version, limit) return g.member.FindChangeLog(ctx, groupID, version, limit)
} }
func (g *GroupMemberMgo) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { func (g *GroupMemberMgo) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
log.ZDebug(ctx, "find join incr version", "userID", userID, "version", version)
return g.join.FindChangeLog(ctx, userID, version, limit) return g.join.FindChangeLog(ctx, userID, version, limit)
} }

@ -8,6 +8,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
@ -169,7 +170,9 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u
} else if !errors.Is(err, mongo.ErrNoDocuments) { } else if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err return nil, err
} }
log.ZDebug(ctx, "init doc", "dId", dId)
if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil { if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil {
log.ZDebug(ctx, "init doc success", "dId", dId)
return res, nil return res, nil
} else if mongo.IsDuplicateKeyError(err) { } else if mongo.IsDuplicateKeyError(err) {
return l.findChangeLog(ctx, dId, version, limit) return l.findChangeLog(ctx, dId, version, limit)

@ -15,17 +15,19 @@
package model package model
import ( import (
"go.mongodb.org/mongo-driver/bson/primitive"
"time" "time"
) )
// Friend represents the data structure for a friend relationship in MongoDB. // Friend represents the data structure for a friend relationship in MongoDB.
type Friend struct { type Friend struct {
OwnerUserID string `bson:"owner_user_id"` ID primitive.ObjectID `bson:"_id"`
FriendUserID string `bson:"friend_user_id"` OwnerUserID string `bson:"owner_user_id"`
Remark string `bson:"remark"` FriendUserID string `bson:"friend_user_id"`
CreateTime time.Time `bson:"create_time"` Remark string `bson:"remark"`
AddSource int32 `bson:"add_source"` CreateTime time.Time `bson:"create_time"`
OperatorUserID string `bson:"operator_user_id"` AddSource int32 `bson:"add_source"`
Ex string `bson:"ex"` OperatorUserID string `bson:"operator_user_id"`
IsPinned bool `bson:"is_pinned"` Ex string `bson:"ex"`
IsPinned bool `bson:"is_pinned"`
} }

Loading…
Cancel
Save