optimization version log

pull/2336/head
withchao 1 year ago
parent eb362daaf2
commit 0aaf8b93a0

@ -14,8 +14,8 @@ require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.2
github.com/openimsdk/tools v0.0.49-alpha.23
github.com/pkg/errors v0.9.1
github.com/openimsdk/tools v0.0.49-alpha.24
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
go.mongodb.org/mongo-driver v1.14.0
@ -178,6 +178,4 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
)
replace (
github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol
)
replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol

@ -286,10 +286,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.2 h1:XNc3pmAXyW+PMo7tghr2O+uydYck1hogppHDW3+Y+3k=
github.com/openimsdk/protocol v0.0.69-alpha.2/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.23 h1:/KkJ7vfx8FAoJhq3veH9PWnxbSkEf+dTSshvDrHBR38=
github.com/openimsdk/tools v0.0.49-alpha.23/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/openimsdk/tools v0.0.49-alpha.24 h1:lJsqnjTPujnr91LRQ6QmcTliMIa4fMOBSTri6rFz2ek=
github.com/openimsdk/tools v0.0.49-alpha.24/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -1,42 +0,0 @@
package dataver
import (
"github.com/openimsdk/tools/utils/datautil"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type SyncResult struct {
Version uint
VersionID string
DeleteEID []string
Changes []string
Full bool
}
func VersionIDStr(id primitive.ObjectID) string {
if id.IsZero() {
return ""
}
return id.String()
}
func NewSyncResult(wl *WriteLog, fullIDs []string, versionID string) *SyncResult {
var findEIDs []string
var res SyncResult
if wl.Full() || VersionIDStr(wl.ID) != versionID {
res.Changes = fullIDs
res.Full = true
} else {
idSet := datautil.SliceSet(fullIDs)
for _, l := range wl.Logs {
if l.Deleted {
res.DeleteEID = append(res.DeleteEID, l.EID)
} else {
if _, ok := idSet[l.EID]; ok {
findEIDs = append(findEIDs, l.EID)
}
}
}
}
return &res
}

@ -1,38 +0,0 @@
package dataver
/*
UserIDs
500
1,2,3,4,5,6,7,8,9
1,3,5,7,8,9
1.sdk docID(), version
2.sdkidHashapiidHash, docID, version
3.version
500id500id
500ididHashidid
id
id
4.sdk
5.db
*/

@ -1,182 +0,0 @@
package listdemo
import (
"context"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
ErrListNotFound = errors.New("list not found")
ErrElemExist = errors.New("elem exist")
ErrNotFound = mongo.ErrNoDocuments
)
type ListDoc interface {
IDName() string // 外层业务id字段名字 user_id
ElemsName() string // 外层列表名字 friends
VersionName() string // 外层版本号 version
DeleteVersion() string // 删除版本号
BuildDoc(lid any, e Elem) any // 返回一个组装的doc文档
}
type Elem interface {
IDName() string // 业务id名字 friend_user_id
IDValue() any // 业务id值 userID -> "100000000"
VersionName() string // 版本号
DeletedName() string // 删除字段名字
ToMap() map[string]any // 把结构体转换为map
}
type List[D any, E Elem] struct {
coll *mongo.Collection
lf ListDoc
}
func (l *List[D, E]) zeroE() E {
var t E
return t
}
func (l *List[D, E]) FindElem(ctx context.Context, lid any, eid any) (E, error) {
res, err := l.FindElems(ctx, lid, []any{eid})
if err != nil {
return l.zeroE(), err
}
if len(res) == 0 {
return l.zeroE(), ErrNotFound
}
return res[0], nil
}
// FindElems 查询Elems
func (l *List[D, E]) FindElems(ctx context.Context, lid any, eids []any) ([]E, error) {
//pipeline := []bson.M{
// {
// "$match": bson.M{
// l.lf.IDName(): lid,
// l.lf.IDName() + "." + l.lf.ElemsID(): bson.M{
// "$in": eids,
// },
// },
// },
// {
// "$unwind": "$" + l.lf.ElemsName(),
// },
// {
// "$match": bson.M{
// l.lf.IDName() + "." + l.lf.ElemsID(): bson.M{
// "$in": eids,
// },
// },
// },
//}
panic("todo")
}
func (l *List[D, E]) Find(ctx context.Context, filter any, opts ...*options.FindOptions) ([]E, error) {
return nil, nil
}
func (l *List[D, E]) Count(ctx context.Context, filter any, opts ...*options.CountOptions) (int64, error) {
return 0, nil
}
func (l *List[D, E]) Update(ctx context.Context, lid any, eid any) (*mongo.UpdateResult, error) {
return nil, nil
}
func (l *List[D, E]) Delete(ctx context.Context, lid any, eids any) (*mongo.UpdateResult, error) {
return nil, nil
}
func (l *List[D, E]) Page(ctx context.Context, filter any, pagination pagination.Pagination, opts ...*options.FindOptions) (int64, []E, error) {
return 0, nil, nil
}
func (l *List[D, E]) ElemIDs(ctx context.Context, filter any, opts ...*options.FindOptions) ([]E, error) {
return nil, nil
}
// InsertElem 插入一个
func (l *List[D, E]) InsertElem(ctx context.Context, lid any, e Elem) error {
if err := l.insertElem(ctx, lid, e); err == nil {
return nil
} else if !errors.Is(err, ErrListNotFound) {
return err
}
if _, err := l.coll.InsertOne(ctx, l.lf.BuildDoc(lid, e)); err == nil {
return nil
} else if mongo.IsDuplicateKeyError(err) {
return l.insertElem(ctx, lid, e)
} else {
return err
}
}
func (l *List[D, E]) insertElem(ctx context.Context, lid any, e Elem) error {
data := e.ToMap()
data[e.VersionName()] = "$max_version"
filter := bson.M{
l.lf.IDName(): lid,
}
pipeline := []bson.M{
{
"$addFields": bson.M{
"found_elem": bson.M{
"$in": bson.A{e.IDValue(), l.lf.ElemsName() + "." + e.IDName()},
},
},
},
{
"$set": bson.M{
"max_version": bson.M{
"$cond": bson.M{
"if": "$found_elem",
"then": "$max_version",
"else": bson.M{"$add": bson.A{"max_version", 1}},
},
},
},
},
{
"$set": bson.M{
l.lf.ElemsName(): bson.M{
"$cond": bson.M{
"if": "$found_elem",
"then": "$" + l.lf.ElemsName(),
"else": bson.M{
"$concatArrays": bson.A{
"$" + l.lf.ElemsName(),
bson.A{
data,
},
},
},
},
},
},
},
{
"$unset": "found_elem",
},
}
res, err := mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
if err != nil {
return err
}
if res.MatchedCount == 0 {
return ErrListNotFound
}
if res.ModifiedCount == 0 {
return ErrElemExist
}
return nil
}

@ -1,5 +0,0 @@
package listdemo
type friendModel struct {
db *List[*Friend, *FriendElem]
}

@ -1,86 +0,0 @@
package listdemo
import (
"time"
)
var (
_ Elem = (*FriendElem)(nil)
_ ListDoc = (*Friend)(nil)
)
type FriendElem struct {
FriendUserID string `bson:"friend_user_id"`
Nickname string `bson:"nickname"`
FaceURL string `bson:"face_url"`
Remark string `bson:"remark"`
CreateTime time.Time `bson:"create_time"`
AddSource int32 `bson:"add_source"`
OperatorUserID string `bson:"operator_user_id"`
Ex string `bson:"ex"`
IsPinned bool `bson:"is_pinned"`
Version uint `bson:"version"`
DeleteTime *time.Time `bson:"delete_time"`
}
func (f *FriendElem) IDName() string {
return "friend_user_id"
}
func (f *FriendElem) IDValue() any {
return f.FriendUserID
}
func (f *FriendElem) VersionName() string {
return "version"
}
func (f *FriendElem) DeletedName() string {
return "delete_time"
}
func (f *FriendElem) ToMap() map[string]any {
return map[string]any{
"friend_user_id": f.FriendUserID,
"nickname": f.Nickname,
"face_url": f.FaceURL,
"remark": f.Remark,
"create_time": f.CreateTime,
"add_source": f.AddSource,
"operator_user_id": f.OperatorUserID,
"ex": f.Ex,
"is_pinned": f.IsPinned,
"version": f.Version,
"delete_time": f.DeleteTime,
}
}
type Friend struct {
UserID string `bson:"user_id"`
Friends []*FriendElem `bson:"friends"`
Version uint `bson:"version"`
DeleteVersion uint `bson:"delete_version"`
}
func (f *Friend) BuildDoc(lid any, e Elem) any {
return &Friend{
UserID: lid.(string),
Friends: []*FriendElem{e.(*FriendElem)},
}
}
func (f *Friend) ElemsID() string {
return "user_id"
}
func (f *Friend) IDName() string {
return "user_id"
}
func (f *Friend) ElemsName() string {
return "friends"
}
func (f *Friend) VersionName() string {
return "version"
}

@ -1,322 +0,0 @@
package listdemo
import (
"context"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"time"
)
var (
ErrListNotFound = errors.New("list not found")
ErrElemExist = errors.New("elem exist")
ErrNeedFull = errors.New("need full")
ErrNotFound = mongo.ErrNoDocuments
)
const (
FirstVersion = 1
DefaultDeleteVersion = 0
)
type Elem struct {
ID string
Version uint
}
type ChangeLog struct {
ChangeIDs []Elem
DeleteIDs []Elem
}
type WriteLog struct {
DID string `bson:"d_id"`
Logs []LogElem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
}
type WriteLogLen struct {
DID string `bson:"d_id"`
Logs []LogElem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
LogLen int `bson:"log_len"`
}
type LogElem struct {
EID string `bson:"e_id"`
Deleted bool `bson:"deleted"`
Version uint `bson:"version"`
LastUpdate time.Time `bson:"last_update"`
}
type LogModel struct {
coll *mongo.Collection
}
func (l *LogModel) InitIndex(ctx context.Context) error {
_, err := l.coll.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.M{
"d_id": 1,
},
})
return err
}
func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
filter := bson.M{
"d_id": dId,
}
elem := bson.M{
"e_id": eId,
"version": "$version",
"deleted": deleted,
"last_update": now,
}
pipeline := []bson.M{
{
"$addFields": bson.M{
"elem_index": bson.M{
"$indexOfArray": []any{"$logs.e_id", eId},
},
},
},
{
"$set": bson.M{
"version": bson.M{"$add": []any{"$version", 1}},
"last_update": now,
},
},
{
"$set": bson.M{
"logs": bson.M{
"$cond": bson.M{
"if": bson.M{
"$lt": []any{"$elem_index", 0},
},
"then": bson.M{
"$concatArrays": []any{
"$logs",
[]bson.M{
elem,
},
},
},
"else": bson.M{
"$map": bson.M{
"input": bson.M{
"$range": []any{0, bson.M{"$size": "$logs"}},
},
"as": "i",
"in": bson.M{
"$cond": bson.M{
"if": bson.M{
"$eq": []any{"$$i", "$elem_index"},
},
"then": elem,
"else": bson.M{
"$arrayElemAt": []any{
"$logs",
"$$i",
},
},
},
},
},
},
},
},
},
},
{
"$unset": "elem_index",
},
}
return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
}
func (l *LogModel) WriteLogBatch(ctx context.Context, dId string, eIds []string, deleted bool) error {
if len(eIds) == 0 {
return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
}
if datautil.Duplicate(eIds) {
return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds)
}
now := time.Now()
res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now)
if err != nil {
return err
}
if res.MatchedCount > 0 {
return nil
}
wl := WriteLog{
DID: dId,
Logs: make([]LogElem, 0, len(eIds)),
Version: FirstVersion,
Deleted: DefaultDeleteVersion,
LastUpdate: now,
}
for _, eId := range eIds {
wl.Logs = append(wl.Logs, LogElem{
EID: eId,
Deleted: deleted,
Version: FirstVersion,
LastUpdate: now,
})
}
if _, err := l.coll.InsertOne(ctx, &wl); err == nil {
return nil
} else if !mongo.IsDuplicateKeyError(err) {
return err
}
if res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now); err != nil {
return err
} else if res.ModifiedCount == 0 {
return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds)
}
return nil
}
func (l *LogModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
if len(eIds) == 0 {
return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
}
filter := bson.M{
"d_id": dId,
}
elems := make([]bson.M, 0, len(eIds))
for _, eId := range eIds {
elems = append(elems, bson.M{
"e_id": eId,
"version": "$version",
"deleted": deleted,
"last_update": now,
})
}
pipeline := []bson.M{
{
"$addFields": bson.M{
"delete_e_ids": eIds,
},
},
{
"$set": bson.M{
"version": bson.M{"$add": []any{"$version", 1}},
"last_update": now,
},
},
{
"$set": bson.M{
"logs": bson.M{
"$filter": bson.M{
"input": "$logs",
"as": "log",
"cond": bson.M{
"$not": bson.M{
"$in": []any{"$$log.e_id", "$delete_e_ids"},
},
},
},
},
},
},
{
"$set": bson.M{
"logs": bson.M{
"$concatArrays": []any{
"$logs",
elems,
},
},
},
},
{
"$unset": "delete_e_ids",
},
}
return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
}
func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLogLen, error) {
pipeline := []bson.M{
{
"$match": bson.M{
"d_id": did,
},
},
{
"$addFields": bson.M{
"logs": bson.M{
"$cond": bson.M{
"if": bson.M{
"$or": []bson.M{
{"$lt": []any{"$version", version}},
{"$gte": []any{"$deleted", version}},
},
},
"then": []any{},
"else": "$logs",
},
},
},
},
{
"$addFields": bson.M{
"logs": bson.M{
"$filter": bson.M{
"input": "$logs",
"as": "l",
"cond": bson.M{
"$gt": []any{"$$l.version", version},
},
},
},
},
},
{
"$addFields": bson.M{
"log_len": bson.M{"$size": "$logs"},
},
},
{
"$addFields": bson.M{
"logs": bson.M{
"$cond": bson.M{
"if": bson.M{
"$gt": []any{"$log_len", limit},
},
"then": []any{},
"else": "$logs",
},
},
},
},
}
if limit <= 0 {
pipeline = pipeline[:len(pipeline)-1]
}
res, err := mongoutil.Aggregate[*WriteLogLen](ctx, l.coll, pipeline)
if err != nil {
return nil, err
}
if len(res) == 0 {
return &WriteLogLen{}, nil
}
return res[0], nil
}
func (l *LogModel) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error {
return mongoutil.DeleteMany(ctx, l.coll, bson.M{
"last_update": bson.M{
"$lt": deadline,
},
})
}

