commit
492f7a0ee9
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,207 @@
|
|||||||
|
package incrversion
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BatchOption[A, B any] struct {
|
||||||
|
Ctx context.Context
|
||||||
|
TargetKeys []string
|
||||||
|
VersionIDs []string
|
||||||
|
VersionNumbers []uint64
|
||||||
|
//SyncLimit int
|
||||||
|
Versions func(ctx context.Context, dIds []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error)
|
||||||
|
CacheMaxVersions func(ctx context.Context, dIds []string) (map[string]*model.VersionLog, error)
|
||||||
|
Find func(ctx context.Context, dId string, ids []string) (A, error)
|
||||||
|
Resp func(versionsMap map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string]A, fullMap map[string]bool) *B
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *BatchOption[A, B]) newError(msg string) error {
|
||||||
|
return errs.ErrInternalServer.WrapMsg(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *BatchOption[A, B]) check() error {
|
||||||
|
if o.Ctx == nil {
|
||||||
|
return o.newError("opt ctx is nil")
|
||||||
|
}
|
||||||
|
if len(o.TargetKeys) == 0 {
|
||||||
|
return o.newError("targetKeys is empty")
|
||||||
|
}
|
||||||
|
if o.Versions == nil {
|
||||||
|
return o.newError("func versions is nil")
|
||||||
|
}
|
||||||
|
if o.Find == nil {
|
||||||
|
return o.newError("func find is nil")
|
||||||
|
}
|
||||||
|
if o.Resp == nil {
|
||||||
|
return o.newError("func resp is nil")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *BatchOption[A, B]) validVersions() []bool {
|
||||||
|
valids := make([]bool, len(o.VersionIDs))
|
||||||
|
for i, versionID := range o.VersionIDs {
|
||||||
|
objID, err := primitive.ObjectIDFromHex(versionID)
|
||||||
|
valids[i] = (err == nil && (!objID.IsZero()) && o.VersionNumbers[i] > 0)
|
||||||
|
}
|
||||||
|
return valids
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *BatchOption[A, B]) equalIDs(objIDs []primitive.ObjectID) []bool {
|
||||||
|
equals := make([]bool, len(o.VersionIDs))
|
||||||
|
for i, versionID := range o.VersionIDs {
|
||||||
|
equals[i] = versionID == objIDs[i].Hex()
|
||||||
|
}
|
||||||
|
return equals
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *BatchOption[A, B]) getVersions(tags *[]int) (versions map[string]*model.VersionLog, err error) {
|
||||||
|
var dIDs []string
|
||||||
|
var versionNums []uint64
|
||||||
|
var limits []int
|
||||||
|
|
||||||
|
valids := o.validVersions()
|
||||||
|
|
||||||
|
if o.CacheMaxVersions == nil {
|
||||||
|
for i, valid := range valids {
|
||||||
|
if valid {
|
||||||
|
(*tags)[i] = tagQuery
|
||||||
|
dIDs = append(dIDs, o.TargetKeys[i])
|
||||||
|
versionNums = append(versionNums, o.VersionNumbers[i])
|
||||||
|
limits = append(limits, syncLimit)
|
||||||
|
} else {
|
||||||
|
(*tags)[i] = tagFull
|
||||||
|
dIDs = append(dIDs, o.TargetKeys[i])
|
||||||
|
versionNums = append(versionNums, 0)
|
||||||
|
limits = append(limits, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
versions, err = o.Versions(o.Ctx, dIDs, versionNums, limits)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return versions, nil
|
||||||
|
|
||||||
|
} else {
|
||||||
|
caches, err := o.CacheMaxVersions(o.Ctx, o.TargetKeys)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objIDs := make([]primitive.ObjectID, len(o.VersionIDs))
|
||||||
|
|
||||||
|
for i, versionID := range o.VersionIDs {
|
||||||
|
objID, _ := primitive.ObjectIDFromHex(versionID)
|
||||||
|
objIDs[i] = objID
|
||||||
|
}
|
||||||
|
|
||||||
|
equals := o.equalIDs(objIDs)
|
||||||
|
for i, valid := range valids {
|
||||||
|
if !valid {
|
||||||
|
(*tags)[i] = tagFull
|
||||||
|
} else if !equals[i] {
|
||||||
|
(*tags)[i] = tagFull
|
||||||
|
} else if o.VersionNumbers[i] == uint64(caches[o.TargetKeys[i]].Version) {
|
||||||
|
(*tags)[i] = tagEqual
|
||||||
|
} else {
|
||||||
|
(*tags)[i] = tagQuery
|
||||||
|
dIDs = append(dIDs, o.TargetKeys[i])
|
||||||
|
versionNums = append(versionNums, o.VersionNumbers[i])
|
||||||
|
limits = append(limits, syncLimit)
|
||||||
|
|
||||||
|
delete(caches, o.TargetKeys[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if dIDs != nil {
|
||||||
|
versionMap, err := o.Versions(o.Ctx, dIDs, versionNums, limits)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range versionMap {
|
||||||
|
caches[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
versions = caches
|
||||||
|
}
|
||||||
|
return versions, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *BatchOption[A, B]) Build() (*B, error) {
|
||||||
|
if err := o.check(); err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := make([]int, len(o.TargetKeys))
|
||||||
|
versions, err := o.getVersions(&tags)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fullMap := make(map[string]bool)
|
||||||
|
for i, tag := range tags {
|
||||||
|
switch tag {
|
||||||
|
case tagQuery:
|
||||||
|
vLog := versions[o.TargetKeys[i]]
|
||||||
|
fullMap[o.TargetKeys[i]] = vLog.ID.Hex() != o.VersionIDs[i] || uint64(vLog.Version) < o.VersionNumbers[i] || len(vLog.Logs) != vLog.LogLen
|
||||||
|
case tagFull:
|
||||||
|
fullMap[o.TargetKeys[i]] = true
|
||||||
|
case tagEqual:
|
||||||
|
fullMap[o.TargetKeys[i]] = false
|
||||||
|
default:
|
||||||
|
panic(fmt.Errorf("undefined tag %d", tag))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
insertIdsMap = make(map[string][]string)
|
||||||
|
deleteIdsMap = make(map[string][]string)
|
||||||
|
updateIdsMap = make(map[string][]string)
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, targetKey := range o.TargetKeys {
|
||||||
|
if !fullMap[targetKey] {
|
||||||
|
version := versions[targetKey]
|
||||||
|
insertIds, deleteIds, updateIds := version.DeleteAndChangeIDs()
|
||||||
|
insertIdsMap[targetKey] = insertIds
|
||||||
|
deleteIdsMap[targetKey] = deleteIds
|
||||||
|
updateIdsMap[targetKey] = updateIds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
insertListMap = make(map[string]A)
|
||||||
|
updateListMap = make(map[string]A)
|
||||||
|
)
|
||||||
|
|
||||||
|
for targetKey, insertIds := range insertIdsMap {
|
||||||
|
if len(insertIds) > 0 {
|
||||||
|
insertList, err := o.Find(o.Ctx, targetKey, insertIds)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
insertListMap[targetKey] = insertList
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for targetKey, updateIds := range updateIdsMap {
|
||||||
|
if len(updateIds) > 0 {
|
||||||
|
updateList, err := o.Find(o.Ctx, targetKey, updateIds)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
updateListMap[targetKey] = updateList
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return o.Resp(versions, deleteIdsMap, insertListMap, updateListMap, fullMap), nil
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
package prommetrics
|
||||||
|
|
||||||
|
import "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
|
var (
|
||||||
|
UserRegisterCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "user_register_total",
|
||||||
|
Help: "The number of user login",
|
||||||
|
})
|
||||||
|
)
|
@ -1,189 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package mgo
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
|
||||||
|
|
||||||
"github.com/openimsdk/tools/errs"
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
)
|
|
||||||
|
|
||||||
// prefixes and suffixes.
|
|
||||||
const (
|
|
||||||
SubscriptionPrefix = "subscription_prefix"
|
|
||||||
SubscribedPrefix = "subscribed_prefix"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MaximumSubscription Maximum number of subscriptions.
|
|
||||||
const (
|
|
||||||
MaximumSubscription = 3000
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewUserMongoDriver(database *mongo.Database) database.SubscribeUser {
|
|
||||||
return &UserMongoDriver{
|
|
||||||
userCollection: database.Collection(model.SubscribeUserTableName),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type UserMongoDriver struct {
|
|
||||||
userCollection *mongo.Collection
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddSubscriptionList Subscriber's handling of thresholds.
|
|
||||||
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
|
|
||||||
// Check the number of lists in the key.
|
|
||||||
pipeline := mongo.Pipeline{
|
|
||||||
{{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}},
|
|
||||||
{{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}},
|
|
||||||
}
|
|
||||||
// perform aggregate operations
|
|
||||||
cursor, err := u.userCollection.Aggregate(ctx, pipeline)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
defer cursor.Close(ctx)
|
|
||||||
var cnt struct {
|
|
||||||
Count int `bson:"count"`
|
|
||||||
}
|
|
||||||
// iterate over aggregated results
|
|
||||||
for cursor.Next(ctx) {
|
|
||||||
err = cursor.Decode(&cnt)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var newUserIDList []string
|
|
||||||
// If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it.
|
|
||||||
if cnt.Count+len(userIDList) > MaximumSubscription {
|
|
||||||
newUserIDList, err = u.GetAllSubscribeList(ctx, userID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):]
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{"$set": bson.M{"user_id_list": newUserIDList}},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Another way to subscribe to N before pop,Delete after testing
|
|
||||||
/*for i := 1; i <= MaximumSubscription-len(userIDList); i++ {
|
|
||||||
_, err := u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
}
|
|
||||||
upsert := true
|
|
||||||
opts := &options.UpdateOptions{
|
|
||||||
Upsert: &upsert,
|
|
||||||
}
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}},
|
|
||||||
opts,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
for _, user := range userIDList {
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscribedPrefix + user},
|
|
||||||
bson.M{"$addToSet": bson.M{"user_id_list": userID}},
|
|
||||||
opts,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errs.WrapMsg(err, "transaction failed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnsubscriptionList Handling of unsubscribe.
|
|
||||||
func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error {
|
|
||||||
_, err := u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID},
|
|
||||||
bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
|
|
||||||
func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error {
|
|
||||||
var err error
|
|
||||||
for _, userIDTemp := range userIDList {
|
|
||||||
_, err = u.userCollection.UpdateOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscribedPrefix + userIDTemp},
|
|
||||||
bson.M{"$pull": bson.M{"user_id_list": userID}},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllSubscribeList Get all users subscribed by this user.
|
|
||||||
func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) {
|
|
||||||
var user model.SubscribeUser
|
|
||||||
cursor := u.userCollection.FindOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscriptionPrefix + userID})
|
|
||||||
err = cursor.Decode(&user)
|
|
||||||
if err != nil {
|
|
||||||
if err == mongo.ErrNoDocuments {
|
|
||||||
return []string{}, nil
|
|
||||||
} else {
|
|
||||||
return nil, errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return user.UserIDList, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetSubscribedList Get the user subscribed by those users.
|
|
||||||
func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) {
|
|
||||||
var user model.SubscribeUser
|
|
||||||
cursor := u.userCollection.FindOne(
|
|
||||||
ctx,
|
|
||||||
bson.M{"user_id": SubscribedPrefix + userID})
|
|
||||||
err = cursor.Decode(&user)
|
|
||||||
if err != nil {
|
|
||||||
if err == mongo.ErrNoDocuments {
|
|
||||||
return []string{}, nil
|
|
||||||
} else {
|
|
||||||
return nil, errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return user.UserIDList, nil
|
|
||||||
}
|
|
@ -1,31 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package database
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
// SubscribeUser Operation interface of user mongodb.
|
|
||||||
type SubscribeUser interface {
|
|
||||||
// AddSubscriptionList Subscriber's handling of thresholds.
|
|
||||||
AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
|
|
||||||
// UnsubscriptionList Handling of unsubscribe.
|
|
||||||
UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error
|
|
||||||
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
|
|
||||||
RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error
|
|
||||||
// GetAllSubscribeList Get all users subscribed by this user
|
|
||||||
GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error)
|
|
||||||
// GetSubscribedList Get the user subscribed by those users
|
|
||||||
GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error)
|
|
||||||
}
|
|
Loading…
Reference in new issue