diff --git a/go.mod b/go.mod index 11a87d882..1086560bc 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/minio/minio-go/v7 v7.0.63 github.com/mitchellh/mapstructure v1.5.0 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect + github.com/openimsdk/localcache v0.0.1 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.17.0 github.com/robfig/cron/v3 v3.0.1 @@ -37,7 +38,6 @@ require ( github.com/IBM/sarama v1.41.3 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/redis/go-redis/v9 v9.2.1 github.com/stathat/consistent v1.0.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 @@ -82,6 +82,7 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect @@ -157,3 +158,5 @@ require ( golang.org/x/crypto v0.14.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/localcache => ./pkg/localcache diff --git a/pkg/common/localcache/business.go b/pkg/common/localcache/business.go deleted file mode 100644 index f011719df..000000000 --- a/pkg/common/localcache/business.go +++ /dev/null @@ -1,70 +0,0 @@ -package localcache - -import ( - "context" - "encoding/json" - "github.com/OpenIMSDK/tools/log" - "github.com/dtm-labs/rockscache" - "github.com/redis/go-redis/v9" -) - -func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option { - return WithDeleteLocal(func(fn func(key ...string)) { - if fn == nil { - log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic) - return - } - msg := cli.Subscribe(context.Background(), topic).Channel() - for m := range msg { - log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload) - var key []string - if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { - log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) - continue - } - if len(key) == 0 { - continue - } - fn(key...) - } - }) -} - -func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option { - return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { - data, err := json.Marshal(key) - if err != nil { - log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key) - return - } - if err := cli.Publish(ctx, topic, data).Err(); err != nil { - log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key) - } else { - log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key) - } - }) -} - -func WithRedisDelete(cli redis.UniversalClient) Option { - return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { - for _, s := range key { - if err := cli.Del(ctx, s).Err(); err != nil { - log.ZError(ctx, "redis delete error", err, "key", s) - } else { - log.ZDebug(ctx, "redis delete success", "key", s) - } - } - }) -} - -func WithRocksCacheDelete(cli *rockscache.Client) Option { - return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { - for _, k := range key { - if err := cli.TagAsDeleted2(ctx, k); err != nil { - log.ZError(ctx, "rocksdb delete error", err, "key", k) - } else { - log.ZDebug(ctx, "rocksdb delete success", "key", k) - } - } - }) -} diff --git a/pkg/common/localcache/lru_test.go b/pkg/common/localcache/lru_test.go deleted file mode 100644 index fff925eaa..000000000 --- a/pkg/common/localcache/lru_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package localcache - -//func TestName(t *testing.T) { -// target := &cacheTarget{} -// l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) -// //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) -// -// fn := func(key string, n int, fetch func() (string, error)) { -// for i := 0; i < n; i++ { -// //v, err := l.Get(key, fetch) -// //if err == nil { -// // t.Log("key", key, "value", v) -// //} else { -// // t.Error("key", key, err) -// //} -// l.Get(key, fetch) -// //time.Sleep(time.Second / 100) -// } -// } -// -// tmp := make(map[string]struct{}) -// -// var wg sync.WaitGroup -// for i := 0; i < 10000; i++ { -// wg.Add(1) -// key := fmt.Sprintf("key_%d", i%200) -// tmp[key] = struct{}{} -// go func() { -// defer wg.Done() -// //t.Log(key) -// fn(key, 10000, func() (string, error) { -// //time.Sleep(time.Second * 3) -// //t.Log(time.Now(), "key", key, "fetch") -// //if rand.Uint32()%5 == 0 { -// // return "value_" + key, nil -// //} -// //return "", errors.New("rand error") -// return "value_" + key, nil -// }) -// }() -// -// //wg.Add(1) -// //go func() { -// // defer wg.Done() -// // for i := 0; i < 10; i++ { -// // l.Del(key) -// // time.Sleep(time.Second / 3) -// // } -// //}() -// } -// wg.Wait() -// t.Log(len(tmp)) -// t.Log(target.String()) -// -//} diff --git a/pkg/common/localcache/singleflight.go b/pkg/common/localcache/singleflight.go deleted file mode 100644 index 5dcd70669..000000000 --- a/pkg/common/localcache/singleflight.go +++ /dev/null @@ -1,43 +0,0 @@ -package localcache - -import "sync" - -type call[K comparable, V any] struct { - wg sync.WaitGroup - val V - err error -} - -type SingleFlight[K comparable, V any] struct { - mu sync.Mutex - m map[K]*call[K, V] -} - -func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V] { - return &SingleFlight[K, V]{m: make(map[K]*call[K, V])} -} - -func (r *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (V, error) { - r.mu.Lock() - if r.m == nil { - r.m = make(map[K]*call[K, V]) - } - if c, ok := r.m[key]; ok { - r.mu.Unlock() - c.wg.Wait() - return c.val, c.err - } - c := new(call[K, V]) - c.wg.Add(1) - r.m[key] = c - r.mu.Unlock() - - c.val, c.err = fn() - c.wg.Done() - - r.mu.Lock() - delete(r.m, key) - r.mu.Unlock() - - return c.val, c.err -} diff --git a/pkg/common/localcache/target.go b/pkg/common/localcache/target.go deleted file mode 100644 index 2edf51ddb..000000000 --- a/pkg/common/localcache/target.go +++ /dev/null @@ -1,59 +0,0 @@ -package localcache - -import ( - "fmt" - "sync/atomic" -) - -type Target interface { - IncrGetHit() - IncrGetSuccess() - IncrGetFailed() - - IncrDelHit() - IncrDelNotFound() -} - -type cacheTarget struct { - getHit int64 - getSuccess int64 - getFailed int64 - delHit int64 - delNotFound int64 -} - -func (r *cacheTarget) IncrGetHit() { - atomic.AddInt64(&r.getHit, 1) -} - -func (r *cacheTarget) IncrGetSuccess() { - atomic.AddInt64(&r.getSuccess, 1) -} - -func (r *cacheTarget) IncrGetFailed() { - atomic.AddInt64(&r.getFailed, 1) -} - -func (r *cacheTarget) IncrDelHit() { - atomic.AddInt64(&r.delHit, 1) -} - -func (r *cacheTarget) IncrDelNotFound() { - atomic.AddInt64(&r.delNotFound, 1) -} - -func (r *cacheTarget) String() string { - return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) -} - -type emptyTarget struct{} - -func (e emptyTarget) IncrGetHit() {} - -func (e emptyTarget) IncrGetSuccess() {} - -func (e emptyTarget) IncrGetFailed() {} - -func (e emptyTarget) IncrDelHit() {} - -func (e emptyTarget) IncrDelNotFound() {} diff --git a/pkg/common/localcache/timingwheel.go b/pkg/common/localcache/timingwheel.go deleted file mode 100644 index ca1bac033..000000000 --- a/pkg/common/localcache/timingwheel.go +++ /dev/null @@ -1,71 +0,0 @@ -package localcache - -import ( - "sync" - "time" -) - -type Execute[K comparable, V any] func(K, V) - -type Task[K comparable, V any] struct { - key K - value V -} - -type TimeWheel[K comparable, V any] struct { - ticker *time.Ticker - slots [][]Task[K, V] - currentPos int - size int - slotMutex sync.Mutex - execute Execute[K, V] -} - -func NewTimeWheel[K comparable, V any](size int, tickDuration time.Duration, execute Execute[K, V]) *TimeWheel[K, V] { - return &TimeWheel[K, V]{ - ticker: time.NewTicker(tickDuration), - slots: make([][]Task[K, V], size), - currentPos: 0, - size: size, - execute: execute, - } -} - -func (tw *TimeWheel[K, V]) Start() { - for range tw.ticker.C { - tw.tick() - } -} - -func (tw *TimeWheel[K, V]) Stop() { - tw.ticker.Stop() -} - -func (tw *TimeWheel[K, V]) tick() { - tw.slotMutex.Lock() - defer tw.slotMutex.Unlock() - - tasks := tw.slots[tw.currentPos] - tw.slots[tw.currentPos] = nil - if len(tasks) > 0 { - go func(tasks []Task[K, V]) { - for _, task := range tasks { - tw.execute(task.key, task.value) - } - }(tasks) - } - - tw.currentPos = (tw.currentPos + 1) % tw.size -} - -func (tw *TimeWheel[K, V]) AddTask(delay int, task Task[K, V]) { - if delay < 0 || delay >= tw.size { - return - } - - tw.slotMutex.Lock() - defer tw.slotMutex.Unlock() - - pos := (tw.currentPos + delay) % tw.size - tw.slots[pos] = append(tw.slots[pos], task) -} diff --git a/pkg/localcache/business.go b/pkg/localcache/business.go new file mode 100644 index 000000000..c5260b3e2 --- /dev/null +++ b/pkg/localcache/business.go @@ -0,0 +1,71 @@ +package localcache + +// +//import ( +// "context" +// "encoding/json" +// "github.com/OpenIMSDK/tools/log" +// "github.com/dtm-labs/rockscache" +// "github.com/redis/go-redis/v9" +//) +// +//func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option { +// return WithDeleteLocal(func(fn func(key ...string)) { +// if fn == nil { +// log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic) +// return +// } +// msg := cli.Subscribe(context.Background(), topic).Channel() +// for m := range msg { +// log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload) +// var key []string +// if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { +// log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) +// continue +// } +// if len(key) == 0 { +// continue +// } +// fn(key...) +// } +// }) +//} +// +//func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option { +// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { +// data, err := json.Marshal(key) +// if err != nil { +// log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key) +// return +// } +// if err := cli.Publish(ctx, topic, data).Err(); err != nil { +// log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key) +// } else { +// log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key) +// } +// }) +//} +// +//func WithRedisDelete(cli redis.UniversalClient) Option { +// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { +// for _, s := range key { +// if err := cli.Del(ctx, s).Err(); err != nil { +// log.ZError(ctx, "redis delete error", err, "key", s) +// } else { +// log.ZDebug(ctx, "redis delete success", "key", s) +// } +// } +// }) +//} +// +//func WithRocksCacheDelete(cli *rockscache.Client) Option { +// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { +// for _, k := range key { +// if err := cli.TagAsDeleted2(ctx, k); err != nil { +// log.ZError(ctx, "rocksdb delete error", err, "key", k) +// } else { +// log.ZDebug(ctx, "rocksdb delete success", "key", k) +// } +// } +// }) +//} diff --git a/pkg/common/localcache/cache.go b/pkg/localcache/cache.go similarity index 88% rename from pkg/common/localcache/cache.go rename to pkg/localcache/cache.go index 56cf14ff8..2d2e8018e 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -2,9 +2,9 @@ package localcache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" - lopt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" + "github.com/openimsdk/localcache/link" + "github.com/openimsdk/localcache/local" + lopt "github.com/openimsdk/localcache/option" ) type Cache[V any] interface { diff --git a/pkg/localcache/go.mod b/pkg/localcache/go.mod new file mode 100644 index 000000000..5f0793042 --- /dev/null +++ b/pkg/localcache/go.mod @@ -0,0 +1,5 @@ +module github.com/openimsdk/localcache + +go 1.19 + +require github.com/hashicorp/golang-lru/v2 v2.0.7 diff --git a/pkg/common/localcache/link/link.go b/pkg/localcache/link/link.go similarity index 100% rename from pkg/common/localcache/link/link.go rename to pkg/localcache/link/link.go diff --git a/pkg/common/localcache/link/link_test.go b/pkg/localcache/link/link_test.go similarity index 100% rename from pkg/common/localcache/link/link_test.go rename to pkg/localcache/link/link_test.go diff --git a/pkg/localcache/local/cache.go b/pkg/localcache/local/cache.go new file mode 100644 index 000000000..06569965e --- /dev/null +++ b/pkg/localcache/local/cache.go @@ -0,0 +1,50 @@ +package local + +import ( + "hash/fnv" + "time" + "unsafe" +) + +type Cache[V any] interface { + Get(key string, fetch func() (V, error)) (V, error) + Del(key string) bool +} + +func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] { + c := &slot[V]{ + n: uint64(slotNum), + slots: make([]*LRU[string, V], slotNum), + target: target, + } + for i := 0; i < slotNum; i++ { + c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict) + } + return c +} + +type slot[V any] struct { + n uint64 + slots []*LRU[string, V] + target Target +} + +func (c *slot[V]) index(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) + return h.Sum64() % c.n +} + +func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) { + return c.slots[c.index(key)].Get(key, fetch) +} + +func (c *slot[V]) Del(key string) bool { + if c.slots[c.index(key)].Del(key) { + c.target.IncrDelHit() + return true + } else { + c.target.IncrDelNotFound() + return false + } +} diff --git a/pkg/common/localcache/callback.go b/pkg/localcache/local/callback.go similarity index 86% rename from pkg/common/localcache/callback.go rename to pkg/localcache/local/callback.go index 4bd37a2c2..32aef112b 100644 --- a/pkg/common/localcache/callback.go +++ b/pkg/localcache/local/callback.go @@ -1,4 +1,4 @@ -package localcache +package local import "github.com/hashicorp/golang-lru/v2/simplelru" diff --git a/pkg/common/localcache/lru.go b/pkg/localcache/local/lru.go similarity index 91% rename from pkg/common/localcache/lru.go rename to pkg/localcache/local/lru.go index 4fd1704d2..45dc3b651 100644 --- a/pkg/common/localcache/lru.go +++ b/pkg/localcache/local/lru.go @@ -1,4 +1,4 @@ -package localcache +package local import ( "github.com/hashicorp/golang-lru/v2/simplelru" @@ -30,7 +30,6 @@ func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, successTTL: successTTL, failedTTL: failedTTL, target: target, - s: NewSingleFlight[K, V](), } } @@ -40,7 +39,6 @@ type LRU[K comparable, V any] struct { successTTL time.Duration failedTTL time.Duration target Target - s *SingleFlight[K, V] } func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { @@ -80,10 +78,5 @@ func (x *LRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() - if ok { - x.target.IncrDelHit() - } else { - x.target.IncrDelNotFound() - } return ok } diff --git a/pkg/localcache/local/lru_test.go b/pkg/localcache/local/lru_test.go new file mode 100644 index 000000000..a6e7553ee --- /dev/null +++ b/pkg/localcache/local/lru_test.go @@ -0,0 +1,95 @@ +package local + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +type cacheTarget struct { + getHit int64 + getSuccess int64 + getFailed int64 + delHit int64 + delNotFound int64 +} + +func (r *cacheTarget) IncrGetHit() { + atomic.AddInt64(&r.getHit, 1) +} + +func (r *cacheTarget) IncrGetSuccess() { + atomic.AddInt64(&r.getSuccess, 1) +} + +func (r *cacheTarget) IncrGetFailed() { + atomic.AddInt64(&r.getFailed, 1) +} + +func (r *cacheTarget) IncrDelHit() { + atomic.AddInt64(&r.delHit, 1) +} + +func (r *cacheTarget) IncrDelNotFound() { + atomic.AddInt64(&r.delNotFound, 1) +} + +func (r *cacheTarget) String() string { + return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) +} + +func TestName(t *testing.T) { + target := &cacheTarget{} + l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) + //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) + + fn := func(key string, n int, fetch func() (string, error)) { + for i := 0; i < n; i++ { + //v, err := l.Get(key, fetch) + //if err == nil { + // t.Log("key", key, "value", v) + //} else { + // t.Error("key", key, err) + //} + l.Get(key, fetch) + //time.Sleep(time.Second / 100) + } + } + + tmp := make(map[string]struct{}) + + var wg sync.WaitGroup + for i := 0; i < 10000; i++ { + wg.Add(1) + key := fmt.Sprintf("key_%d", i%200) + tmp[key] = struct{}{} + go func() { + defer wg.Done() + //t.Log(key) + fn(key, 10000, func() (string, error) { + //time.Sleep(time.Second * 3) + //t.Log(time.Now(), "key", key, "fetch") + //if rand.Uint32()%5 == 0 { + // return "value_" + key, nil + //} + //return "", errors.New("rand error") + return "value_" + key, nil + }) + }() + + //wg.Add(1) + //go func() { + // defer wg.Done() + // for i := 0; i < 10; i++ { + // l.Del(key) + // time.Sleep(time.Second / 3) + // } + //}() + } + wg.Wait() + t.Log(len(tmp)) + t.Log(target.String()) + +} diff --git a/pkg/localcache/local/target.go b/pkg/localcache/local/target.go new file mode 100644 index 000000000..6cb134fb0 --- /dev/null +++ b/pkg/localcache/local/target.go @@ -0,0 +1,10 @@ +package local + +type Target interface { + IncrGetHit() + IncrGetSuccess() + IncrGetFailed() + + IncrDelHit() + IncrDelNotFound() +} diff --git a/pkg/common/localcache/option.go b/pkg/localcache/option.go similarity index 97% rename from pkg/common/localcache/option.go rename to pkg/localcache/option.go index f23a04e68..9f77bc502 100644 --- a/pkg/common/localcache/option.go +++ b/pkg/localcache/option.go @@ -2,7 +2,7 @@ package localcache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" + "github.com/openimsdk/localcache/local" "time" ) diff --git a/pkg/common/localcache/option/option.go b/pkg/localcache/option/option.go similarity index 100% rename from pkg/common/localcache/option/option.go rename to pkg/localcache/option/option.go diff --git a/pkg/common/localcache/tool.go b/pkg/localcache/tool.go similarity index 100% rename from pkg/common/localcache/tool.go rename to pkg/localcache/tool.go diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 181b8fd8a..5a76990d0 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -2,9 +2,9 @@ package rpccache import ( "context" + "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/redis/go-redis/v9" ) diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 82d0a03a5..6d640ea8f 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -3,10 +3,10 @@ package rpccache import ( "context" "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/redis/go-redis/v9" ) diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 0c3bc6b93..3c3333b92 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -2,9 +2,9 @@ package rpccache import ( "context" + "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/redis/go-redis/v9" )