pull/3142/head
parent
04bac6ddb7
commit
65dbe8ef6a
@ -1,15 +0,0 @@
|
||||
// Copyright © 2024 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package redispubsub // import "github.com/openimsdk/open-im-server/v3/pkg/common/redispubsub"
|
@ -1,30 +0,0 @@
|
||||
// Copyright © 2024 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package redispubsub
|
||||
|
||||
import "github.com/redis/go-redis/v9"
|
||||
|
||||
type Publisher struct {
|
||||
client redis.UniversalClient
|
||||
channel string
|
||||
}
|
||||
|
||||
func NewPublisher(client redis.UniversalClient, channel string) *Publisher {
|
||||
return &Publisher{client: client, channel: channel}
|
||||
}
|
||||
|
||||
func (p *Publisher) Publish(message string) error {
|
||||
return p.client.Publish(ctx, p.channel, message).Err()
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
// Copyright © 2024 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package redispubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
type Subscriber struct {
|
||||
client redis.UniversalClient
|
||||
channel string
|
||||
}
|
||||
|
||||
func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber {
|
||||
return &Subscriber{client: client, channel: channel}
|
||||
}
|
||||
|
||||
func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error {
|
||||
messageChannel := s.client.Subscribe(ctx, s.channel).Channel()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg := <-messageChannel:
|
||||
callback(msg.Payload)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,105 @@
|
||||
package mcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/s3/minio"
|
||||
)
|
||||
|
||||
func NewMinioCache(cache database.Cache) minio.Cache {
|
||||
return &minioCache{
|
||||
cache: cache,
|
||||
expireTime: time.Hour * 24 * 7,
|
||||
}
|
||||
}
|
||||
|
||||
type minioCache struct {
|
||||
cache database.Cache
|
||||
expireTime time.Duration
|
||||
}
|
||||
|
||||
func (g *minioCache) getObjectImageInfoKey(key string) string {
|
||||
return cachekey.GetObjectImageInfoKey(key)
|
||||
}
|
||||
|
||||
func (g *minioCache) getMinioImageThumbnailKey(key string, format string, width int, height int) string {
|
||||
return cachekey.GetMinioImageThumbnailKey(key, format, width, height)
|
||||
}
|
||||
|
||||
func (g *minioCache) DelObjectImageInfoKey(ctx context.Context, keys ...string) error {
|
||||
ks := make([]string, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
ks = append(ks, g.getObjectImageInfoKey(key))
|
||||
}
|
||||
return g.cache.Del(ctx, ks)
|
||||
}
|
||||
|
||||
func (g *minioCache) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error {
|
||||
return g.cache.Del(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)})
|
||||
}
|
||||
|
||||
func (g *minioCache) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) {
|
||||
return getCache[*minio.ImageInfo](ctx, g.cache, g.getObjectImageInfoKey(key), g.expireTime, fn)
|
||||
}
|
||||
|
||||
func (g *minioCache) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) {
|
||||
return getCache[string](ctx, g.cache, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache)
|
||||
}
|
||||
|
||||
func getCache[V any](ctx context.Context, cache database.Cache, key string, expireTime time.Duration, fn func(ctx context.Context) (V, error)) (V, error) {
|
||||
getDB := func() (V, bool, error) {
|
||||
res, err := cache.Get(ctx, []string{key})
|
||||
if err != nil {
|
||||
var val V
|
||||
return val, false, err
|
||||
}
|
||||
var val V
|
||||
if str, ok := res[key]; ok {
|
||||
if json.Unmarshal([]byte(str), &val) != nil {
|
||||
return val, false, err
|
||||
}
|
||||
return val, true, nil
|
||||
}
|
||||
return val, false, nil
|
||||
}
|
||||
dbVal, ok, err := getDB()
|
||||
if err != nil {
|
||||
return dbVal, err
|
||||
}
|
||||
if ok {
|
||||
return dbVal, nil
|
||||
}
|
||||
lockValue, err := cache.Lock(ctx, key, time.Minute)
|
||||
if err != nil {
|
||||
return dbVal, err
|
||||
}
|
||||
defer func() {
|
||||
if err := cache.Unlock(ctx, key, lockValue); err != nil {
|
||||
log.ZError(ctx, "unlock cache key", err, "key", key, "value", lockValue)
|
||||
}
|
||||
}()
|
||||
dbVal, ok, err = getDB()
|
||||
if err != nil {
|
||||
return dbVal, err
|
||||
}
|
||||
if ok {
|
||||
return dbVal, nil
|
||||
}
|
||||
val, err := fn(ctx)
|
||||
if err != nil {
|
||||
return val, err
|
||||
}
|
||||
data, err := json.Marshal(val)
|
||||
if err != nil {
|
||||
return val, err
|
||||
}
|
||||
if err := cache.Set(ctx, key, string(data), expireTime); err != nil {
|
||||
return val, err
|
||||
}
|
||||
return val, nil
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
package mcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
)
|
||||
|
||||
func NewOnlineCache() cache.OnlineCache {
|
||||
return &onlineCache{
|
||||
user: make(map[string]map[int32]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
type onlineCache struct {
|
||||
lock sync.RWMutex
|
||||
user map[string]map[int32]struct{}
|
||||
}
|
||||
|
||||
func (x *onlineCache) GetOnline(ctx context.Context, userID string) ([]int32, error) {
|
||||
x.lock.RLock()
|
||||
defer x.lock.RUnlock()
|
||||
pSet, ok := x.user[userID]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
res := make([]int32, 0, len(pSet))
|
||||
for k := range pSet {
|
||||
res = append(res, k)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (x *onlineCache) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error {
|
||||
x.lock.Lock()
|
||||
defer x.lock.Unlock()
|
||||
pSet, ok := x.user[userID]
|
||||
if ok {
|
||||
for _, p := range offline {
|
||||
delete(pSet, p)
|
||||
}
|
||||
}
|
||||
if len(online) > 0 {
|
||||
if !ok {
|
||||
pSet = make(map[int32]struct{})
|
||||
x.user[userID] = pSet
|
||||
}
|
||||
for _, p := range online {
|
||||
pSet[p] = struct{}{}
|
||||
}
|
||||
}
|
||||
if len(pSet) == 0 {
|
||||
delete(x.user, userID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *onlineCache) GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error) {
|
||||
if cursor != 0 {
|
||||
return nil, 0, nil
|
||||
}
|
||||
x.lock.RLock()
|
||||
defer x.lock.RUnlock()
|
||||
res := make(map[string][]int32)
|
||||
for k, v := range x.user {
|
||||
pSet := make([]int32, 0, len(v))
|
||||
for p := range v {
|
||||
pSet = append(pSet, p)
|
||||
}
|
||||
res[k] = pSet
|
||||
}
|
||||
return res, 0, nil
|
||||
}
|
@ -0,0 +1,98 @@
|
||||
package mcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func NewThirdCache(cache database.Cache) cache.ThirdCache {
|
||||
return &thirdCache{
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
type thirdCache struct {
|
||||
cache database.Cache
|
||||
}
|
||||
|
||||
func (c *thirdCache) getGetuiTokenKey() string {
|
||||
return cachekey.GetGetuiTokenKey()
|
||||
}
|
||||
|
||||
func (c *thirdCache) getGetuiTaskIDKey() string {
|
||||
return cachekey.GetGetuiTaskIDKey()
|
||||
}
|
||||
|
||||
func (c *thirdCache) getUserBadgeUnreadCountSumKey(userID string) string {
|
||||
return cachekey.GetUserBadgeUnreadCountSumKey(userID)
|
||||
}
|
||||
|
||||
func (c *thirdCache) getFcmAccountTokenKey(account string, platformID int) string {
|
||||
return cachekey.GetFcmAccountTokenKey(account, platformID)
|
||||
}
|
||||
|
||||
func (c *thirdCache) get(ctx context.Context, key string) (string, error) {
|
||||
res, err := c.cache.Get(ctx, []string{key})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if val, ok := res[key]; ok {
|
||||
return val, nil
|
||||
}
|
||||
return "", errs.Wrap(redis.Nil)
|
||||
}
|
||||
|
||||
func (c *thirdCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
|
||||
return errs.Wrap(c.cache.Set(ctx, c.getFcmAccountTokenKey(account, platformID), fcmToken, time.Duration(expireTime)*time.Second))
|
||||
}
|
||||
|
||||
func (c *thirdCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
|
||||
return c.get(ctx, c.getFcmAccountTokenKey(account, platformID))
|
||||
}
|
||||
|
||||
func (c *thirdCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
|
||||
return c.cache.Del(ctx, []string{c.getFcmAccountTokenKey(account, platformID)})
|
||||
}
|
||||
|
||||
func (c *thirdCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
||||
return c.cache.Incr(ctx, c.getUserBadgeUnreadCountSumKey(userID), 1)
|
||||
}
|
||||
|
||||
func (c *thirdCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
|
||||
return c.cache.Set(ctx, c.getUserBadgeUnreadCountSumKey(userID), strconv.Itoa(value), 0)
|
||||
}
|
||||
|
||||
func (c *thirdCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
|
||||
str, err := c.get(ctx, c.getUserBadgeUnreadCountSumKey(userID))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
val, err := strconv.Atoi(str)
|
||||
if err != nil {
|
||||
return 0, errs.WrapMsg(err, "strconv.Atoi", "str", str)
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func (c *thirdCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
|
||||
return c.cache.Set(ctx, c.getGetuiTokenKey(), token, time.Duration(expireTime)*time.Second)
|
||||
}
|
||||
|
||||
func (c *thirdCache) GetGetuiToken(ctx context.Context) (string, error) {
|
||||
return c.get(ctx, c.getGetuiTokenKey())
|
||||
}
|
||||
|
||||
func (c *thirdCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
|
||||
return c.cache.Set(ctx, c.getGetuiTaskIDKey(), taskID, time.Duration(expireTime)*time.Second)
|
||||
}
|
||||
|
||||
func (c *thirdCache) GetGetuiTaskID(ctx context.Context) (string, error) {
|
||||
return c.get(ctx, c.getGetuiTaskIDKey())
|
||||
}
|
@ -0,0 +1,130 @@
|
||||
package mcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
)
|
||||
|
||||
func NewTokenCacheModel(cache database.Cache, accessExpire int64) cache.TokenModel {
|
||||
c := &tokenCache{cache: cache}
|
||||
c.accessExpire = c.getExpireTime(accessExpire)
|
||||
return c
|
||||
}
|
||||
|
||||
type tokenCache struct {
|
||||
cache database.Cache
|
||||
accessExpire time.Duration
|
||||
}
|
||||
|
||||
func (x *tokenCache) getTokenKey(userID string, platformID int, token string) string {
|
||||
return cachekey.GetTokenKey(userID, platformID) + ":" + token
|
||||
|
||||
}
|
||||
|
||||
func (x *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
||||
return x.cache.Set(ctx, x.getTokenKey(userID, platformID, token), strconv.Itoa(flag), x.accessExpire)
|
||||
}
|
||||
|
||||
// SetTokenFlagEx set token and flag with expire time
|
||||
func (x *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
||||
return x.SetTokenFlag(ctx, userID, platformID, token, flag)
|
||||
}
|
||||
|
||||
func (x *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
|
||||
prefix := x.getTokenKey(userID, platformID, "")
|
||||
m, err := x.cache.Prefix(ctx, prefix)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
mm := make(map[string]int)
|
||||
for k, v := range m {
|
||||
state, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "token value is not int", err, "value", v, "userID", userID, "platformID", platformID)
|
||||
continue
|
||||
}
|
||||
mm[strings.TrimPrefix(k, prefix)] = state
|
||||
}
|
||||
return mm, nil
|
||||
}
|
||||
|
||||
func (x *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) {
|
||||
prefix := cachekey.UidPidToken + userID + ":"
|
||||
tokens, err := x.cache.Prefix(ctx, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := make(map[int]map[string]int)
|
||||
for key, flagStr := range tokens {
|
||||
flag, err := strconv.Atoi(flagStr)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID)
|
||||
continue
|
||||
}
|
||||
arr := strings.SplitN(strings.TrimPrefix(key, prefix), ":", 2)
|
||||
if len(arr) != 2 {
|
||||
log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID)
|
||||
continue
|
||||
}
|
||||
platformID, err := strconv.Atoi(arr[0])
|
||||
if err != nil {
|
||||
log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID)
|
||||
continue
|
||||
}
|
||||
token := arr[1]
|
||||
if token == "" {
|
||||
log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID)
|
||||
continue
|
||||
}
|
||||
tk, ok := res[platformID]
|
||||
if !ok {
|
||||
tk = make(map[string]int)
|
||||
res[platformID] = tk
|
||||
}
|
||||
tk[token] = flag
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (x *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
|
||||
for token, flag := range m {
|
||||
err := x.SetTokenFlag(ctx, userID, platformID, token, flag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error {
|
||||
for prefix, tokenFlag := range tokens {
|
||||
for token, flag := range tokenFlag {
|
||||
flagStr := fmt.Sprintf("%v", flag)
|
||||
if err := x.cache.Set(ctx, prefix+":"+token, flagStr, x.accessExpire); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
|
||||
keys := make([]string, 0, len(fields))
|
||||
for _, token := range fields {
|
||||
keys = append(keys, x.getTokenKey(userID, platformID, token))
|
||||
}
|
||||
return x.cache.Del(ctx, keys)
|
||||
}
|
||||
|
||||
func (x *tokenCache) getExpireTime(t int64) time.Duration {
|
||||
return time.Hour * 24 * time.Duration(t)
|
||||
}
|
@ -1,3 +0,0 @@
|
||||
package redis
|
||||
|
||||
// todo: token online third minio
|
@ -0,0 +1,16 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Cache interface {
|
||||
Get(ctx context.Context, key []string) (map[string]string, error)
|
||||
Prefix(ctx context.Context, prefix string) (map[string]string, error)
|
||||
Set(ctx context.Context, key string, value string, expireAt time.Duration) error
|
||||
Incr(ctx context.Context, key string, value int) (int, error)
|
||||
Del(ctx context.Context, key []string) error
|
||||
Lock(ctx context.Context, key string, duration time.Duration) (string, error)
|
||||
Unlock(ctx context.Context, key string, value string) error
|
||||
}
|
@ -0,0 +1,183 @@
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
func NewCacheMgo(db *mongo.Database) (*CacheMgo, error) {
|
||||
coll := db.Collection(database.CacheName)
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "key", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "expire_at", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetExpireAfterSeconds(0),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return &CacheMgo{coll: coll}, nil
|
||||
}
|
||||
|
||||
type CacheMgo struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (x *CacheMgo) findToMap(res []model.Cache, now time.Time) map[string]string {
|
||||
kv := make(map[string]string)
|
||||
for _, re := range res {
|
||||
if re.ExpireAt != nil && re.ExpireAt.Before(now) {
|
||||
continue
|
||||
}
|
||||
kv[re.Key] = re.Value
|
||||
}
|
||||
return kv
|
||||
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Get(ctx context.Context, key []string) (map[string]string, error) {
|
||||
if len(key) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
now := time.Now()
|
||||
res, err := mongoutil.Find[model.Cache](ctx, x.coll, bson.M{
|
||||
"key": bson.M{"$in": key},
|
||||
"$or": []bson.M{
|
||||
{"expire_at": bson.M{"$gt": now}},
|
||||
{"expire_at": nil},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x.findToMap(res, now), nil
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Prefix(ctx context.Context, prefix string) (map[string]string, error) {
|
||||
now := time.Now()
|
||||
res, err := mongoutil.Find[model.Cache](ctx, x.coll, bson.M{
|
||||
"key": bson.M{"$regex": "^" + prefix},
|
||||
"$or": []bson.M{
|
||||
{"expire_at": bson.M{"$gt": now}},
|
||||
{"expire_at": nil},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x.findToMap(res, now), nil
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Set(ctx context.Context, key string, value string, expireAt time.Duration) error {
|
||||
cv := &model.Cache{
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
if expireAt > 0 {
|
||||
now := time.Now().Add(expireAt)
|
||||
cv.ExpireAt = &now
|
||||
}
|
||||
opt := options.Update().SetUpsert(true)
|
||||
return mongoutil.UpdateOne(ctx, x.coll, bson.M{"key": key}, bson.M{"$set": cv}, false, opt)
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Incr(ctx context.Context, key string, value int) (int, error) {
|
||||
pipeline := mongo.Pipeline{
|
||||
{
|
||||
{"$set", bson.M{
|
||||
"value": bson.M{
|
||||
"$toString": bson.M{
|
||||
"$add": bson.A{
|
||||
bson.M{"$toInt": "$value"},
|
||||
value,
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
}
|
||||
opt := options.FindOneAndUpdate().SetReturnDocument(options.After)
|
||||
res, err := mongoutil.FindOneAndUpdate[model.Cache](ctx, x.coll, bson.M{"key": key}, pipeline, opt)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return strconv.Atoi(res.Value)
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Del(ctx context.Context, key []string) error {
|
||||
if len(key) == 0 {
|
||||
return nil
|
||||
}
|
||||
_, err := x.coll.DeleteMany(ctx, bson.M{"key": bson.M{"$in": key}})
|
||||
return err
|
||||
}
|
||||
|
||||
func (x *CacheMgo) lockKey(key string) string {
|
||||
return "LOCK_" + key
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Lock(ctx context.Context, key string, duration time.Duration) (string, error) {
|
||||
tmp, err := uuid.NewUUID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if duration <= 0 || duration > time.Minute*10 {
|
||||
duration = time.Minute * 10
|
||||
}
|
||||
cv := &model.Cache{
|
||||
Key: x.lockKey(key),
|
||||
Value: tmp.String(),
|
||||
ExpireAt: nil,
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||
defer cancel()
|
||||
wait := func() error {
|
||||
timeout := time.NewTimer(time.Millisecond * 100)
|
||||
defer timeout.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timeout.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
for {
|
||||
if err := mongoutil.DeleteOne(ctx, x.coll, bson.M{"key": key, "expire_at": bson.M{"$lt": time.Now()}}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
expireAt := time.Now().Add(duration)
|
||||
cv.ExpireAt = &expireAt
|
||||
if err := mongoutil.InsertMany[*model.Cache](ctx, x.coll, []*model.Cache{cv}); err != nil {
|
||||
if mongo.IsDuplicateKeyError(err) {
|
||||
if err := wait(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
continue
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
return cv.Value, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (x *CacheMgo) Unlock(ctx context.Context, key string, value string) error {
|
||||
return mongoutil.DeleteOne(ctx, x.coll, bson.M{"key": x.lockKey(key), "value": value})
|
||||
}
|
@ -0,0 +1,133 @@
|
||||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
func TestName1111(t *testing.T) {
|
||||
coll := Mongodb().Collection("temp")
|
||||
|
||||
//updatePipeline := mongo.Pipeline{
|
||||
// {
|
||||
// {"$set", bson.M{
|
||||
// "age": bson.M{
|
||||
// "$toString": bson.M{
|
||||
// "$add": bson.A{
|
||||
// bson.M{"$toInt": "$age"},
|
||||
// 1,
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// }},
|
||||
// },
|
||||
//}
|
||||
|
||||
pipeline := mongo.Pipeline{
|
||||
{
|
||||
{"$set", bson.M{
|
||||
"value": bson.M{
|
||||
"$toString": bson.M{
|
||||
"$add": bson.A{
|
||||
bson.M{"$toInt": "$value"},
|
||||
1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After)
|
||||
res, err := mongoutil.FindOneAndUpdate[model.Cache](context.Background(), coll, bson.M{"key": "123456"}, pipeline, opt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log(res)
|
||||
}
|
||||
|
||||
func TestName33333(t *testing.T) {
|
||||
c, err := NewCacheMgo(Mongodb())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := c.Set(context.Background(), "123456", "123456", time.Hour); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := c.Set(context.Background(), "123666", "123666", time.Hour); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
res1, err := c.Get(context.Background(), []string{"123456"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log(res1)
|
||||
|
||||
res2, err := c.Prefix(context.Background(), "123")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Log(res2)
|
||||
}
|
||||
|
||||
func TestName1111aa(t *testing.T) {
|
||||
|
||||
c, err := NewCacheMgo(Mongodb())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var count int
|
||||
|
||||
key := "123456"
|
||||
|
||||
doFunc := func() {
|
||||
value, err := c.Lock(context.Background(), key, time.Second*30)
|
||||
if err != nil {
|
||||
t.Log("Lock error", err)
|
||||
return
|
||||
}
|
||||
tmp := count
|
||||
tmp++
|
||||
count = tmp
|
||||
t.Log("count", tmp)
|
||||
if err := c.Unlock(context.Background(), key, value); err != nil {
|
||||
t.Log("Unlock error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := c.Lock(context.Background(), key, time.Second*10); err != nil {
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 32; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
doFunc()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func TestName111111a(t *testing.T) {
|
||||
arr := strings.SplitN("1:testkakskdask:1111", ":", 2)
|
||||
t.Log(arr)
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
type Cache struct {
|
||||
Key string `bson:"key"`
|
||||
Value string `bson:"value"`
|
||||
ExpireAt *time.Time `bson:"expire_at"`
|
||||
}
|
Loading…
Reference in new issue