parent
5528e8e44d
commit
c6942f07da
@ -0,0 +1,163 @@
|
|||||||
|
package listdemo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrListNotFound = errors.New("list not found")
|
||||||
|
ErrElemExist = errors.New("elem exist")
|
||||||
|
ErrNeedFull = errors.New("need full")
|
||||||
|
ErrNotFound = mongo.ErrNoDocuments
|
||||||
|
)
|
||||||
|
|
||||||
|
type Elem struct {
|
||||||
|
ID string
|
||||||
|
Version uint
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChangeLog struct {
|
||||||
|
ChangeIDs []Elem
|
||||||
|
DeleteIDs []Elem
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteLog struct {
|
||||||
|
DID string `bson:"d_id"`
|
||||||
|
Logs []LogElem `bson:"logs"`
|
||||||
|
Version uint `bson:"version"`
|
||||||
|
LastUpdate time.Time `bson:"last_update"`
|
||||||
|
DeleteVersion uint `bson:"delete_version"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type LogElem struct {
|
||||||
|
EID string `bson:"e_id"`
|
||||||
|
Deleted bool `bson:"deleted"`
|
||||||
|
Version uint `bson:"version"`
|
||||||
|
UpdateTime time.Time `bson:"update_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type LogModel struct {
|
||||||
|
coll *mongo.Collection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LogModel) InitIndex(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LogModel) WriteLog(ctx context.Context, dId string, eId string, deleted bool) error {
|
||||||
|
now := time.Now()
|
||||||
|
res, err := l.writeLog(ctx, dId, eId, deleted, now)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if res.MatchedCount > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
wl := WriteLog{
|
||||||
|
DID: dId,
|
||||||
|
Logs: []LogElem{
|
||||||
|
{
|
||||||
|
EID: eId,
|
||||||
|
Deleted: deleted,
|
||||||
|
Version: 1,
|
||||||
|
UpdateTime: now,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Version: 1,
|
||||||
|
LastUpdate: now,
|
||||||
|
DeleteVersion: 0,
|
||||||
|
}
|
||||||
|
if _, err := l.coll.InsertOne(ctx, &wl); err == nil {
|
||||||
|
return nil
|
||||||
|
} else if !mongo.IsDuplicateKeyError(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if res, err := l.writeLog(ctx, dId, eId, deleted, now); err != nil {
|
||||||
|
return err
|
||||||
|
} else if res.ModifiedCount == 0 {
|
||||||
|
return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eId", eId)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
|
||||||
|
filter := bson.M{
|
||||||
|
"d_id": dId,
|
||||||
|
}
|
||||||
|
elem := bson.M{
|
||||||
|
"e_id": eId,
|
||||||
|
"version": "$version",
|
||||||
|
"deleted": deleted,
|
||||||
|
"update_time": now,
|
||||||
|
}
|
||||||
|
pipeline := []bson.M{
|
||||||
|
{
|
||||||
|
"$addFields": bson.M{
|
||||||
|
"elem_index": bson.M{
|
||||||
|
"$indexOfArray": []any{"$logs.e_id", eId},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$set": bson.M{
|
||||||
|
"version": bson.M{"$add": []any{"$version", 1}},
|
||||||
|
"update_time": now,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$set": bson.M{
|
||||||
|
"logs": bson.M{
|
||||||
|
"$cond": bson.M{
|
||||||
|
"if": bson.M{
|
||||||
|
"$lt": []any{"$elem_index", 0},
|
||||||
|
},
|
||||||
|
"then": bson.M{
|
||||||
|
"$concatArrays": []any{
|
||||||
|
"$logs",
|
||||||
|
[]bson.M{
|
||||||
|
elem,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"else": bson.M{
|
||||||
|
"$map": bson.M{
|
||||||
|
"input": bson.M{
|
||||||
|
"$range": []any{0, bson.M{"$size": "$logs"}},
|
||||||
|
},
|
||||||
|
"as": "i",
|
||||||
|
"in": bson.M{
|
||||||
|
"$cond": bson.M{
|
||||||
|
"if": bson.M{
|
||||||
|
"$eq": []any{"$$i", "$elem_index"},
|
||||||
|
},
|
||||||
|
"then": elem,
|
||||||
|
"else": bson.M{
|
||||||
|
"$arrayElemAt": []any{
|
||||||
|
"$logs",
|
||||||
|
"$$i",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$unset": "elem_index",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint) (*ChangeLog, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
package listdemo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Result[V any](val V, err error) V {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
func Check(err error) {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestName(t *testing.T) {
|
||||||
|
cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||||
|
coll := cli.Database("openim_v3").Collection("demo")
|
||||||
|
_ = coll
|
||||||
|
//Result(coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||||
|
// {
|
||||||
|
// Keys: map[string]int{"user_id": 1},
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// Keys: map[string]int{"friends.friend_user_id": 1},
|
||||||
|
// },
|
||||||
|
//}))
|
||||||
|
|
||||||
|
wl := WriteLog{
|
||||||
|
DID: "100",
|
||||||
|
Logs: []LogElem{
|
||||||
|
{
|
||||||
|
EID: "1000",
|
||||||
|
Deleted: false,
|
||||||
|
Version: 1,
|
||||||
|
UpdateTime: time.Now(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
EID: "2000",
|
||||||
|
Deleted: false,
|
||||||
|
Version: 1,
|
||||||
|
UpdateTime: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Version: 2,
|
||||||
|
DeleteVersion: 0,
|
||||||
|
LastUpdate: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(Result(coll.InsertOne(context.Background(), wl)))
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,86 @@
|
|||||||
|
db.demo.updateMany(
|
||||||
|
{
|
||||||
|
"d_id": "100"
|
||||||
|
},
|
||||||
|
[
|
||||||
|
{
|
||||||
|
$addFields: {
|
||||||
|
elem_index: {
|
||||||
|
$indexOfArray: [
|
||||||
|
"$logs.e_id",
|
||||||
|
"1000"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
$set: {
|
||||||
|
version: {
|
||||||
|
$add: ["$version", 1]
|
||||||
|
},
|
||||||
|
update_time: new Date(),
|
||||||
|
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
$set: {
|
||||||
|
logs: {
|
||||||
|
$cond: {
|
||||||
|
if: {
|
||||||
|
$lt: ["$elem_index", 0]
|
||||||
|
},
|
||||||
|
then: {
|
||||||
|
$concatArrays: [
|
||||||
|
"$logs",
|
||||||
|
[
|
||||||
|
{
|
||||||
|
e_id: "1000",
|
||||||
|
update_time: new Date(),
|
||||||
|
version: "$version",
|
||||||
|
deleted: false
|
||||||
|
}
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
else: {
|
||||||
|
$map: {
|
||||||
|
input: {
|
||||||
|
$range: [0, {
|
||||||
|
$size: "$logs"
|
||||||
|
}]
|
||||||
|
},
|
||||||
|
as: "i",
|
||||||
|
in: {
|
||||||
|
$cond: {
|
||||||
|
if: {
|
||||||
|
$eq: ["$$i", "$elem_index"]
|
||||||
|
},
|
||||||
|
then: {
|
||||||
|
e_id: "1000",
|
||||||
|
update_time: new Date(),
|
||||||
|
version: "$version",
|
||||||
|
deleted: false
|
||||||
|
},
|
||||||
|
else: {
|
||||||
|
$arrayElemAt: ["$logs", "$$i"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
},
|
||||||
|
{
|
||||||
|
$unset: ["elem_index"]
|
||||||
|
},
|
||||||
|
]
|
||||||
|
)
|
Loading…
Reference in new issue