From c6942f07dae378fd3c367ef8fbcdbf9015bb6c03 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 22 May 2024 18:38:29 +0800 Subject: [PATCH] new mongo --- pkg/common/listdemo2/common.go | 163 ++++++++++++++++++++++++++++ pkg/common/listdemo2/common_test.go | 61 +++++++++++ pkg/common/listdemo2/demo.js | 86 +++++++++++++++ 3 files changed, 310 insertions(+) create mode 100644 pkg/common/listdemo2/common.go create mode 100644 pkg/common/listdemo2/common_test.go create mode 100644 pkg/common/listdemo2/demo.js diff --git a/pkg/common/listdemo2/common.go b/pkg/common/listdemo2/common.go new file mode 100644 index 000000000..081de8bba --- /dev/null +++ b/pkg/common/listdemo2/common.go @@ -0,0 +1,163 @@ +package listdemo + +import ( + "context" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "time" +) + +var ( + ErrListNotFound = errors.New("list not found") + ErrElemExist = errors.New("elem exist") + ErrNeedFull = errors.New("need full") + ErrNotFound = mongo.ErrNoDocuments +) + +type Elem struct { + ID string + Version uint +} + +type ChangeLog struct { + ChangeIDs []Elem + DeleteIDs []Elem +} + +type WriteLog struct { + DID string `bson:"d_id"` + Logs []LogElem `bson:"logs"` + Version uint `bson:"version"` + LastUpdate time.Time `bson:"last_update"` + DeleteVersion uint `bson:"delete_version"` +} + +type LogElem struct { + EID string `bson:"e_id"` + Deleted bool `bson:"deleted"` + Version uint `bson:"version"` + UpdateTime time.Time `bson:"update_time"` +} + +type LogModel struct { + coll *mongo.Collection +} + +func (l *LogModel) InitIndex(ctx context.Context) error { + return nil +} + +func (l *LogModel) WriteLog(ctx context.Context, dId string, eId string, deleted bool) error { + now := time.Now() + res, err := l.writeLog(ctx, dId, eId, deleted, now) + if err != nil { + return err + } + if res.MatchedCount > 0 { + return nil + } + wl := WriteLog{ + DID: dId, + Logs: []LogElem{ + { + EID: eId, + Deleted: deleted, + Version: 1, + UpdateTime: now, + }, + }, + Version: 1, + LastUpdate: now, + DeleteVersion: 0, + } + if _, err := l.coll.InsertOne(ctx, &wl); err == nil { + return nil + } else if !mongo.IsDuplicateKeyError(err) { + return err + } + if res, err := l.writeLog(ctx, dId, eId, deleted, now); err != nil { + return err + } else if res.ModifiedCount == 0 { + return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eId", eId) + } + return nil +} + +func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { + filter := bson.M{ + "d_id": dId, + } + elem := bson.M{ + "e_id": eId, + "version": "$version", + "deleted": deleted, + "update_time": now, + } + pipeline := []bson.M{ + { + "$addFields": bson.M{ + "elem_index": bson.M{ + "$indexOfArray": []any{"$logs.e_id", eId}, + }, + }, + }, + { + "$set": bson.M{ + "version": bson.M{"$add": []any{"$version", 1}}, + "update_time": now, + }, + }, + { + "$set": bson.M{ + "logs": bson.M{ + "$cond": bson.M{ + "if": bson.M{ + "$lt": []any{"$elem_index", 0}, + }, + "then": bson.M{ + "$concatArrays": []any{ + "$logs", + []bson.M{ + elem, + }, + }, + }, + "else": bson.M{ + "$map": bson.M{ + "input": bson.M{ + "$range": []any{0, bson.M{"$size": "$logs"}}, + }, + "as": "i", + "in": bson.M{ + "$cond": bson.M{ + "if": bson.M{ + "$eq": []any{"$$i", "$elem_index"}, + }, + "then": elem, + "else": bson.M{ + "$arrayElemAt": []any{ + "$logs", + "$$i", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + "$unset": "elem_index", + }, + } + return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) +} + +func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint) (*ChangeLog, error) { + return nil, nil +} diff --git a/pkg/common/listdemo2/common_test.go b/pkg/common/listdemo2/common_test.go new file mode 100644 index 000000000..91f8f8143 --- /dev/null +++ b/pkg/common/listdemo2/common_test.go @@ -0,0 +1,61 @@ +package listdemo + +import ( + "context" + "fmt" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "testing" + "time" +) + +func Result[V any](val V, err error) V { + if err != nil { + panic(err) + } + return val +} + +func Check(err error) { + if err != nil { + panic(err) + } +} + +func TestName(t *testing.T) { + cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + coll := cli.Database("openim_v3").Collection("demo") + _ = coll + //Result(coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + // { + // Keys: map[string]int{"user_id": 1}, + // }, + // { + // Keys: map[string]int{"friends.friend_user_id": 1}, + // }, + //})) + + wl := WriteLog{ + DID: "100", + Logs: []LogElem{ + { + EID: "1000", + Deleted: false, + Version: 1, + UpdateTime: time.Now(), + }, + { + EID: "2000", + Deleted: false, + Version: 1, + UpdateTime: time.Now(), + }, + }, + Version: 2, + DeleteVersion: 0, + LastUpdate: time.Now(), + } + + fmt.Println(Result(coll.InsertOne(context.Background(), wl))) + +} diff --git a/pkg/common/listdemo2/demo.js b/pkg/common/listdemo2/demo.js new file mode 100644 index 000000000..c0044b238 --- /dev/null +++ b/pkg/common/listdemo2/demo.js @@ -0,0 +1,86 @@ +db.demo.updateMany( + { + "d_id": "100" + }, + [ + { + $addFields: { + elem_index: { + $indexOfArray: [ + "$logs.e_id", + "1000" + ] + } + } + }, + { + $set: { + version: { + $add: ["$version", 1] + }, + update_time: new Date(), + + } + }, + { + $set: { + logs: { + $cond: { + if: { + $lt: ["$elem_index", 0] + }, + then: { + $concatArrays: [ + "$logs", + [ + { + e_id: "1000", + update_time: new Date(), + version: "$version", + deleted: false + } + ] + ] + }, + else: { + $map: { + input: { + $range: [0, { + $size: "$logs" + }] + }, + as: "i", + in: { + $cond: { + if: { + $eq: ["$$i", "$elem_index"] + }, + then: { + e_id: "1000", + update_time: new Date(), + version: "$version", + deleted: false + }, + else: { + $arrayElemAt: ["$logs", "$$i"] + } + }, + + }, + + }, + + }, + + }, + + }, + + }, + + }, + { + $unset: ["elem_index"] + }, + ] +)