@ -1,61 +0,0 @@
package listdemo
import (
"context"
"errors"
"github.com/openimsdk/tools/db/mongoutil"
"testing"
)
func Result[V any](val V, err error) V {
if err != nil {
panic(err)
}
return val
}
func Check(err error) {
if err != nil {
panic(err)
}
}
func TestName(t *testing.T) {
cli := Result(mongoutil.NewMongoDB(context.Background(), &mongoutil.Config{Uri: "mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100", Database: "openim_v3"}))
db := cli.GetDB()
tx := cli.GetTx()
const num = 1
lm := &LogModel{coll: db.Collection("friend_version")}
err := tx.Transaction(context.Background(), func(ctx context.Context) error {
err := tx.Transaction(ctx, func(ctx context.Context) error {
return lm.WriteLogBatch(ctx, "100", []string{"1000", "2000"}, true)
})
if err != nil {
t.Log("--------->")
return err
}
return errors.New("1234")
})
t.Log(err)
//start := time.Now()
//eIds := make([]string, 0, num)
//for i := 0; i < num; i++ {
// eIds = append(eIds, strconv.Itoa(1000+(i)))
//}
//lm.WriteLogBatch1(context.Background(), "100", eIds, false)
//end := time.Now()
//t.Log(end.Sub(start)) // 509.962208ms
//t.Log(end.Sub(start) / num) // 511.496µs
//start := time.Now()
//wll, err := lm.FindChangeLog(context.Background(), "100", 3, 100)
//if err != nil {
// panic(err)
//}
//t.Log(time.Since(start))
//t.Log(wll)
}

