sort version

pull/2427/head
withchao 1 year ago
parent bfca6ced2a
commit ae06f16c5a

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.30 github.com/openimsdk/protocol v0.0.69-alpha.38
github.com/openimsdk/tools v0.0.49-alpha.51 github.com/openimsdk/tools v0.0.49-alpha.51
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.30 h1:OXzCIpDpIY/GI6h1SDYWN51OS9Xv/BcHaOwq8whPKqI= github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
github.com/openimsdk/protocol v0.0.69-alpha.30/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY= github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -16,6 +16,8 @@ package friend
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -191,10 +193,37 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context
f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips) f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
} }
func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {
if coll.Name == collName && coll.Doc.DID == id {
*version = uint64(coll.Doc.Version)
*versionID = coll.Doc.ID.Hex()
return
}
}
}
func (f *FriendNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {
if coll.Name == collName && coll.Doc.DID == id {
*version = uint64(coll.Doc.Version)
*versionID = coll.Doc.ID.Hex()
for _, elem := range coll.Doc.Logs {
if elem.EID == relationtb.VersionSortChangeID {
*sortVersion = uint64(elem.Version)
}
}
}
}
}
func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) { func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) {
tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}} tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.FromUserID = fromUserID tips.FromToUserID.FromUserID = fromUserID
tips.FromToUserID.ToUserID = toUserID tips.FromToUserID.ToUserID = toUserID
f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID, database.FriendVersionName, toUserID, &tips.FriendSortVersion)
f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips) f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
} }

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"slices"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -52,12 +53,27 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err return nil, err
} }
var sortVersion uint64
opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{ opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{
Ctx: ctx, Ctx: ctx,
VersionKey: req.UserID, VersionKey: req.UserID,
VersionID: req.VersionID, VersionID: req.VersionID,
VersionNumber: req.Version, VersionNumber: req.Version,
Version: s.db.FindFriendIncrVersion, Version: func(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) {
vl, err := s.db.FindFriendIncrVersion(ctx, ownerUserID, version, limit)
if err != nil {
return nil, err
}
vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool {
if elem.EID == model.VersionSortChangeID {
vl.LogLen--
sortVersion = uint64(elem.Version)
return true
}
return false
})
return vl, nil
},
CacheMaxVersion: s.db.FindMaxFriendVersionCache, CacheMaxVersion: s.db.FindMaxFriendVersionCache,
Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) { Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) {
return s.getFriend(ctx, req.UserID, ids) return s.getFriend(ctx, req.UserID, ids)
@ -65,12 +81,13 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID }, ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID },
Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp { Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
return &relation.GetIncrementalFriendsResp{ return &relation.GetIncrementalFriendsResp{
VersionID: version.ID.Hex(), VersionID: version.ID.Hex(),
Version: uint64(version.Version), Version: uint64(version.Version),
Full: full, Full: full,
Delete: deleteIds, Delete: deleteIds,
Insert: insertList, Insert: insertList,
Update: updateList, Update: updateList,
SortVersion: sortVersion,
} }
}, },
} }

@ -306,6 +306,21 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6
} }
} }
func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {
if coll.Name == collName && coll.Doc.DID == id {
*version = uint64(coll.Doc.Version)
*versionID = coll.Doc.ID.Hex()
for _, elem := range coll.Doc.Logs {
if elem.EID == model.VersionSortChangeID {
*sortVersion = uint64(elem.Version)
}
}
}
}
}
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
var err error var err error
defer func() { defer func() {
@ -707,7 +722,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
return return
} }
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
} }

