From 72102b0378b85297a31e760763c8954db266bbe3 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Mon, 9 Sep 2024 17:02:03 +0800 Subject: [PATCH] feat: lock in online cache --- internal/push/push_handler.go | 2 ++ pkg/rpccache/online.go | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 59649e4da..6b1df9efa 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -125,9 +125,11 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + c.onlineCache.Lock.Lock() for c.onlineCache.CurrentPhase < rpccache.DoSubscribeOver { c.onlineCache.Cond.Wait() } + c.onlineCache.Lock.Unlock() for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index ac2a211f8..f41f7877f 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -23,11 +23,13 @@ import ( ) func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) { + l := &sync.Mutex{} x := &OnlineCache{ user: user, group: group, fullUserCache: fullUserCache, - Cond: sync.NewCond(&sync.Mutex{}), + Lock: l, + Cond: sync.NewCond(l), } switch x.fullUserCache { @@ -72,6 +74,7 @@ type OnlineCache struct { lruCache lru.LRU[string, []int32] mapCache *cacheutil.Cache[string, []int32] + Lock *sync.Mutex Cond *sync.Cond CurrentPhase initPhase } @@ -131,10 +134,12 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) { func (o *OnlineCache) doSubscribe(rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) { ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) + o.Lock.Lock() ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() for o.CurrentPhase < DoOnlineStatusOver { o.Cond.Wait() } + o.Lock.Unlock() doMessage := func(message *redis.Message) { userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)