@ -1,86 +0,0 @@
db.friend_version.updateMany(
{
"d_id": "100"
},
[
{
$addFields: {
elem_index: {
$indexOfArray: [
"$logs.e_id",
"1000"
]
}
}
},
{
$set: {
version: {
$add: ["$version", 1]
},
last_update: new Date(),
}
},
{
$set: {
logs: {
$cond: {
if: {
$lt: ["$elem_index", 0]
},
then: {
$concatArrays: [
"$logs",
[
{
e_id: "1000",
last_update: new Date(),
version: "$version",
deleted: false
}
]
]
},
else: {
$map: {
input: {
$range: [0, {
$size: "$logs"
}]
},
as: "i",
in: {
$cond: {
if: {
$eq: ["$$i", "$elem_index"]
},
then: {
e_id: "1000",
last_update: new Date(),
version: "$version",
deleted: false
},
else: {
$arrayElemAt: ["$logs", "$$i"]
}
},
},
},
},
},
},
},
},
{
$unset: ["elem_index"]
},
]
)

@ -1,63 +0,0 @@
db.friend_version.updateMany(
{
"d_id": "100"
},
[
{
$addFields: {
update_elem_ids: ["1000", "1001","1003", "2000"]
}
},
{
$set: {
version: {
$add: ["$version", 1]
},
last_update: new Date(),
}
},
{
$set: {
logs: {
$filter: {
input: "$logs",
as: "log",
cond: {
"$not": {
$in: ["$$log.e_id", "$update_elem_ids"]
}
}
}
},
},
},
{
$set: {
logs: {
$concatArrays: [
"$logs",
[
{
e_id: "1003",
last_update: ISODate("2024-05-25T06:32:10.238Z"),
version: "$version",
deleted: false
},
]
]
}
}
},
{
$unset: ["update_elem_ids"]
},
]
)