@ -10,7 +10,6 @@ import (
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group" pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"slices"
) )
func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
@ -63,7 +62,10 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
if group.Status == constant.GroupStatusDismissed { if group.Status == constant.GroupStatusDismissed {
return nil, servererrs.ErrDismissedAlready.Wrap() return nil, servererrs.ErrDismissedAlready.Wrap()
} }
var hasGroupUpdate bool var (
hasGroupUpdate bool
sortVersion uint64
)
opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
Ctx: ctx, Ctx: ctx,
VersionKey: req.GroupID, VersionKey: req.GroupID,
@ -74,14 +76,20 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
if err != nil { if err != nil {
return nil, err return nil, err
} }
vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { logs := make([]model.VersionLogElem, 0, len(vl.Logs))
if elem.EID == "" { for i, log := range vl.Logs {
switch log.EID {
case model.VersionGroupChangeID:
vl.LogLen-- vl.LogLen--
hasGroupUpdate = true hasGroupUpdate = true
return true case model.VersionSortChangeID:
vl.LogLen--
sortVersion = uint64(log.Version)
default:
logs = append(logs, vl.Logs[i])
} }
return false }
}) vl.Logs = logs
if vl.LogLen > 0 { if vl.LogLen > 0 {
hasGroupUpdate = true hasGroupUpdate = true
} }
@ -94,12 +102,13 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID }, ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID },
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
return &pbgroup.GetIncrementalGroupMemberResp{ return &pbgroup.GetIncrementalGroupMemberResp{
VersionID: version.ID.Hex(), VersionID: version.ID.Hex(),
Version: uint64(version.Version), Version: uint64(version.Version),
Full: full, Full: full,
Delete: delIDs, Delete: delIDs,
Insert: insertList, Insert: insertList,
Update: updateList, Update: updateList,
SortVersion: sortVersion,
} }
}, },
} }

@ -118,7 +118,7 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
bson.M{"$set": bson.M{"role_level": roleLevel}}, true) bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
}, func() error { }, func() error {
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, userID}, model.VersionStateUpdate)
}) })
} }
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error { func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
@ -131,10 +131,9 @@ func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID strin
bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil { bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
return err return err
} }
return nil return nil
}, func() error { }, func() error {
return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate) return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, firstUserID, secondUserID}, model.VersionStateUpdate)
}) })
} }
@ -145,7 +144,13 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri
return mongoutil.IncrVersion(func() error { return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true) return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true)
}, func() error { }, func() error {
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) var userIDs []string
if g.IsUpdateRoleLevel(data) {
userIDs = []string{model.VersionSortChangeID, userID}
} else {
userIDs = []string{userID}
}
return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateUpdate)
}) })
} }

@ -152,8 +152,24 @@ func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []s
"$unset": "delete_e_ids", "$unset": "delete_e_ids",
}, },
} }
opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0}) projection := bson.M{
return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt) "logs": 0,
}
opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(projection)
res, err := mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt)
if err != nil {
return nil, err
}
res.Logs = make([]model.VersionLogElem, 0, len(eIds))
for _, id := range eIds {
res.Logs = append(res.Logs, model.VersionLogElem{
EID: id,
State: state,
Version: res.Version,
LastUpdate: res.LastUpdate,
})
}
return res, nil
} }
func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) { func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {

@ -9,12 +9,12 @@ import (
"time" "time"
) )
func Result[V any](val V, err error) V { //func Result[V any](val V, err error) V {
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
return val // return val
} //}
func Check(err error) { func Check(err error) {
if err != nil { if err != nil {
@ -30,7 +30,7 @@ func TestName(t *testing.T) {
panic(err) panic(err)
} }
vl := tmp.(*VersionLogMgo) vl := tmp.(*VersionLogMgo)
res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now()) res, err := vl.incrVersionResult(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert)
if err != nil { if err != nil {
t.Log(err) t.Log(err)
return return

@ -14,6 +14,11 @@ const (
VersionStateUpdate VersionStateUpdate
) )
const (
VersionGroupChangeID = ""
VersionSortChangeID = "$"
)
type VersionLogElem struct { type VersionLogElem struct {
EID string `bson:"e_id"` EID string `bson:"e_id"`
State int32 `bson:"state"` State int32 `bson:"state"`

Loading…
Cancel
Save