commit
df808658b2
@ -1,2 +1,3 @@
|
|||||||
chatRecordsClearTime: "0 2 * * *"
|
cronExecuteTime: "0 2 * * *"
|
||||||
retainChatRecords: 365
|
retainChatRecords: 365
|
||||||
|
fileExpireTime: 90
|
||||||
|
@ -1,38 +1,30 @@
|
|||||||
// Copyright © 2024 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package cachekey
|
package cachekey
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxSeq = "MAX_SEQ:"
|
MallocSeq = "MALLOC_SEQ:"
|
||||||
minSeq = "MIN_SEQ:"
|
MallocMinSeqLock = "MALLOC_MIN_SEQ:"
|
||||||
conversationUserMinSeq = "CON_USER_MIN_SEQ:"
|
|
||||||
hasReadSeq = "HAS_READ_SEQ:"
|
SeqUserMaxSeq = "SEQ_USER_MAX:"
|
||||||
|
SeqUserMinSeq = "SEQ_USER_MIN:"
|
||||||
|
SeqUserReadSeq = "SEQ_USER_READ:"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetMaxSeqKey(conversationID string) string {
|
func GetMallocSeqKey(conversationID string) string {
|
||||||
return maxSeq + conversationID
|
return MallocSeq + conversationID
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetMallocMinSeqKey(conversationID string) string {
|
||||||
|
return MallocMinSeqLock + conversationID
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetMinSeqKey(conversationID string) string {
|
func GetSeqUserMaxSeqKey(conversationID string, userID string) string {
|
||||||
return minSeq + conversationID
|
return SeqUserMaxSeq + conversationID + ":" + userID
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetHasReadSeqKey(conversationID string, userID string) string {
|
func GetSeqUserMinSeqKey(conversationID string, userID string) string {
|
||||||
return hasReadSeq + userID + ":" + conversationID
|
return SeqUserMinSeq + conversationID + ":" + userID
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetConversationUserMinSeqKey(conversationID, userID string) string {
|
func GetSeqUserReadSeqKey(conversationID string, userID string) string {
|
||||||
return conversationUserMinSeq + conversationID + "u:" + userID
|
return SeqUserReadSeq + conversationID + ":" + userID
|
||||||
}
|
}
|
||||||
|
@ -1,200 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
|
||||||
"github.com/openimsdk/tools/errs"
|
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache {
|
|
||||||
return &seqCache{rdb: rdb}
|
|
||||||
}
|
|
||||||
|
|
||||||
type seqCache struct {
|
|
||||||
rdb redis.UniversalClient
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) getMaxSeqKey(conversationID string) string {
|
|
||||||
return cachekey.GetMaxSeqKey(conversationID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) getMinSeqKey(conversationID string) string {
|
|
||||||
return cachekey.GetMinSeqKey(conversationID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string {
|
|
||||||
return cachekey.GetHasReadSeqKey(conversationID, userID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string {
|
|
||||||
return cachekey.GetConversationUserMinSeqKey(conversationID, userID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
|
|
||||||
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
|
|
||||||
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
|
|
||||||
if err != nil {
|
|
||||||
return 0, errs.Wrap(err)
|
|
||||||
}
|
|
||||||
return val, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
|
|
||||||
m = make(map[string]int64, len(items))
|
|
||||||
var (
|
|
||||||
reverseMap = make(map[string]string, len(items))
|
|
||||||
keys = make([]string, len(items))
|
|
||||||
lock sync.Mutex
|
|
||||||
)
|
|
||||||
|
|
||||||
for i, v := range items {
|
|
||||||
keys[i] = getkey(v)
|
|
||||||
reverseMap[getkey(v)] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
manager := NewRedisShardManager(c.rdb)
|
|
||||||
if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error {
|
|
||||||
res, err := c.rdb.MGet(ctx, keys...).Result()
|
|
||||||
if err != nil && !errors.Is(err, redis.Nil) {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// len(res) <= len(items)
|
|
||||||
for i := range res {
|
|
||||||
strRes, ok := res[i].(string)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
val := stringutil.StringToInt64(strRes)
|
|
||||||
if val != 0 {
|
|
||||||
lock.Lock()
|
|
||||||
m[reverseMap[keys[i]]] = val
|
|
||||||
lock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
|
|
||||||
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
|
|
||||||
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
|
||||||
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
|
|
||||||
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
|
|
||||||
for conversationID, seq := range seqs {
|
|
||||||
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
|
|
||||||
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
|
|
||||||
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
|
|
||||||
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
|
||||||
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
|
|
||||||
if err != nil {
|
|
||||||
return 0, errs.Wrap(err)
|
|
||||||
}
|
|
||||||
return val, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
|
|
||||||
return c.getSeqs(ctx, userIDs, func(userID string) string {
|
|
||||||
return c.getConversationUserMinSeqKey(conversationID, userID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
|
|
||||||
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
|
|
||||||
return c.setSeqs(ctx, seqs, func(userID string) string {
|
|
||||||
return c.getConversationUserMinSeqKey(conversationID, userID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
|
|
||||||
return c.setSeqs(ctx, seqs, func(conversationID string) string {
|
|
||||||
return c.getConversationUserMinSeqKey(conversationID, userID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
|
|
||||||
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
|
|
||||||
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
|
|
||||||
return c.getHasReadSeqKey(conversationID, userID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
|
|
||||||
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
|
|
||||||
return c.getHasReadSeqKey(conversationID, userID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
|
|
||||||
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
|
|
||||||
return c.getHasReadSeqKey(conversationID, userID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
|
|
||||||
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return val, nil
|
|
||||||
}
|
|
@ -0,0 +1,255 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/dtm-labs/rockscache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSeqConversationCacheRedis(rdb redis.UniversalClient, mgo database.SeqConversation) cache.SeqConversationCache {
|
||||||
|
return &seqConversationCacheRedis{
|
||||||
|
rdb: rdb,
|
||||||
|
mgo: mgo,
|
||||||
|
lockTime: time.Second * 3,
|
||||||
|
dataTime: time.Hour * 24 * 365,
|
||||||
|
minSeqExpireTime: time.Hour,
|
||||||
|
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type seqConversationCacheRedis struct {
|
||||||
|
rdb redis.UniversalClient
|
||||||
|
mgo database.SeqConversation
|
||||||
|
rocks *rockscache.Client
|
||||||
|
lockTime time.Duration
|
||||||
|
dataTime time.Duration
|
||||||
|
minSeqExpireTime time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) getMinSeqKey(conversationID string) string {
|
||||||
|
return cachekey.GetMallocMinSeqKey(conversationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
|
||||||
|
if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := s.rocks.TagAsDeleted2(ctx, s.getMinSeqKey(conversationID)); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||||
|
return getCache(ctx, s.rocks, s.getMinSeqKey(conversationID), s.minSeqExpireTime, func(ctx context.Context) (int64, error) {
|
||||||
|
return s.mgo.GetMinSeq(ctx, conversationID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string {
|
||||||
|
return cachekey.GetMallocSeqKey(conversationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) {
|
||||||
|
if lastSeq < currSeq {
|
||||||
|
return 0, errs.New("lastSeq must be greater than currSeq")
|
||||||
|
}
|
||||||
|
// 0: success
|
||||||
|
// 1: success the lock has expired, but has not been locked by anyone else
|
||||||
|
// 2: already locked, but not by yourself
|
||||||
|
script := `
|
||||||
|
local key = KEYS[1]
|
||||||
|
local lockValue = ARGV[1]
|
||||||
|
local dataSecond = ARGV[2]
|
||||||
|
local curr_seq = tonumber(ARGV[3])
|
||||||
|
local last_seq = tonumber(ARGV[4])
|
||||||
|
if redis.call("EXISTS", key) == 0 then
|
||||||
|
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
|
||||||
|
redis.call("EXPIRE", key, dataSecond)
|
||||||
|
return 1
|
||||||
|
end
|
||||||
|
if redis.call("HGET", key, "LOCK") ~= lockValue then
|
||||||
|
return 2
|
||||||
|
end
|
||||||
|
redis.call("HDEL", key, "LOCK")
|
||||||
|
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
|
||||||
|
redis.call("EXPIRE", key, dataSecond)
|
||||||
|
return 0
|
||||||
|
`
|
||||||
|
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64()
|
||||||
|
if err != nil {
|
||||||
|
return 0, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// malloc size=0 is to get the current seq size>0 is to allocate seq
|
||||||
|
func (s *seqConversationCacheRedis) malloc(ctx context.Context, key string, size int64) ([]int64, error) {
|
||||||
|
// 0: success
|
||||||
|
// 1: need to obtain and lock
|
||||||
|
// 2: already locked
|
||||||
|
// 3: exceeded the maximum value and locked
|
||||||
|
script := `
|
||||||
|
local key = KEYS[1]
|
||||||
|
local size = tonumber(ARGV[1])
|
||||||
|
local lockSecond = ARGV[2]
|
||||||
|
local dataSecond = ARGV[3]
|
||||||
|
local result = {}
|
||||||
|
if redis.call("EXISTS", key) == 0 then
|
||||||
|
local lockValue = math.random(0, 999999999)
|
||||||
|
redis.call("HSET", key, "LOCK", lockValue)
|
||||||
|
redis.call("EXPIRE", key, lockSecond)
|
||||||
|
table.insert(result, 1)
|
||||||
|
table.insert(result, lockValue)
|
||||||
|
return result
|
||||||
|
end
|
||||||
|
if redis.call("HEXISTS", key, "LOCK") == 1 then
|
||||||
|
table.insert(result, 2)
|
||||||
|
return result
|
||||||
|
end
|
||||||
|
local curr_seq = tonumber(redis.call("HGET", key, "CURR"))
|
||||||
|
local last_seq = tonumber(redis.call("HGET", key, "LAST"))
|
||||||
|
if size == 0 then
|
||||||
|
redis.call("EXPIRE", key, dataSecond)
|
||||||
|
table.insert(result, 0)
|
||||||
|
table.insert(result, curr_seq)
|
||||||
|
table.insert(result, last_seq)
|
||||||
|
return result
|
||||||
|
end
|
||||||
|
local max_seq = curr_seq + size
|
||||||
|
if max_seq > last_seq then
|
||||||
|
local lockValue = math.random(0, 999999999)
|
||||||
|
redis.call("HSET", key, "LOCK", lockValue)
|
||||||
|
redis.call("HSET", key, "CURR", last_seq)
|
||||||
|
redis.call("EXPIRE", key, lockSecond)
|
||||||
|
table.insert(result, 3)
|
||||||
|
table.insert(result, curr_seq)
|
||||||
|
table.insert(result, last_seq)
|
||||||
|
table.insert(result, lockValue)
|
||||||
|
return result
|
||||||
|
end
|
||||||
|
redis.call("HSET", key, "CURR", max_seq)
|
||||||
|
redis.call("EXPIRE", key, dataSecond)
|
||||||
|
table.insert(result, 0)
|
||||||
|
table.insert(result, curr_seq)
|
||||||
|
table.insert(result, last_seq)
|
||||||
|
return result
|
||||||
|
`
|
||||||
|
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) wait(ctx context.Context) error {
|
||||||
|
timer := time.NewTimer(time.Second / 4)
|
||||||
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1)
|
||||||
|
if err := s.wait(ctx); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch state {
|
||||||
|
case 0: // ideal state
|
||||||
|
case 1:
|
||||||
|
log.ZWarn(ctx, "set seq cache lock not found", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
|
||||||
|
case 2:
|
||||||
|
log.ZWarn(ctx, "set seq cache lock to be held by someone else", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
|
||||||
|
default:
|
||||||
|
log.ZError(ctx, "set seq cache lock unknown state", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.ZError(ctx, "set seq cache retrying still failed", nil, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size int64) int64 {
|
||||||
|
if size == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
var basicSize int64
|
||||||
|
if msgprocessor.IsGroupConversationID(conversationID) {
|
||||||
|
basicSize = 100
|
||||||
|
} else {
|
||||||
|
basicSize = 50
|
||||||
|
}
|
||||||
|
basicSize += size
|
||||||
|
return basicSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
|
||||||
|
if size < 0 {
|
||||||
|
return 0, errs.New("size must be greater than 0")
|
||||||
|
}
|
||||||
|
key := s.getSeqMallocKey(conversationID)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
states, err := s.malloc(ctx, key, size)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
switch states[0] {
|
||||||
|
case 0: // success
|
||||||
|
return states[1], nil
|
||||||
|
case 1: // not found
|
||||||
|
mallocSize := s.getMallocSize(conversationID, size)
|
||||||
|
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize)
|
||||||
|
return seq, nil
|
||||||
|
case 2: // locked
|
||||||
|
if err := s.wait(ctx); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
case 3: // exceeded cache max value
|
||||||
|
currSeq := states[1]
|
||||||
|
lastSeq := states[2]
|
||||||
|
mallocSize := s.getMallocSize(conversationID, size)
|
||||||
|
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if lastSeq == seq {
|
||||||
|
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize)
|
||||||
|
return currSeq, nil
|
||||||
|
} else {
|
||||||
|
log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq)
|
||||||
|
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize)
|
||||||
|
return seq, nil
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size)
|
||||||
|
return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size)
|
||||||
|
return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||||
|
return s.Malloc(ctx, conversationID, 0)
|
||||||
|
}
|
@ -0,0 +1,109 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestSeq() *seqConversationCacheRedis {
|
||||||
|
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
model, err := mgo.NewSeqConversationMongo(mgocli.Database("openim_v3"))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
opt := &redis.Options{
|
||||||
|
Addr: "172.16.8.48:16379",
|
||||||
|
Password: "openIM123",
|
||||||
|
DB: 1,
|
||||||
|
}
|
||||||
|
rdb := redis.NewClient(opt)
|
||||||
|
if err := rdb.Ping(context.Background()).Err(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return NewSeqConversationCacheRedis(rdb, model).(*seqConversationCacheRedis)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSeq(t *testing.T) {
|
||||||
|
ts := newTestSeq()
|
||||||
|
var (
|
||||||
|
wg sync.WaitGroup
|
||||||
|
speed atomic.Int64
|
||||||
|
)
|
||||||
|
|
||||||
|
const count = 128
|
||||||
|
wg.Add(count)
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
index := i + 1
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
var size int64 = 10
|
||||||
|
cID := strconv.Itoa(index * 1)
|
||||||
|
for i := 1; ; i++ {
|
||||||
|
//first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo
|
||||||
|
first, err := ts.Malloc(context.Background(), cID, size) // redis
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("[%d-%d] %s %s", index, i, cID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
speed.Add(size)
|
||||||
|
_ = first
|
||||||
|
//t.Logf("[%d] %d -> %d", i, first+1, first+size)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ticker := time.NewTicker(time.Second)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
value := speed.Swap(0)
|
||||||
|
t.Logf("speed: %d/s", value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDel(t *testing.T) {
|
||||||
|
ts := newTestSeq()
|
||||||
|
for i := 1; i < 100; i++ {
|
||||||
|
var size int64 = 100
|
||||||
|
first, err := ts.Malloc(context.Background(), "100", size)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("[%d] %s", i, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Logf("[%d] %d -> %d", i, first+1, first+size)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSeqMalloc(t *testing.T) {
|
||||||
|
ts := newTestSeq()
|
||||||
|
t.Log(ts.GetMaxSeq(context.Background(), "100"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMinSeq(t *testing.T) {
|
||||||
|
ts := newTestSeq()
|
||||||
|
t.Log(ts.GetMinSeq(context.Background(), "10000000"))
|
||||||
|
}
|
@ -0,0 +1,89 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/dtm-labs/rockscache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSeqUserCacheRedis(rdb redis.UniversalClient, mgo database.SeqUser) cache.SeqUser {
|
||||||
|
return &seqUserCacheRedis{
|
||||||
|
rdb: rdb,
|
||||||
|
mgo: mgo,
|
||||||
|
readSeqWriteRatio: 100,
|
||||||
|
expireTime: time.Hour * 24 * 7,
|
||||||
|
readExpireTime: time.Hour * 24 * 30,
|
||||||
|
rocks: rockscache.NewClient(rdb, *GetRocksCacheOptions()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type seqUserCacheRedis struct {
|
||||||
|
rdb redis.UniversalClient
|
||||||
|
mgo database.SeqUser
|
||||||
|
rocks *rockscache.Client
|
||||||
|
expireTime time.Duration
|
||||||
|
readExpireTime time.Duration
|
||||||
|
readSeqWriteRatio int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) getSeqUserMaxSeqKey(conversationID string, userID string) string {
|
||||||
|
return cachekey.GetSeqUserMaxSeqKey(conversationID, userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) getSeqUserMinSeqKey(conversationID string, userID string) string {
|
||||||
|
return cachekey.GetSeqUserMinSeqKey(conversationID, userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) getSeqUserReadSeqKey(conversationID string, userID string) string {
|
||||||
|
return cachekey.GetSeqUserReadSeqKey(conversationID, userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||||
|
return getCache(ctx, s.rocks, s.getSeqUserMaxSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
|
||||||
|
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
|
||||||
|
if err := s.mgo.SetMaxSeq(ctx, conversationID, userID, seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMaxSeqKey(conversationID, userID))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||||
|
return getCache(ctx, s.rocks, s.getSeqUserMinSeqKey(conversationID, userID), s.expireTime, func(ctx context.Context) (int64, error) {
|
||||||
|
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
|
||||||
|
if err := s.mgo.SetMinSeq(ctx, conversationID, userID, seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.rocks.TagAsDeleted2(ctx, s.getSeqUserMinSeqKey(conversationID, userID))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||||
|
return getCache(ctx, s.rocks, s.getSeqUserReadSeqKey(conversationID, userID), s.readExpireTime, func(ctx context.Context) (int64, error) {
|
||||||
|
return s.mgo.GetMaxSeq(ctx, conversationID, userID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserCacheRedis) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
|
||||||
|
if seq%s.readSeqWriteRatio == 0 {
|
||||||
|
if err := s.mgo.SetReadSeq(ctx, conversationID, userID, seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,30 +0,0 @@
|
|||||||
package cache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
)
|
|
||||||
|
|
||||||
type SeqCache interface {
|
|
||||||
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
|
|
||||||
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
|
|
||||||
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
|
|
||||||
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
|
|
||||||
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
|
|
||||||
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
|
|
||||||
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
|
|
||||||
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
|
||||||
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
|
|
||||||
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
|
|
||||||
// seqs map: key userID value minSeq
|
|
||||||
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
|
|
||||||
// seqs map: key conversationID value minSeq
|
|
||||||
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
|
|
||||||
// has read seq
|
|
||||||
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
|
||||||
// k: user, v: seq
|
|
||||||
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
|
|
||||||
// k: conversation, v :seq
|
|
||||||
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
|
|
||||||
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
|
|
||||||
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
|
|
||||||
}
|
|
@ -0,0 +1,10 @@
|
|||||||
|
package cache
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type SeqConversationCache interface {
|
||||||
|
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
|
||||||
|
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
|
||||||
|
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
|
||||||
|
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
|
||||||
|
}
|
@ -0,0 +1,12 @@
|
|||||||
|
package cache
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type SeqUser interface {
|
||||||
|
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
||||||
|
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
||||||
|
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
||||||
|
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
}
|
@ -0,0 +1,103 @@
|
|||||||
|
package mgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSeqConversationMongo(db *mongo.Database) (database.SeqConversation, error) {
|
||||||
|
coll := db.Collection(database.SeqConversationName)
|
||||||
|
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
|
||||||
|
Keys: bson.D{
|
||||||
|
{Key: "conversation_id", Value: 1},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &seqConversationMongo{coll: coll}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type seqConversationMongo struct {
|
||||||
|
coll *mongo.Collection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) setSeq(ctx context.Context, conversationID string, seq int64, field string) error {
|
||||||
|
filter := map[string]any{
|
||||||
|
"conversation_id": conversationID,
|
||||||
|
}
|
||||||
|
insert := bson.M{
|
||||||
|
"conversation_id": conversationID,
|
||||||
|
"min_seq": 0,
|
||||||
|
"max_seq": 0,
|
||||||
|
}
|
||||||
|
delete(insert, field)
|
||||||
|
update := map[string]any{
|
||||||
|
"$set": bson.M{
|
||||||
|
field: seq,
|
||||||
|
},
|
||||||
|
"$setOnInsert": insert,
|
||||||
|
}
|
||||||
|
opt := options.Update().SetUpsert(true)
|
||||||
|
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
|
||||||
|
if size < 0 {
|
||||||
|
return 0, errors.New("size must be greater than 0")
|
||||||
|
}
|
||||||
|
if size == 0 {
|
||||||
|
return s.GetMaxSeq(ctx, conversationID)
|
||||||
|
}
|
||||||
|
filter := map[string]any{"conversation_id": conversationID}
|
||||||
|
update := map[string]any{
|
||||||
|
"$inc": map[string]any{"max_seq": size},
|
||||||
|
"$set": map[string]any{"min_seq": int64(0)},
|
||||||
|
}
|
||||||
|
opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1})
|
||||||
|
lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return lastSeq - size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) SetMaxSeq(ctx context.Context, conversationID string, seq int64) error {
|
||||||
|
return s.setSeq(ctx, conversationID, seq, "max_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||||
|
seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1}))
|
||||||
|
if err == nil {
|
||||||
|
return seq, nil
|
||||||
|
} else if IsNotFound(err) {
|
||||||
|
return 0, nil
|
||||||
|
} else {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||||
|
seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1}))
|
||||||
|
if err == nil {
|
||||||
|
return seq, nil
|
||||||
|
} else if IsNotFound(err) {
|
||||||
|
return 0, nil
|
||||||
|
} else {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
|
||||||
|
return s.setSeq(ctx, conversationID, seq, "min_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqConversationMongo) GetConversation(ctx context.Context, conversationID string) (*model.SeqConversation, error) {
|
||||||
|
return mongoutil.FindOne[*model.SeqConversation](ctx, s.coll, bson.M{"conversation_id": conversationID})
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package mgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Result[V any](val V, err error) V {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
func Mongodb() *mongo.Database {
|
||||||
|
return Result(
|
||||||
|
mongo.Connect(context.Background(),
|
||||||
|
options.Client().
|
||||||
|
ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").
|
||||||
|
SetConnectTimeout(5*time.Second)),
|
||||||
|
).Database("openim_v3")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUserSeq(t *testing.T) {
|
||||||
|
uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
|
||||||
|
t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConversationSeq(t *testing.T) {
|
||||||
|
cSeq := Result(NewSeqConversationMongo(Mongodb())).(*seqConversationMongo)
|
||||||
|
t.Log(cSeq.SetMaxSeq(context.Background(), "2000", 10))
|
||||||
|
t.Log(cSeq.Malloc(context.Background(), "2000", 10))
|
||||||
|
t.Log(cSeq.GetMaxSeq(context.Background(), "2000"))
|
||||||
|
}
|
@ -0,0 +1,92 @@
|
|||||||
|
package mgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) {
|
||||||
|
coll := db.Collection(database.SeqUserName)
|
||||||
|
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
|
||||||
|
Keys: bson.D{
|
||||||
|
{Key: "user_id", Value: 1},
|
||||||
|
{Key: "conversation_id", Value: 1},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &seqUserMongo{coll: coll}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type seqUserMongo struct {
|
||||||
|
coll *mongo.Collection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error {
|
||||||
|
filter := map[string]any{
|
||||||
|
"user_id": userID,
|
||||||
|
"conversation_id": conversationID,
|
||||||
|
}
|
||||||
|
insert := bson.M{
|
||||||
|
"user_id": userID,
|
||||||
|
"conversation_id": conversationID,
|
||||||
|
"min_seq": 0,
|
||||||
|
"max_seq": 0,
|
||||||
|
"read_seq": 0,
|
||||||
|
}
|
||||||
|
delete(insert, field)
|
||||||
|
update := map[string]any{
|
||||||
|
"$set": bson.M{
|
||||||
|
field: seq,
|
||||||
|
},
|
||||||
|
"$setOnInsert": insert,
|
||||||
|
}
|
||||||
|
opt := options.Update().SetUpsert(true)
|
||||||
|
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) {
|
||||||
|
filter := map[string]any{
|
||||||
|
"user_id": userID,
|
||||||
|
"conversation_id": conversationID,
|
||||||
|
}
|
||||||
|
opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1})
|
||||||
|
seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt)
|
||||||
|
if err == nil {
|
||||||
|
return seq, nil
|
||||||
|
} else if errors.Is(err, mongo.ErrNoDocuments) {
|
||||||
|
return 0, nil
|
||||||
|
} else {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||||
|
return s.getSeq(ctx, conversationID, userID, "max_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
|
||||||
|
return s.setSeq(ctx, conversationID, userID, seq, "max_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||||
|
return s.getSeq(ctx, conversationID, userID, "min_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
|
||||||
|
return s.setSeq(ctx, conversationID, userID, seq, "min_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
|
||||||
|
return s.getSeq(ctx, conversationID, userID, "read_seq")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *seqUserMongo) SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
|
||||||
|
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type SeqConversation interface {
|
||||||
|
Malloc(ctx context.Context, conversationID string, size int64) (int64, error)
|
||||||
|
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
|
||||||
|
SetMaxSeq(ctx context.Context, conversationID string, seq int64) error
|
||||||
|
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
|
||||||
|
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
|
||||||
|
}
|
@ -0,0 +1,12 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type SeqUser interface {
|
||||||
|
GetMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
||||||
|
SetMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
GetMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
||||||
|
SetMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
GetReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
|
||||||
|
SetReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
}
|
@ -0,0 +1,7 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
type SeqConversation struct {
|
||||||
|
ConversationID string `bson:"conversation_id"`
|
||||||
|
MaxSeq int64 `bson:"max_seq"`
|
||||||
|
MinSeq int64 `bson:"min_seq"`
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
type SeqUser struct {
|
||||||
|
UserID string `bson:"user_id"`
|
||||||
|
ConversationID string `bson:"conversation_id"`
|
||||||
|
MinSeq int64 `bson:"min_seq"`
|
||||||
|
MaxSeq int64 `bson:"max_seq"`
|
||||||
|
ReadSeq int64 `bson:"read_seq"`
|
||||||
|
}
|
@ -0,0 +1,331 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||||
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
|
"github.com/openimsdk/tools/db/redisutil"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MaxSeq = "MAX_SEQ:"
|
||||||
|
MinSeq = "MIN_SEQ:"
|
||||||
|
ConversationUserMinSeq = "CON_USER_MIN_SEQ:"
|
||||||
|
HasReadSeq = "HAS_READ_SEQ:"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
batchSize = 100
|
||||||
|
dataVersionCollection = "data_version"
|
||||||
|
seqKey = "seq"
|
||||||
|
seqVersion = 38
|
||||||
|
)
|
||||||
|
|
||||||
|
func readConfig[T any](dir string, name string) (*T, error) {
|
||||||
|
data, err := os.ReadFile(filepath.Join(dir, name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var conf T
|
||||||
|
if err := yaml.Unmarshal(data, &conf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &conf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Main(conf string, del time.Duration) error {
|
||||||
|
redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mongodbConfig, err := readConfig[config.Mongo](conf, cmd.MongodbConfigFileName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer cancel()
|
||||||
|
rdb, err := redisutil.NewRedisClient(ctx, redisConfig.Build())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mgocli, err := mongoutil.NewMongoDB(ctx, mongodbConfig.Build())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
versionColl := mgocli.GetDB().Collection(dataVersionCollection)
|
||||||
|
converted, err := CheckVersion(versionColl, seqKey, seqVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if converted {
|
||||||
|
fmt.Println("[seq] seq data has been converted")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if _, err := mgo.NewSeqConversationMongo(mgocli.GetDB()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cSeq, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
uSpitHasReadSeq := func(id string) (conversationID string, userID string, err error) {
|
||||||
|
// HasReadSeq + userID + ":" + conversationID
|
||||||
|
arr := strings.Split(id, ":")
|
||||||
|
if len(arr) != 2 || arr[0] == "" || arr[1] == "" {
|
||||||
|
return "", "", fmt.Errorf("invalid has read seq id %s", id)
|
||||||
|
}
|
||||||
|
userID = arr[0]
|
||||||
|
conversationID = arr[1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
uSpitConversationUserMinSeq := func(id string) (conversationID string, userID string, err error) {
|
||||||
|
// ConversationUserMinSeq + conversationID + "u:" + userID
|
||||||
|
arr := strings.Split(id, "u:")
|
||||||
|
if len(arr) != 2 || arr[0] == "" || arr[1] == "" {
|
||||||
|
return "", "", fmt.Errorf("invalid has read seq id %s", id)
|
||||||
|
}
|
||||||
|
conversationID = arr[0]
|
||||||
|
userID = arr[1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ts := []*taskSeq{
|
||||||
|
{
|
||||||
|
Prefix: MaxSeq,
|
||||||
|
GetSeq: cSeq.GetMaxSeq,
|
||||||
|
SetSeq: cSeq.SetMinSeq,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Prefix: MinSeq,
|
||||||
|
GetSeq: cSeq.GetMinSeq,
|
||||||
|
SetSeq: cSeq.SetMinSeq,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Prefix: HasReadSeq,
|
||||||
|
GetSeq: func(ctx context.Context, id string) (int64, error) {
|
||||||
|
conversationID, userID, err := uSpitHasReadSeq(id)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return uSeq.GetReadSeq(ctx, conversationID, userID)
|
||||||
|
},
|
||||||
|
SetSeq: func(ctx context.Context, id string, seq int64) error {
|
||||||
|
conversationID, userID, err := uSpitHasReadSeq(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return uSeq.SetReadSeq(ctx, conversationID, userID, seq)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Prefix: ConversationUserMinSeq,
|
||||||
|
GetSeq: func(ctx context.Context, id string) (int64, error) {
|
||||||
|
conversationID, userID, err := uSpitConversationUserMinSeq(id)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return uSeq.GetMinSeq(ctx, conversationID, userID)
|
||||||
|
},
|
||||||
|
SetSeq: func(ctx context.Context, id string, seq int64) error {
|
||||||
|
conversationID, userID, err := uSpitConversationUserMinSeq(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return uSeq.SetMinSeq(ctx, conversationID, userID, seq)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
ctx = context.Background()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(ts))
|
||||||
|
|
||||||
|
for i := range ts {
|
||||||
|
go func(task *taskSeq) {
|
||||||
|
defer wg.Done()
|
||||||
|
err := seqRedisToMongo(ctx, rdb, task.GetSeq, task.SetSeq, task.Prefix, del, &task.Count)
|
||||||
|
task.End = time.Now()
|
||||||
|
task.Error = err
|
||||||
|
}(ts[i])
|
||||||
|
}
|
||||||
|
start := time.Now()
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
sigs := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigs, syscall.SIGTERM)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
var buf bytes.Buffer
|
||||||
|
|
||||||
|
printTaskInfo := func(now time.Time) {
|
||||||
|
buf.Reset()
|
||||||
|
buf.WriteString(now.Format(time.DateTime))
|
||||||
|
buf.WriteString(" \n")
|
||||||
|
for i := range ts {
|
||||||
|
task := ts[i]
|
||||||
|
if task.Error == nil {
|
||||||
|
if task.End.IsZero() {
|
||||||
|
buf.WriteString(fmt.Sprintf("[%s] converting %s* count %d", now.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count)))
|
||||||
|
} else {
|
||||||
|
buf.WriteString(fmt.Sprintf("[%s] success %s* count %d", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count)))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
buf.WriteString(fmt.Sprintf("[%s] failed %s* count %d error %s", task.End.Sub(start), task.Prefix, atomic.LoadInt64(&task.Count), task.Error))
|
||||||
|
}
|
||||||
|
buf.WriteString("\n")
|
||||||
|
}
|
||||||
|
fmt.Println(buf.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case s := <-sigs:
|
||||||
|
return fmt.Errorf("exit by signal %s", s)
|
||||||
|
case <-done:
|
||||||
|
errs := make([]error, 0, len(ts))
|
||||||
|
for i := range ts {
|
||||||
|
task := ts[i]
|
||||||
|
if task.Error != nil {
|
||||||
|
errs = append(errs, fmt.Errorf("seq %s failed %w", task.Prefix, task.Error))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return errors.Join(errs...)
|
||||||
|
}
|
||||||
|
printTaskInfo(time.Now())
|
||||||
|
if err := SetVersion(versionColl, seqKey, seqVersion); err != nil {
|
||||||
|
return fmt.Errorf("set mongodb seq version %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
case now := <-ticker.C:
|
||||||
|
printTaskInfo(now)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskSeq struct {
|
||||||
|
Prefix string
|
||||||
|
Count int64
|
||||||
|
Error error
|
||||||
|
End time.Time
|
||||||
|
GetSeq func(ctx context.Context, id string) (int64, error)
|
||||||
|
SetSeq func(ctx context.Context, id string, seq int64) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func seqRedisToMongo(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64) error {
|
||||||
|
var (
|
||||||
|
cursor uint64
|
||||||
|
keys []string
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
for {
|
||||||
|
keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(keys) > 0 {
|
||||||
|
for _, key := range keys {
|
||||||
|
seqStr, err := rdb.Get(ctx, key).Result()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("redis get %s failed %w", key, err)
|
||||||
|
}
|
||||||
|
seq, err := strconv.Atoi(seqStr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid %s seq %s", key, seqStr)
|
||||||
|
}
|
||||||
|
if seq < 0 {
|
||||||
|
return fmt.Errorf("invalid %s seq %s", key, seqStr)
|
||||||
|
}
|
||||||
|
id := strings.TrimPrefix(key, prefix)
|
||||||
|
redisSeq := int64(seq)
|
||||||
|
mongoSeq, err := getSeq(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get mongo seq %s failed %w", key, err)
|
||||||
|
}
|
||||||
|
if mongoSeq < redisSeq {
|
||||||
|
if err := setSeq(ctx, id, redisSeq); err != nil {
|
||||||
|
return fmt.Errorf("set mongo seq %s failed %w", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if delAfter > 0 {
|
||||||
|
if err := rdb.Expire(ctx, key, delAfter).Err(); err != nil {
|
||||||
|
return fmt.Errorf("redis expire key %s failed %w", key, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := rdb.Del(ctx, key).Err(); err != nil {
|
||||||
|
return fmt.Errorf("redis del key %s failed %w", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atomic.AddInt64(count, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cursor == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) {
|
||||||
|
type VersionTable struct {
|
||||||
|
Key string `bson:"key"`
|
||||||
|
Value string `bson:"value"`
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
res, err := mongoutil.FindOne[VersionTable](ctx, coll, bson.M{"key": key})
|
||||||
|
if err == nil {
|
||||||
|
ver, err := strconv.Atoi(res.Value)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("version %s parse error %w", res.Value, err)
|
||||||
|
}
|
||||||
|
if ver >= currentVersion {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
} else if errors.Is(err, mongo.ErrNoDocuments) {
|
||||||
|
return false, nil
|
||||||
|
} else {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetVersion(coll *mongo.Collection, key string, version int) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
option := options.Update().SetUpsert(true)
|
||||||
|
filter := bson.M{"key": key, "value": strconv.Itoa(version)}
|
||||||
|
update := bson.M{"$set": bson.M{"key": key, "value": strconv.Itoa(version)}}
|
||||||
|
return mongoutil.UpdateOne(ctx, coll, filter, update, false, option)
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/tools/seq/internal"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var (
|
||||||
|
config string
|
||||||
|
second int
|
||||||
|
)
|
||||||
|
flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory")
|
||||||
|
flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion")
|
||||||
|
flag.Parse()
|
||||||
|
if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
|
||||||
|
fmt.Println("seq task", err)
|
||||||
|
}
|
||||||
|
fmt.Println("seq task success!")
|
||||||
|
}
|
Loading…
Reference in new issue