@ -1,59 +0,0 @@
db.friend_version.aggregate([
{
"$match": {
"d_id": "100",
}
},
{
"$project": {
"_id": 0,
"d_id": 0,
}
},
{
"$addFields": {
"logs": {
$cond: {
if: {
$or: [
{$lt: ["$version", 3]},
{$gte: ["$deleted", 3]},
],
},
then: [],
else: "$logs",
}
}
},
},
{
"$addFields": {
"logs": {
"$filter": {
input: "$logs",
as: "l",
cond: { $gt: ["$$l.version", 3] }
}
}
}
},
{
"$addFields": {
"log_len": {
$size: "$logs"
}
}
},
{
"$addFields": {
"logs": {
$cond: {
if: {$gt: ["$log_len", 1]},
then: [],
else: "$logs",
}
}
}
}
])

@ -1,10 +0,0 @@
db.friend_version.updateMany(
{
"d_id": "100"
},
[
],
)

@ -16,7 +16,6 @@ package cache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
@ -40,5 +39,5 @@ type FriendCache interface {
FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error)
FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error)
FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*relationtb.VersionLog, error)
}

@ -16,7 +16,6 @@ package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
"time"
"github.com/dtm-labs/rockscache"
@ -186,6 +185,6 @@ func (f *FriendCacheRedis) FindSortFriendUserIDs(ctx context.Context, ownerUserI
})
}
func (f *FriendCacheRedis) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) {
func (f *FriendCacheRedis) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) {
return f.friendDB.FindIncrVersion(ctx, ownerUserID, version, limit)
}

