diff --git a/pkg/common/db/dataver/common.go b/pkg/common/db/dataver/common.go index c146600fb..c7e4658ab 100644 --- a/pkg/common/db/dataver/common.go +++ b/pkg/common/db/dataver/common.go @@ -24,6 +24,23 @@ type WriteLog struct { LogLen int `bson:"log_len"` } +func (w *WriteLog) Full() bool { + if w.Version == 0 { + return true + } + return len(w.Logs) != w.LogLen +} + +func (w *WriteLog) DeleteEId() []string { + var eIds []string + for _, l := range w.Logs { + if l.Deleted { + eIds = append(eIds, l.EID) + } + } + return eIds +} + type Elem struct { EID string `bson:"e_id"` Deleted bool `bson:"deleted"` @@ -33,7 +50,7 @@ type Elem struct { type DataLog interface { WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error - FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLog, error) + FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error } @@ -173,11 +190,11 @@ func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) } -func (l *logModel) FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLog, error) { +func (l *logModel) FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*WriteLog, error) { pipeline := []bson.M{ { "$match": bson.M{ - "d_id": did, + "d_id": dId, }, }, { diff --git a/pkg/common/db/dataver/result.go b/pkg/common/db/dataver/result.go new file mode 100644 index 000000000..b6e6d22b4 --- /dev/null +++ b/pkg/common/db/dataver/result.go @@ -0,0 +1,32 @@ +package dataver + +type SyncResult[T any] struct { + Version uint + DeleteEID []string + Changes []T + Full bool +} + +func NewSyncResult[T any](wl *WriteLog, find func(eIds []string) ([]T, error)) (*SyncResult[T], error) { + var findEIDs []string + var res SyncResult[T] + if wl.Full() { + res.Full = true + } else { + for _, l := range wl.Logs { + if l.Deleted { + res.DeleteEID = append(res.DeleteEID, l.EID) + } else { + findEIDs = append(findEIDs, l.EID) + } + } + } + if res.Full || len(findEIDs) > 0 { + var err error + res.Changes, err = find(findEIDs) + if err != nil { + return nil, err + } + } + return &res, nil +} diff --git a/pkg/common/db/mgo/friend.go b/pkg/common/db/mgo/friend.go index a44244fca..78e74edd3 100644 --- a/pkg/common/db/mgo/friend.go +++ b/pkg/common/db/mgo/friend.go @@ -16,6 +16,7 @@ package mgo import ( "context" + "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" @@ -182,6 +183,20 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien }) } +func (f *FriendMgo) IncrSync(ctx context.Context, ownerUserID string, version uint, limit int) (*dataver.SyncResult[*relation.FriendModel], error) { + res, err := f.owner.FindChangeLog(ctx, ownerUserID, version, limit) + if err != nil { + return nil, err + } + return dataver.NewSyncResult[*relation.FriendModel](res, func(eIds []string) ([]*relation.FriendModel, error) { + if len(eIds) == 0 { + return nil, errors.New("todo") + } else { + return f.FindFriends(ctx, ownerUserID, eIds) + } + }) +} + func Success(fns ...func() error) error { for _, fn := range fns { if err := fn(); err != nil { diff --git a/pkg/common/listdemo2/common.go b/pkg/common/listdemo2/common.go index 2267d8630..40419d533 100644 --- a/pkg/common/listdemo2/common.go +++ b/pkg/common/listdemo2/common.go @@ -70,54 +70,6 @@ func (l *LogModel) InitIndex(ctx context.Context) error { return err } -func (l *LogModel) WriteLog1(ctx context.Context, dId string, eId string, deleted bool) { - if err := l.WriteLog(ctx, dId, eId, deleted); err != nil { - panic(err) - } -} - -func (l *LogModel) WriteLogBatch1(ctx context.Context, dId string, eIds []string, deleted bool) { - if err := l.WriteLogBatch(ctx, dId, eIds, deleted); err != nil { - panic(err) - } -} - -func (l *LogModel) WriteLog(ctx context.Context, dId string, eId string, deleted bool) error { - now := time.Now() - res, err := l.writeLog(ctx, dId, eId, deleted, now) - if err != nil { - return err - } - if res.MatchedCount > 0 { - return nil - } - wl := WriteLog{ - DID: dId, - Logs: []LogElem{ - { - EID: eId, - Deleted: deleted, - Version: FirstVersion, - LastUpdate: now, - }, - }, - Version: FirstVersion, - Deleted: DefaultDeleteVersion, - LastUpdate: now, - } - if _, err := l.coll.InsertOne(ctx, &wl); err == nil { - return nil - } else if !mongo.IsDuplicateKeyError(err) { - return err - } - if res, err := l.writeLog(ctx, dId, eId, deleted, now); err != nil { - return err - } else if res.ModifiedCount == 0 { - return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eId", eId) - } - return nil -} - func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { filter := bson.M{ "d_id": dId, @@ -356,7 +308,7 @@ func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint, return nil, err } if len(res) == 0 { - return nil, ErrNotFound + return &WriteLogLen{}, nil } return res[0], nil }