feat: lock in online cache

pull/2608/head
icey-yu 1 year ago
parent 95d2227da2
commit 72102b0378

@ -125,9 +125,11 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.onlineCache.Lock.Lock()
for c.onlineCache.CurrentPhase < rpccache.DoSubscribeOver { for c.onlineCache.CurrentPhase < rpccache.DoSubscribeOver {
c.onlineCache.Cond.Wait() c.onlineCache.Cond.Wait()
} }
c.onlineCache.Lock.Unlock()
for msg := range claim.Messages() { for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg) ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
c.handleMs2PsChat(ctx, msg.Value) c.handleMs2PsChat(ctx, msg.Value)

@ -23,11 +23,13 @@ import (
) )
func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) { func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) {
l := &sync.Mutex{}
x := &OnlineCache{ x := &OnlineCache{
user: user, user: user,
group: group, group: group,
fullUserCache: fullUserCache, fullUserCache: fullUserCache,
Cond: sync.NewCond(&sync.Mutex{}), Lock: l,
Cond: sync.NewCond(l),
} }
switch x.fullUserCache { switch x.fullUserCache {
@ -72,6 +74,7 @@ type OnlineCache struct {
lruCache lru.LRU[string, []int32] lruCache lru.LRU[string, []int32]
mapCache *cacheutil.Cache[string, []int32] mapCache *cacheutil.Cache[string, []int32]
Lock *sync.Mutex
Cond *sync.Cond Cond *sync.Cond
CurrentPhase initPhase CurrentPhase initPhase
} }
@ -131,10 +134,12 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) {
func (o *OnlineCache) doSubscribe(rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) { func (o *OnlineCache) doSubscribe(rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) {
ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
o.Lock.Lock()
ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel()
for o.CurrentPhase < DoOnlineStatusOver { for o.CurrentPhase < DoOnlineStatusOver {
o.Cond.Wait() o.Cond.Wait()
} }
o.Lock.Unlock()
doMessage := func(message *redis.Message) { doMessage := func(message *redis.Message) {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)

Loading…
Cancel
Save