@ -17,7 +17,6 @@ package controller
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -81,7 +80,7 @@ type FriendDatabase interface {
FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error)
FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error)
FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error)
FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error)
@ -362,7 +361,7 @@ func (f *friendDatabase) FindSortFriendUserIDs(ctx context.Context, ownerUserID
return f.cache.FindSortFriendUserIDs(ctx, ownerUserID)
}
func (f *friendDatabase) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) {
func (f *friendDatabase) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) {
return f.cache.FindFriendIncrVersion(ctx, ownerUserID, version, limit)
}

@ -16,7 +16,6 @@ package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
@ -49,7 +48,7 @@ type Friend interface {
// UpdateFriends update friends' fields
UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) (err error)
FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error)
FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error)
FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error)

@ -16,7 +16,6 @@ package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -30,7 +29,7 @@ import (
// FriendMgo implements Friend using MongoDB as the storage backend.
type FriendMgo struct {
coll *mongo.Collection
owner dataver.DataLog
owner database.VersionLog
}
// NewFriendMongo creates a new instance of FriendMgo with the provided MongoDB database.
@ -46,7 +45,7 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) {
if err != nil {
return nil, err
}
owner, err := dataver.NewDataLog(db.Collection("friend_owner_log"))
owner, err := NewVersionLog(db.Collection("friend_owner_version_log"))
if err != nil {
return nil, err
}
@ -100,15 +99,6 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU
})
}
// Update modifies multiple friend documents.
// func (f *FriendMgo) Update(ctx context.Context, friends []*relation.Friend) error {
// filter := bson.M{
// "owner_user_id": ownerUserID,
// "friend_user_id": friendUserID,
// }
// return mgotool.UpdateMany(ctx, f.coll, filter, friends)
// }
// UpdateRemark updates the remark for a specific friend.
func (f *FriendMgo) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) error {
return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark})
@ -206,7 +196,7 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien
})
}
func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.WriteLog, error) {
func (f *FriendMgo) FindIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) {
return f.owner.FindChangeLog(ctx, ownerUserID, version, limit)
}

