From 0aaf8b93a020f5c90e4f17019ebb1bc09e76c58c Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 30 May 2024 15:41:45 +0800 Subject: [PATCH] optimization version log --- go.mod | 8 +- go.sum | 6 +- pkg/common/db/dataver/result.go | 42 --- pkg/common/db/dataver/todo.go | 38 --- pkg/common/listdemo/common.go | 182 ---------- pkg/common/listdemo/friend_model.go | 5 - pkg/common/listdemo/friend_table.go | 86 ----- pkg/common/listdemo2/common.go | 322 ------------------ pkg/common/listdemo2/common_test.go | 61 ---- pkg/common/listdemo2/demo.js | 86 ----- pkg/common/listdemo2/demo2.js | 63 ---- pkg/common/listdemo2/demo3.js | 59 ---- pkg/common/listdemo2/demo5.js | 10 - pkg/common/storage/cache/friend.go | 3 +- pkg/common/storage/cache/redis/friend.go | 3 +- pkg/common/storage/controller/friend.go | 5 +- pkg/common/storage/database/friend.go | 3 +- pkg/common/storage/database/mgo/friend.go | 16 +- .../database/mgo/version_log.go} | 116 ++----- pkg/common/storage/database/version_log.go | 18 + pkg/common/storage/model/user.go | 4 +- pkg/common/storage/model/version_log.go | 61 ++++ 22 files changed, 120 insertions(+), 1077 deletions(-) delete mode 100644 pkg/common/db/dataver/result.go delete mode 100644 pkg/common/db/dataver/todo.go delete mode 100644 pkg/common/listdemo/common.go delete mode 100644 pkg/common/listdemo/friend_model.go delete mode 100644 pkg/common/listdemo/friend_table.go delete mode 100644 pkg/common/listdemo2/common.go delete mode 100644 pkg/common/listdemo2/common_test.go delete mode 100644 pkg/common/listdemo2/demo.js delete mode 100644 pkg/common/listdemo2/demo2.js delete mode 100644 pkg/common/listdemo2/demo3.js delete mode 100644 pkg/common/listdemo2/demo5.js rename pkg/common/{db/dataver/common.go => storage/database/mgo/version_log.go} (56%) create mode 100644 pkg/common/storage/database/version_log.go create mode 100644 pkg/common/storage/model/version_log.go diff --git a/go.mod b/go.mod index 0ae542cd2..bd1e16a5b 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.2 - github.com/openimsdk/tools v0.0.49-alpha.23 - github.com/pkg/errors v0.9.1 + github.com/openimsdk/tools v0.0.49-alpha.24 + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 go.mongodb.org/mongo-driver v1.14.0 @@ -178,6 +178,4 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect ) -replace ( - github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol -) +replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol diff --git a/go.sum b/go.sum index c0bd78645..1cc2cc1bc 100644 --- a/go.sum +++ b/go.sum @@ -286,10 +286,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.2 h1:XNc3pmAXyW+PMo7tghr2O+uydYck1hogppHDW3+Y+3k= -github.com/openimsdk/protocol v0.0.69-alpha.2/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.23 h1:/KkJ7vfx8FAoJhq3veH9PWnxbSkEf+dTSshvDrHBR38= -github.com/openimsdk/tools v0.0.49-alpha.23/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= +github.com/openimsdk/tools v0.0.49-alpha.24 h1:lJsqnjTPujnr91LRQ6QmcTliMIa4fMOBSTri6rFz2ek= +github.com/openimsdk/tools v0.0.49-alpha.24/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/pkg/common/db/dataver/result.go b/pkg/common/db/dataver/result.go deleted file mode 100644 index ec09ffa31..000000000 --- a/pkg/common/db/dataver/result.go +++ /dev/null @@ -1,42 +0,0 @@ -package dataver - -import ( - "github.com/openimsdk/tools/utils/datautil" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type SyncResult struct { - Version uint - VersionID string - DeleteEID []string - Changes []string - Full bool -} - -func VersionIDStr(id primitive.ObjectID) string { - if id.IsZero() { - return "" - } - return id.String() -} - -func NewSyncResult(wl *WriteLog, fullIDs []string, versionID string) *SyncResult { - var findEIDs []string - var res SyncResult - if wl.Full() || VersionIDStr(wl.ID) != versionID { - res.Changes = fullIDs - res.Full = true - } else { - idSet := datautil.SliceSet(fullIDs) - for _, l := range wl.Logs { - if l.Deleted { - res.DeleteEID = append(res.DeleteEID, l.EID) - } else { - if _, ok := idSet[l.EID]; ok { - findEIDs = append(findEIDs, l.EID) - } - } - } - } - return &res -} diff --git a/pkg/common/db/dataver/todo.go b/pkg/common/db/dataver/todo.go deleted file mode 100644 index cd2be14f0..000000000 --- a/pkg/common/db/dataver/todo.go +++ /dev/null @@ -1,38 +0,0 @@ -package dataver - -/* - -UserIDs 顺序 -前500顺序 - - -1,2,3,4,5,6,7,8,9 - -1,3,5,7,8,9 - - -1.sdk添加一个表记录 docID(后续换名字), version -2.sdk同步,先计算idHash,api调用参数idHash, docID, version -3.服务器先判断version变更记录,没有直接返回同步成功。 - 有变更,先查版本变更记录,在查前500id,变更记录只保留前500id中的 - 根据前500id计算idHash,不一致返回会全量id,不反悔删除id - 全量同步有标识,只返回全量id - 变更记录只包含id,不包括详细信息。 -4.sdk通过变更记录,同步数据不一致进行重试。 -5.先修改db,在自增版本号,外层加事务 - - - - - - - - - - - - - - - -*/ diff --git a/pkg/common/listdemo/common.go b/pkg/common/listdemo/common.go deleted file mode 100644 index 2d99cd8c9..000000000 --- a/pkg/common/listdemo/common.go +++ /dev/null @@ -1,182 +0,0 @@ -package listdemo - -import ( - "context" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/pagination" - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -var ( - ErrListNotFound = errors.New("list not found") - ErrElemExist = errors.New("elem exist") - ErrNotFound = mongo.ErrNoDocuments -) - -type ListDoc interface { - IDName() string // 外层业务id字段名字 user_id - ElemsName() string // 外层列表名字 friends - VersionName() string // 外层版本号 version - DeleteVersion() string // 删除版本号 - BuildDoc(lid any, e Elem) any // 返回一个组装的doc文档 -} - -type Elem interface { - IDName() string // 业务id名字 friend_user_id - IDValue() any // 业务id值 userID -> "100000000" - VersionName() string // 版本号 - DeletedName() string // 删除字段名字 - ToMap() map[string]any // 把结构体转换为map -} - -type List[D any, E Elem] struct { - coll *mongo.Collection - lf ListDoc -} - -func (l *List[D, E]) zeroE() E { - var t E - return t -} - -func (l *List[D, E]) FindElem(ctx context.Context, lid any, eid any) (E, error) { - res, err := l.FindElems(ctx, lid, []any{eid}) - if err != nil { - return l.zeroE(), err - } - if len(res) == 0 { - return l.zeroE(), ErrNotFound - } - return res[0], nil -} - -// FindElems 查询Elems -func (l *List[D, E]) FindElems(ctx context.Context, lid any, eids []any) ([]E, error) { - //pipeline := []bson.M{ - // { - // "$match": bson.M{ - // l.lf.IDName(): lid, - // l.lf.IDName() + "." + l.lf.ElemsID(): bson.M{ - // "$in": eids, - // }, - // }, - // }, - // { - // "$unwind": "$" + l.lf.ElemsName(), - // }, - // { - // "$match": bson.M{ - // l.lf.IDName() + "." + l.lf.ElemsID(): bson.M{ - // "$in": eids, - // }, - // }, - // }, - //} - panic("todo") -} - -func (l *List[D, E]) Find(ctx context.Context, filter any, opts ...*options.FindOptions) ([]E, error) { - return nil, nil -} - -func (l *List[D, E]) Count(ctx context.Context, filter any, opts ...*options.CountOptions) (int64, error) { - return 0, nil -} - -func (l *List[D, E]) Update(ctx context.Context, lid any, eid any) (*mongo.UpdateResult, error) { - - return nil, nil -} - -func (l *List[D, E]) Delete(ctx context.Context, lid any, eids any) (*mongo.UpdateResult, error) { - - return nil, nil -} - -func (l *List[D, E]) Page(ctx context.Context, filter any, pagination pagination.Pagination, opts ...*options.FindOptions) (int64, []E, error) { - return 0, nil, nil -} - -func (l *List[D, E]) ElemIDs(ctx context.Context, filter any, opts ...*options.FindOptions) ([]E, error) { - - return nil, nil -} - -// InsertElem 插入一个 -func (l *List[D, E]) InsertElem(ctx context.Context, lid any, e Elem) error { - if err := l.insertElem(ctx, lid, e); err == nil { - return nil - } else if !errors.Is(err, ErrListNotFound) { - return err - } - if _, err := l.coll.InsertOne(ctx, l.lf.BuildDoc(lid, e)); err == nil { - return nil - } else if mongo.IsDuplicateKeyError(err) { - return l.insertElem(ctx, lid, e) - } else { - return err - } -} - -func (l *List[D, E]) insertElem(ctx context.Context, lid any, e Elem) error { - data := e.ToMap() - data[e.VersionName()] = "$max_version" - filter := bson.M{ - l.lf.IDName(): lid, - } - pipeline := []bson.M{ - { - "$addFields": bson.M{ - "found_elem": bson.M{ - "$in": bson.A{e.IDValue(), l.lf.ElemsName() + "." + e.IDName()}, - }, - }, - }, - { - "$set": bson.M{ - "max_version": bson.M{ - "$cond": bson.M{ - "if": "$found_elem", - "then": "$max_version", - "else": bson.M{"$add": bson.A{"max_version", 1}}, - }, - }, - }, - }, - { - "$set": bson.M{ - l.lf.ElemsName(): bson.M{ - "$cond": bson.M{ - "if": "$found_elem", - "then": "$" + l.lf.ElemsName(), - "else": bson.M{ - "$concatArrays": bson.A{ - "$" + l.lf.ElemsName(), - bson.A{ - data, - }, - }, - }, - }, - }, - }, - }, - { - "$unset": "found_elem", - }, - } - res, err := mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) - if err != nil { - return err - } - if res.MatchedCount == 0 { - return ErrListNotFound - } - if res.ModifiedCount == 0 { - return ErrElemExist - } - return nil -} diff --git a/pkg/common/listdemo/friend_model.go b/pkg/common/listdemo/friend_model.go deleted file mode 100644 index 13507bfa3..000000000 --- a/pkg/common/listdemo/friend_model.go +++ /dev/null @@ -1,5 +0,0 @@ -package listdemo - -type friendModel struct { - db *List[*Friend, *FriendElem] -} diff --git a/pkg/common/listdemo/friend_table.go b/pkg/common/listdemo/friend_table.go deleted file mode 100644 index b1ad877a2..000000000 --- a/pkg/common/listdemo/friend_table.go +++ /dev/null @@ -1,86 +0,0 @@ -package listdemo - -import ( - "time" -) - -var ( - _ Elem = (*FriendElem)(nil) - _ ListDoc = (*Friend)(nil) -) - -type FriendElem struct { - FriendUserID string `bson:"friend_user_id"` - Nickname string `bson:"nickname"` - FaceURL string `bson:"face_url"` - Remark string `bson:"remark"` - CreateTime time.Time `bson:"create_time"` - AddSource int32 `bson:"add_source"` - OperatorUserID string `bson:"operator_user_id"` - Ex string `bson:"ex"` - IsPinned bool `bson:"is_pinned"` - Version uint `bson:"version"` - DeleteTime *time.Time `bson:"delete_time"` -} - -func (f *FriendElem) IDName() string { - return "friend_user_id" -} - -func (f *FriendElem) IDValue() any { - return f.FriendUserID -} - -func (f *FriendElem) VersionName() string { - return "version" -} - -func (f *FriendElem) DeletedName() string { - return "delete_time" -} - -func (f *FriendElem) ToMap() map[string]any { - return map[string]any{ - "friend_user_id": f.FriendUserID, - "nickname": f.Nickname, - "face_url": f.FaceURL, - "remark": f.Remark, - "create_time": f.CreateTime, - "add_source": f.AddSource, - "operator_user_id": f.OperatorUserID, - "ex": f.Ex, - "is_pinned": f.IsPinned, - "version": f.Version, - "delete_time": f.DeleteTime, - } -} - -type Friend struct { - UserID string `bson:"user_id"` - Friends []*FriendElem `bson:"friends"` - Version uint `bson:"version"` - DeleteVersion uint `bson:"delete_version"` -} - -func (f *Friend) BuildDoc(lid any, e Elem) any { - return &Friend{ - UserID: lid.(string), - Friends: []*FriendElem{e.(*FriendElem)}, - } -} - -func (f *Friend) ElemsID() string { - return "user_id" -} - -func (f *Friend) IDName() string { - return "user_id" -} - -func (f *Friend) ElemsName() string { - return "friends" -} - -func (f *Friend) VersionName() string { - return "version" -} diff --git a/pkg/common/listdemo2/common.go b/pkg/common/listdemo2/common.go deleted file mode 100644 index 40419d533..000000000 --- a/pkg/common/listdemo2/common.go +++ /dev/null @@ -1,322 +0,0 @@ -package listdemo - -import ( - "context" - "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/datautil" - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "time" -) - -var ( - ErrListNotFound = errors.New("list not found") - ErrElemExist = errors.New("elem exist") - ErrNeedFull = errors.New("need full") - ErrNotFound = mongo.ErrNoDocuments -) - -const ( - FirstVersion = 1 - DefaultDeleteVersion = 0 -) - -type Elem struct { - ID string - Version uint -} - -type ChangeLog struct { - ChangeIDs []Elem - DeleteIDs []Elem -} - -type WriteLog struct { - DID string `bson:"d_id"` - Logs []LogElem `bson:"logs"` - Version uint `bson:"version"` - Deleted uint `bson:"deleted"` - LastUpdate time.Time `bson:"last_update"` -} - -type WriteLogLen struct { - DID string `bson:"d_id"` - Logs []LogElem `bson:"logs"` - Version uint `bson:"version"` - Deleted uint `bson:"deleted"` - LastUpdate time.Time `bson:"last_update"` - LogLen int `bson:"log_len"` -} - -type LogElem struct { - EID string `bson:"e_id"` - Deleted bool `bson:"deleted"` - Version uint `bson:"version"` - LastUpdate time.Time `bson:"last_update"` -} - -type LogModel struct { - coll *mongo.Collection -} - -func (l *LogModel) InitIndex(ctx context.Context) error { - _, err := l.coll.Indexes().CreateOne(ctx, mongo.IndexModel{ - Keys: bson.M{ - "d_id": 1, - }, - }) - return err -} - -func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { - filter := bson.M{ - "d_id": dId, - } - elem := bson.M{ - "e_id": eId, - "version": "$version", - "deleted": deleted, - "last_update": now, - } - pipeline := []bson.M{ - { - "$addFields": bson.M{ - "elem_index": bson.M{ - "$indexOfArray": []any{"$logs.e_id", eId}, - }, - }, - }, - { - "$set": bson.M{ - "version": bson.M{"$add": []any{"$version", 1}}, - "last_update": now, - }, - }, - { - "$set": bson.M{ - "logs": bson.M{ - "$cond": bson.M{ - "if": bson.M{ - "$lt": []any{"$elem_index", 0}, - }, - "then": bson.M{ - "$concatArrays": []any{ - "$logs", - []bson.M{ - elem, - }, - }, - }, - "else": bson.M{ - "$map": bson.M{ - "input": bson.M{ - "$range": []any{0, bson.M{"$size": "$logs"}}, - }, - "as": "i", - "in": bson.M{ - "$cond": bson.M{ - "if": bson.M{ - "$eq": []any{"$$i", "$elem_index"}, - }, - "then": elem, - "else": bson.M{ - "$arrayElemAt": []any{ - "$logs", - "$$i", - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - { - "$unset": "elem_index", - }, - } - return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) -} - -func (l *LogModel) WriteLogBatch(ctx context.Context, dId string, eIds []string, deleted bool) error { - if len(eIds) == 0 { - return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) - } - if datautil.Duplicate(eIds) { - return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds) - } - now := time.Now() - res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now) - if err != nil { - return err - } - if res.MatchedCount > 0 { - return nil - } - wl := WriteLog{ - DID: dId, - Logs: make([]LogElem, 0, len(eIds)), - Version: FirstVersion, - Deleted: DefaultDeleteVersion, - LastUpdate: now, - } - for _, eId := range eIds { - wl.Logs = append(wl.Logs, LogElem{ - EID: eId, - Deleted: deleted, - Version: FirstVersion, - LastUpdate: now, - }) - } - if _, err := l.coll.InsertOne(ctx, &wl); err == nil { - return nil - } else if !mongo.IsDuplicateKeyError(err) { - return err - } - if res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now); err != nil { - return err - } else if res.ModifiedCount == 0 { - return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds) - } - return nil -} - -func (l *LogModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { - if len(eIds) == 0 { - return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) - } - filter := bson.M{ - "d_id": dId, - } - elems := make([]bson.M, 0, len(eIds)) - for _, eId := range eIds { - elems = append(elems, bson.M{ - "e_id": eId, - "version": "$version", - "deleted": deleted, - "last_update": now, - }) - } - pipeline := []bson.M{ - { - "$addFields": bson.M{ - "delete_e_ids": eIds, - }, - }, - { - "$set": bson.M{ - "version": bson.M{"$add": []any{"$version", 1}}, - "last_update": now, - }, - }, - { - "$set": bson.M{ - "logs": bson.M{ - "$filter": bson.M{ - "input": "$logs", - "as": "log", - "cond": bson.M{ - "$not": bson.M{ - "$in": []any{"$$log.e_id", "$delete_e_ids"}, - }, - }, - }, - }, - }, - }, - { - "$set": bson.M{ - "logs": bson.M{ - "$concatArrays": []any{ - "$logs", - elems, - }, - }, - }, - }, - { - "$unset": "delete_e_ids", - }, - } - return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) -} - -func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLogLen, error) { - pipeline := []bson.M{ - { - "$match": bson.M{ - "d_id": did, - }, - }, - { - "$addFields": bson.M{ - "logs": bson.M{ - "$cond": bson.M{ - "if": bson.M{ - "$or": []bson.M{ - {"$lt": []any{"$version", version}}, - {"$gte": []any{"$deleted", version}}, - }, - }, - "then": []any{}, - "else": "$logs", - }, - }, - }, - }, - { - "$addFields": bson.M{ - "logs": bson.M{ - "$filter": bson.M{ - "input": "$logs", - "as": "l", - "cond": bson.M{ - "$gt": []any{"$$l.version", version}, - }, - }, - }, - }, - }, - { - "$addFields": bson.M{ - "log_len": bson.M{"$size": "$logs"}, - }, - }, - { - "$addFields": bson.M{ - "logs": bson.M{ - "$cond": bson.M{ - "if": bson.M{ - "$gt": []any{"$log_len", limit}, - }, - "then": []any{}, - "else": "$logs", - }, - }, - }, - }, - } - if limit <= 0 { - pipeline = pipeline[:len(pipeline)-1] - } - res, err := mongoutil.Aggregate[*WriteLogLen](ctx, l.coll, pipeline) - if err != nil { - return nil, err - } - if len(res) == 0 { - return &WriteLogLen{}, nil - } - return res[0], nil -} - -func (l *LogModel) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error { - return mongoutil.DeleteMany(ctx, l.coll, bson.M{ - "last_update": bson.M{ - "$lt": deadline, - }, - }) -} diff --git a/pkg/common/listdemo2/common_test.go b/pkg/common/listdemo2/common_test.go deleted file mode 100644 index c03cc31bd..000000000 --- a/pkg/common/listdemo2/common_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package listdemo - -import ( - "context" - "errors" - "github.com/openimsdk/tools/db/mongoutil" - "testing" -) - -func Result[V any](val V, err error) V { - if err != nil { - panic(err) - } - return val -} - -func Check(err error) { - if err != nil { - panic(err) - } -} - -func TestName(t *testing.T) { - cli := Result(mongoutil.NewMongoDB(context.Background(), &mongoutil.Config{Uri: "mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100", Database: "openim_v3"})) - - db := cli.GetDB() - tx := cli.GetTx() - - const num = 1 - lm := &LogModel{coll: db.Collection("friend_version")} - - err := tx.Transaction(context.Background(), func(ctx context.Context) error { - err := tx.Transaction(ctx, func(ctx context.Context) error { - return lm.WriteLogBatch(ctx, "100", []string{"1000", "2000"}, true) - }) - if err != nil { - t.Log("--------->") - return err - } - return errors.New("1234") - }) - t.Log(err) - - //start := time.Now() - //eIds := make([]string, 0, num) - //for i := 0; i < num; i++ { - // eIds = append(eIds, strconv.Itoa(1000+(i))) - //} - //lm.WriteLogBatch1(context.Background(), "100", eIds, false) - //end := time.Now() - //t.Log(end.Sub(start)) // 509.962208ms - //t.Log(end.Sub(start) / num) // 511.496µs - - //start := time.Now() - //wll, err := lm.FindChangeLog(context.Background(), "100", 3, 100) - //if err != nil { - // panic(err) - //} - //t.Log(time.Since(start)) - //t.Log(wll) -} diff --git a/pkg/common/listdemo2/demo.js b/pkg/common/listdemo2/demo.js deleted file mode 100644 index be9c74cb9..000000000 --- a/pkg/common/listdemo2/demo.js +++ /dev/null @@ -1,86 +0,0 @@ -db.friend_version.updateMany( - { - "d_id": "100" - }, - [ - { - $addFields: { - elem_index: { - $indexOfArray: [ - "$logs.e_id", - "1000" - ] - } - } - }, - { - $set: { - version: { - $add: ["$version", 1] - }, - last_update: new Date(), - - } - }, - { - $set: { - logs: { - $cond: { - if: { - $lt: ["$elem_index", 0] - }, - then: { - $concatArrays: [ - "$logs", - [ - { - e_id: "1000", - last_update: new Date(), - version: "$version", - deleted: false - } - ] - ] - }, - else: { - $map: { - input: { - $range: [0, { - $size: "$logs" - }] - }, - as: "i", - in: { - $cond: { - if: { - $eq: ["$$i", "$elem_index"] - }, - then: { - e_id: "1000", - last_update: new Date(), - version: "$version", - deleted: false - }, - else: { - $arrayElemAt: ["$logs", "$$i"] - } - }, - - }, - - }, - - }, - - }, - - }, - - }, - - }, - { - $unset: ["elem_index"] - }, - ] -) diff --git a/pkg/common/listdemo2/demo2.js b/pkg/common/listdemo2/demo2.js deleted file mode 100644 index 15e7abcfe..000000000 --- a/pkg/common/listdemo2/demo2.js +++ /dev/null @@ -1,63 +0,0 @@ - -db.friend_version.updateMany( - { - "d_id": "100" - }, - [ - { - $addFields: { - update_elem_ids: ["1000", "1001","1003", "2000"] - } - }, - { - $set: { - version: { - $add: ["$version", 1] - }, - last_update: new Date(), - } - }, - { - $set: { - logs: { - $filter: { - input: "$logs", - as: "log", - cond: { - "$not": { - $in: ["$$log.e_id", "$update_elem_ids"] - } - } - } - }, - - }, - - }, - { - $set: { - logs: { - $concatArrays: [ - "$logs", - [ - { - e_id: "1003", - last_update: ISODate("2024-05-25T06:32:10.238Z"), - version: "$version", - deleted: false - }, - - ] - ] - } - } - }, - { - $unset: ["update_elem_ids"] - }, - - ] -) - - - diff --git a/pkg/common/listdemo2/demo3.js b/pkg/common/listdemo2/demo3.js deleted file mode 100644 index 5367971cf..000000000 --- a/pkg/common/listdemo2/demo3.js +++ /dev/null @@ -1,59 +0,0 @@ - -db.friend_version.aggregate([ - { - "$match": { - "d_id": "100", - } - }, - { - "$project": { - "_id": 0, - "d_id": 0, - } - }, - { - "$addFields": { - "logs": { - $cond: { - if: { - $or: [ - {$lt: ["$version", 3]}, - {$gte: ["$deleted", 3]}, - ], - }, - then: [], - else: "$logs", - } - } - }, - }, - { - "$addFields": { - "logs": { - "$filter": { - input: "$logs", - as: "l", - cond: { $gt: ["$$l.version", 3] } - } - } - } - }, - { - "$addFields": { - "log_len": { - $size: "$logs" - } - } - }, - { - "$addFields": { - "logs": { - $cond: { - if: {$gt: ["$log_len", 1]}, - then: [], - else: "$logs", - } - } - } - } -]) diff --git a/pkg/common/listdemo2/demo5.js b/pkg/common/listdemo2/demo5.js deleted file mode 100644 index 71a709a81..000000000 --- a/pkg/common/listdemo2/demo5.js +++ /dev/null @@ -1,10 +0,0 @@ -db.friend_version.updateMany( - { - "d_id": "100" - }, - [ - - - - ], -) \ No newline at end of file diff --git a/pkg/common/storage/cache/friend.go b/pkg/common/storage/cache/friend.go index 9dec3d5ab..3fee297ac 100644 --- a/pkg/common/storage/cache/friend.go +++ b/pkg/common/storage/cache/friend.go @@ -16,7 +16,6 @@ package cache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) @@ -40,5 +39,5 @@ type FriendCache interface { FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) - FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) + FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*relationtb.VersionLog, error) } diff --git a/pkg/common/storage/cache/redis/friend.go b/pkg/common/storage/cache/redis/friend.go index d6754d7bf..f4edbca9a 100644 --- a/pkg/common/storage/cache/redis/friend.go +++ b/pkg/common/storage/cache/redis/friend.go @@ -16,7 +16,6 @@ package redis import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "time" "github.com/dtm-labs/rockscache" @@ -186,6 +185,6 @@ func (f *FriendCacheRedis) FindSortFriendUserIDs(ctx context.Context, ownerUserI }) } -func (f *FriendCacheRedis) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) { +func (f *FriendCacheRedis) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { return f.friendDB.FindIncrVersion(ctx, ownerUserID, version, limit) } diff --git a/pkg/common/storage/controller/friend.go b/pkg/common/storage/controller/friend.go index 0c83930b1..1af967b9b 100644 --- a/pkg/common/storage/controller/friend.go +++ b/pkg/common/storage/controller/friend.go @@ -17,7 +17,6 @@ package controller import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -81,7 +80,7 @@ type FriendDatabase interface { FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) - FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) + FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) @@ -362,7 +361,7 @@ func (f *friendDatabase) FindSortFriendUserIDs(ctx context.Context, ownerUserID return f.cache.FindSortFriendUserIDs(ctx, ownerUserID) } -func (f *friendDatabase) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) { +func (f *friendDatabase) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { return f.cache.FindFriendIncrVersion(ctx, ownerUserID, version, limit) } diff --git a/pkg/common/storage/database/friend.go b/pkg/common/storage/database/friend.go index f5a999485..6ab1185bc 100644 --- a/pkg/common/storage/database/friend.go +++ b/pkg/common/storage/database/friend.go @@ -16,7 +16,6 @@ package database import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/pagination" @@ -49,7 +48,7 @@ type Friend interface { // UpdateFriends update friends' fields UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) (err error) - FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) + FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 8993ee829..19b2f4f18 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -16,7 +16,6 @@ package mgo import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -30,7 +29,7 @@ import ( // FriendMgo implements Friend using MongoDB as the storage backend. type FriendMgo struct { coll *mongo.Collection - owner dataver.DataLog + owner database.VersionLog } // NewFriendMongo creates a new instance of FriendMgo with the provided MongoDB database. @@ -46,7 +45,7 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) { if err != nil { return nil, err } - owner, err := dataver.NewDataLog(db.Collection("friend_owner_log")) + owner, err := NewVersionLog(db.Collection("friend_owner_version_log")) if err != nil { return nil, err } @@ -100,15 +99,6 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU }) } -// Update modifies multiple friend documents. -// func (f *FriendMgo) Update(ctx context.Context, friends []*relation.Friend) error { -// filter := bson.M{ -// "owner_user_id": ownerUserID, -// "friend_user_id": friendUserID, -// } -// return mgotool.UpdateMany(ctx, f.coll, filter, friends) -// } - // UpdateRemark updates the remark for a specific friend. func (f *FriendMgo) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) error { return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark}) @@ -206,7 +196,7 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien }) } -func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) { +func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { return f.owner.FindChangeLog(ctx, ownerUserID, version, limit) } diff --git a/pkg/common/db/dataver/common.go b/pkg/common/storage/database/mgo/version_log.go similarity index 56% rename from pkg/common/db/dataver/common.go rename to pkg/common/storage/database/mgo/version_log.go index 5ade1fd58..fe123073e 100644 --- a/pkg/common/db/dataver/common.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -1,8 +1,10 @@ -package dataver +package mgo import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" @@ -13,84 +15,19 @@ import ( "time" ) -const ( - FirstVersion = 1 - DefaultDeleteVersion = 0 -) - -type WriteLog struct { - ID primitive.ObjectID `bson:"_id"` - DID string `bson:"d_id"` - Logs []Elem `bson:"logs"` - Version uint `bson:"version"` - Deleted uint `bson:"deleted"` - LastUpdate time.Time `bson:"last_update"` - LogLen int `bson:"log_len"` - queryDoc bool `bson:"-"` -} - -func (w *WriteLog) Full() bool { - return w.queryDoc || w.Version == 0 || len(w.Logs) != w.LogLen -} - -func (w *WriteLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) { - for _, l := range w.Logs { - if l.Deleted { - delIds = append(delIds, l.EID) - } else { - changeIds = append(changeIds, l.EID) - } - } - return -} - -type Elem struct { - EID string `bson:"e_id"` - Deleted bool `bson:"deleted"` - Version uint `bson:"version"` - LastUpdate time.Time `bson:"last_update"` -} - -type tableWriteLog struct { - ID primitive.ObjectID `bson:"_id"` - DID string `bson:"d_id"` - Logs []Elem `bson:"logs"` - Version uint `bson:"version"` - Deleted uint `bson:"deleted"` - LastUpdate time.Time `bson:"last_update"` -} - -func (t *tableWriteLog) WriteLog() *WriteLog { - return &WriteLog{ - ID: t.ID, - DID: t.DID, - Logs: t.Logs, - Version: t.Version, - Deleted: t.Deleted, - LastUpdate: t.LastUpdate, - LogLen: 0, - } -} - -type DataLog interface { - WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error - FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) - DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error -} - -func NewDataLog(coll *mongo.Collection) (DataLog, error) { - lm := &logModel{coll: coll} +func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) { + lm := &VersionLogMgo{coll: coll} if lm.initIndex(context.Background()) != nil { return nil, errs.ErrInternalServer.WrapMsg("init index failed", "coll", coll.Name()) } return lm, nil } -type logModel struct { +type VersionLogMgo struct { coll *mongo.Collection } -func (l *logModel) initIndex(ctx context.Context) error { +func (l *VersionLogMgo) initIndex(ctx context.Context) error { _, err := l.coll.Indexes().CreateOne(ctx, mongo.IndexModel{ Keys: bson.M{ "d_id": 1, @@ -99,7 +36,7 @@ func (l *logModel) initIndex(ctx context.Context) error { return err } -func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error { +func (l *VersionLogMgo) WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error { if len(eIds) == 0 { return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) } @@ -127,20 +64,20 @@ func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, dele return nil } -func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*tableWriteLog, error) { - wl := tableWriteLog{ +func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*model.VersionLogTable, error) { + wl := model.VersionLogTable{ ID: primitive.NewObjectID(), DID: dId, - Logs: make([]Elem, 0, len(eIds)), - Version: FirstVersion, - Deleted: DefaultDeleteVersion, + Logs: make([]model.VersionLogElem, 0, len(eIds)), + Version: database.FirstVersion, + Deleted: database.DefaultDeleteVersion, LastUpdate: now, } for _, eId := range eIds { - wl.Logs = append(wl.Logs, Elem{ + wl.Logs = append(wl.Logs, model.VersionLogElem{ EID: eId, Deleted: deleted, - Version: FirstVersion, + Version: database.FirstVersion, LastUpdate: now, }) } @@ -148,7 +85,7 @@ func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, delet return &wl, err } -func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { +func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { if eIds == nil { eIds = []string{} } @@ -208,23 +145,22 @@ func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) } -func (l *logModel) findDoc(ctx context.Context, dId string) (*WriteLog, error) { - res, err := mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0})) +func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) { + vl, err := mongoutil.FindOne[*model.VersionLogTable](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0})) if err != nil { return nil, err } - res.queryDoc = true - return res, nil + return vl.VersionLog(), nil } -func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { +func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) { if wl, err := l.findChangeLog(ctx, dId, version, limit); err == nil { return wl, nil } else if !errors.Is(err, mongo.ErrNoDocuments) { return nil, err } if res, err := l.initDoc(ctx, dId, nil, false, time.Now()); err == nil { - return res.WriteLog(), nil + return res.VersionLog(), nil } else if mongo.IsDuplicateKeyError(err) { return l.findChangeLog(ctx, dId, version, limit) } else { @@ -232,7 +168,7 @@ func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, } } -func (l *logModel) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { +func (l *VersionLogMgo) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) { if version == 0 && limit == 0 { return l.findDoc(ctx, dId) } @@ -293,17 +229,17 @@ func (l *logModel) findChangeLog(ctx context.Context, dId string, version uint, if limit <= 0 { pipeline = pipeline[:len(pipeline)-1] } - res, err := mongoutil.Aggregate[*WriteLog](ctx, l.coll, pipeline) + vl, err := mongoutil.Aggregate[*model.VersionLog](ctx, l.coll, pipeline) if err != nil { return nil, err } - if len(res) == 0 { + if len(vl) == 0 { return nil, mongo.ErrNoDocuments } - return res[0], nil + return vl[0], nil } -func (l *logModel) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error { +func (l *VersionLogMgo) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error { return mongoutil.DeleteMany(ctx, l.coll, bson.M{ "last_update": bson.M{ "$lt": deadline, diff --git a/pkg/common/storage/database/version_log.go b/pkg/common/storage/database/version_log.go new file mode 100644 index 000000000..6783cf4ff --- /dev/null +++ b/pkg/common/storage/database/version_log.go @@ -0,0 +1,18 @@ +package database + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "time" +) + +const ( + FirstVersion = 1 + DefaultDeleteVersion = 0 +) + +type VersionLog interface { + WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error + FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) + DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error +} diff --git a/pkg/common/storage/model/user.go b/pkg/common/storage/model/user.go index c6a4f952c..f64d09e79 100644 --- a/pkg/common/storage/model/user.go +++ b/pkg/common/storage/model/user.go @@ -36,10 +36,10 @@ func (u *User) GetFaceURL() string { return u.FaceURL } -func (u User) GetUserID() string { +func (u *User) GetUserID() string { return u.UserID } -func (u User) GetEx() string { +func (u *User) GetEx() string { return u.Ex } diff --git a/pkg/common/storage/model/version_log.go b/pkg/common/storage/model/version_log.go new file mode 100644 index 000000000..044bd42da --- /dev/null +++ b/pkg/common/storage/model/version_log.go @@ -0,0 +1,61 @@ +package model + +import ( + "go.mongodb.org/mongo-driver/bson/primitive" + "time" +) + +type VersionLogElem struct { + EID string `bson:"e_id"` + Deleted bool `bson:"deleted"` + Version uint `bson:"version"` + LastUpdate time.Time `bson:"last_update"` +} + +type VersionLogTable struct { + ID primitive.ObjectID `bson:"_id"` + DID string `bson:"d_id"` + Logs []VersionLogElem `bson:"logs"` + Version uint `bson:"version"` + Deleted uint `bson:"deleted"` + LastUpdate time.Time `bson:"last_update"` +} + +func (v *VersionLogTable) VersionLog() *VersionLog { + return &VersionLog{ + ID: v.ID, + DID: v.DID, + Logs: v.Logs, + Version: v.Version, + Deleted: v.Deleted, + LastUpdate: v.LastUpdate, + LogLen: 0, + queryDoc: true, + } +} + +type VersionLog struct { + ID primitive.ObjectID `bson:"_id"` + DID string `bson:"d_id"` + Logs []VersionLogElem `bson:"logs"` + Version uint `bson:"version"` + Deleted uint `bson:"deleted"` + LastUpdate time.Time `bson:"last_update"` + LogLen int `bson:"log_len"` + queryDoc bool `bson:"-"` +} + +func (w *VersionLog) Full() bool { + return w.queryDoc || w.Version == 0 || len(w.Logs) != w.LogLen +} + +func (w *VersionLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) { + for _, l := range w.Logs { + if l.Deleted { + delIds = append(delIds, l.EID) + } else { + changeIds = append(changeIds, l.EID) + } + } + return +}