@ -1,8 +1,10 @@
package dataver
package mgo
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
@ -13,84 +15,19 @@ import (
"time"
)
const (
FirstVersion = 1
DefaultDeleteVersion = 0
)
type WriteLog struct {
ID primitive.ObjectID `bson:"_id"`
DID string `bson:"d_id"`
Logs []Elem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
LogLen int `bson:"log_len"`
queryDoc bool `bson:"-"`
}
func (w *WriteLog) Full() bool {
return w.queryDoc || w.Version == 0 || len(w.Logs) != w.LogLen
}
func (w *WriteLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) {
for _, l := range w.Logs {
if l.Deleted {
delIds = append(delIds, l.EID)
} else {
changeIds = append(changeIds, l.EID)
}
}
return
}
type Elem struct {
EID string `bson:"e_id"`
Deleted bool `bson:"deleted"`
Version uint `bson:"version"`
LastUpdate time.Time `bson:"last_update"`
}
type tableWriteLog struct {
ID primitive.ObjectID `bson:"_id"`
DID string `bson:"d_id"`
Logs []Elem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
}
func (t *tableWriteLog) WriteLog() *WriteLog {
return &WriteLog{
ID: t.ID,
DID: t.DID,
Logs: t.Logs,
Version: t.Version,
Deleted: t.Deleted,
LastUpdate: t.LastUpdate,
LogLen: 0,
}
}
type DataLog interface {
WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error
FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error)
DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error
}
func NewDataLog(coll *mongo.Collection) (DataLog, error) {
lm := &logModel{coll: coll}
func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) {
lm := &VersionLogMgo{coll: coll}
if lm.initIndex(context.Background()) != nil {
return nil, errs.ErrInternalServer.WrapMsg("init index failed", "coll", coll.Name())
}
return lm, nil
}
type logModel struct {
type VersionLogMgo struct {
coll *mongo.Collection
}
func (l *logModel) initIndex(ctx context.Context) error {
func (l *VersionLogMgo) initIndex(ctx context.Context) error {
_, err := l.coll.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.M{
"d_id": 1,
@ -99,7 +36,7 @@ func (l *logModel) initIndex(ctx context.Context) error {
return err
}
func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error {
func (l *VersionLogMgo) WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error {
if len(eIds) == 0 {
return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
}
@ -127,20 +64,20 @@ func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, dele
return nil
}
func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*tableWriteLog, error) {
wl := tableWriteLog{
func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*model.VersionLogTable, error) {
wl := model.VersionLogTable{
ID: primitive.NewObjectID(),
DID: dId,
Logs: make([]Elem, 0, len(eIds)),
Version: FirstVersion,
Deleted: DefaultDeleteVersion,
Logs: make([]model.VersionLogElem, 0, len(eIds)),
Version: database.FirstVersion,
Deleted: database.DefaultDeleteVersion,
LastUpdate: now,
}
for _, eId := range eIds {
wl.Logs = append(wl.Logs, Elem{
wl.Logs = append(wl.Logs, model.VersionLogElem{
EID: eId,
Deleted: deleted,
Version: FirstVersion,
Version: database.FirstVersion,
LastUpdate: now,
})
}
@ -148,7 +85,7 @@ func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, delet
return &wl, err
}
func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
if eIds == nil {
eIds = []string{}
}
@ -208,23 +145,22 @@ func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string,
return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
}
func (l *logModel) findDoc(ctx context.Context, dId string) (*WriteLog, error) {
res, err := mongoutil.FindOne[*WriteLog](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0}))
func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {
vl, err := mongoutil.FindOne[*model.VersionLogTable](ctx, l.coll, bson.M{"d_id": dId}, options.FindOne().SetProjection(bson.M{"logs": 0}))
if err != nil {
return nil, err
}
res.queryDoc = true
return res, nil
return vl.VersionLog(), nil
}
func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) {
func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) {
if wl, err := l.findChangeLog(ctx, dId, version, limit); err == nil {
return wl, nil
} else if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err
}
if res, err := l.initDoc(ctx, dId, nil, false, time.Now()); err == nil {
return res.WriteLog(), nil
return res.VersionLog(), nil
} else if mongo.IsDuplicateKeyError(err) {
return l.findChangeLog(ctx, dId, version, limit)
} else {
@ -232,7 +168,7 @@ func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint,
}
}
func (l *logModel) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) {
func (l *VersionLogMgo) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) {
if version == 0 && limit == 0 {
return l.findDoc(ctx, dId)
}
@ -293,17 +229,17 @@ func (l *logModel) findChangeLog(ctx context.Context, dId string, version uint,
if limit <= 0 {
pipeline = pipeline[:len(pipeline)-1]
}
res, err := mongoutil.Aggregate[*WriteLog](ctx, l.coll, pipeline)
vl, err := mongoutil.Aggregate[*model.VersionLog](ctx, l.coll, pipeline)
if err != nil {
return nil, err
}
if len(res) == 0 {
if len(vl) == 0 {
return nil, mongo.ErrNoDocuments
}
return res[0], nil
return vl[0], nil
}
func (l *logModel) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error {
func (l *VersionLogMgo) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error {
return mongoutil.DeleteMany(ctx, l.coll, bson.M{
"last_update": bson.M{
"$lt": deadline,

@ -0,0 +1,18 @@
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)
const (
FirstVersion = 1
DefaultDeleteVersion = 0
)
type VersionLog interface {
WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error
FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error)
DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error
}

@ -36,10 +36,10 @@ func (u *User) GetFaceURL() string {
return u.FaceURL
}
func (u User) GetUserID() string {
func (u *User) GetUserID() string {
return u.UserID
}
func (u User) GetEx() string {
func (u *User) GetEx() string {
return u.Ex
}

@ -0,0 +1,61 @@
package model
import (
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
type VersionLogElem struct {
EID string `bson:"e_id"`
Deleted bool `bson:"deleted"`
Version uint `bson:"version"`
LastUpdate time.Time `bson:"last_update"`
}
type VersionLogTable struct {
ID primitive.ObjectID `bson:"_id"`
DID string `bson:"d_id"`
Logs []VersionLogElem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
}
func (v *VersionLogTable) VersionLog() *VersionLog {
return &VersionLog{
ID: v.ID,
DID: v.DID,
Logs: v.Logs,
Version: v.Version,
Deleted: v.Deleted,
LastUpdate: v.LastUpdate,
LogLen: 0,
queryDoc: true,
}
}
type VersionLog struct {
ID primitive.ObjectID `bson:"_id"`
DID string `bson:"d_id"`
Logs []VersionLogElem `bson:"logs"`
Version uint `bson:"version"`
Deleted uint `bson:"deleted"`
LastUpdate time.Time `bson:"last_update"`
LogLen int `bson:"log_len"`
queryDoc bool `bson:"-"`
}
func (w *VersionLog) Full() bool {
return w.queryDoc || w.Version == 0 || len(w.Logs) != w.LogLen
}
func (w *VersionLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) {
for _, l := range w.Logs {
if l.Deleted {
delIds = append(delIds, l.EID)
} else {
changeIds = append(changeIds, l.EID)
}
}
return
}
Loading…
Cancel
Save