fix: atomic online cache status

pull/2608/head
icey-yu 1 year ago
parent 3973b52fa9
commit 42245931ad

@@ -116,7 +116,7 @@ func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil
 func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	c.onlineCache.Lock.Lock()
-	for c.onlineCache.CurrentPhase < rpccache.DoSubscribeOver {
+	for c.onlineCache.CurrentPhase.Load() < rpccache.DoSubscribeOver {
 		c.onlineCache.Cond.Wait()
 	}
 	c.onlineCache.Lock.Unlock()
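The Kafka consumer is the waiting side of this change: ConsumeClaim still parks on the sync.Cond until the online cache reaches DoSubscribeOver, but the phase is now read with an atomic Load inside the usual check-in-a-loop pattern. Below is a minimal, self-contained sketch of that wait/advance pairing; phaseCache, waitForPhase, advanceTo and the lower-cased phase constants are hypothetical stand-ins for the onlineCache fields in the hunk, not code from the repository.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// phaseCache is a hypothetical stand-in for the OnlineCache fields used in
// this hunk: a phase counter advanced by the initializer and observed by
// consumers that must not start before the subscription is in place.
type phaseCache struct {
	Lock         *sync.Mutex
	Cond         *sync.Cond
	CurrentPhase atomic.Uint32
}

// Phase values, mirroring Begin / DoOnlineStatusOver / DoSubscribeOver.
const (
	begin uint32 = iota
	doOnlineStatusOver
	doSubscribeOver
)

func newPhaseCache() *phaseCache {
	mu := &sync.Mutex{}
	return &phaseCache{Lock: mu, Cond: sync.NewCond(mu)}
}

// waitForPhase blocks until CurrentPhase reaches at least target. The mutex
// only backs the Cond; the phase itself is read atomically, as in the hunk.
func (c *phaseCache) waitForPhase(target uint32) {
	c.Lock.Lock()
	for c.CurrentPhase.Load() < target {
		c.Cond.Wait() // releases Lock while parked, reacquires on wake-up
	}
	c.Lock.Unlock()
}

// advanceTo publishes a new phase and wakes every waiter. Holding the lock
// around Store+Broadcast closes the window between a waiter's phase check
// and its Wait call, so no wake-up can be lost.
func (c *phaseCache) advanceTo(phase uint32) {
	c.Lock.Lock()
	c.CurrentPhase.Store(phase)
	c.Cond.Broadcast()
	c.Lock.Unlock()
}

func main() {
	c := newPhaseCache()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.waitForPhase(doSubscribeOver) // what ConsumeClaim does above
		fmt.Println("consumer released")
	}()
	c.advanceTo(doOnlineStatusOver)
	c.advanceTo(doSubscribeOver)
	wg.Wait()
}

Broadcasting while the mutex is held is the conservative choice here: a goroutine that has just observed a stale phase cannot slip between its check and Wait while the broadcast fires.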

@@ -69,7 +69,7 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru
 func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error {
 	ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
 	fullURL := c.url + "/" + command
-	log.ZDebug(ctx, "webhook", "url", fullURL, "input", input, "config", timeout)
+	log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout)
 	operationID, _ := ctx.Value(constant.OperationID).(string)
 	b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout)
 	if err != nil {
@@ -81,6 +81,6 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou
 	if err := output.Parse(); err != nil {
 		return err
 	}
-	log.ZDebug(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
+	log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
 	return nil
 }

@@ -48,7 +48,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
 	x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
 		return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
 	})
-	x.CurrentPhase = DoSubscribeOver
+	x.CurrentPhase.Store(DoSubscribeOver)
 	x.Cond.Broadcast()
 }
@@ -58,10 +58,8 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
 	return x, nil
 }
 
-type initPhase int
-
 const (
-	Begin initPhase = iota
+	Begin uint32 = iota
 	DoOnlineStatusOver
 	DoSubscribeOver
 )
@@ -79,7 +77,7 @@ type OnlineCache struct {
 	Lock *sync.Mutex
 	Cond *sync.Cond
-	CurrentPhase initPhase
+	CurrentPhase atomic.Uint32
 }
 
 func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) {
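For reference, atomic.Uint32 (sync/atomic, Go 1.19+) stores and loads plain uint32 values, which is why the dedicated initPhase type disappears in the hunk above: keeping it would force a conversion on every Load, Store and comparison. A sketch of the resulting declarations, trimmed to what is visible in this diff:

package rpccache // the package name implied by rpccache.DoSubscribeOver in the first hunk

import (
	"sync"
	"sync/atomic"
)

// Initialization phases. Declared as plain uint32 so they can be passed to
// atomic.Uint32.Store and compared against atomic.Uint32.Load without
// conversions; a named initPhase type would need a cast at every use.
const (
	Begin uint32 = iota
	DoOnlineStatusOver
	DoSubscribeOver
)

// OnlineCache, trimmed to the fields this commit touches; the real struct
// also carries at least the lruCache field assigned above.
type OnlineCache struct {
	Lock *sync.Mutex
	Cond *sync.Cond
	// CurrentPhase is written by NewOnlineCache, initUsersOnlineStatus and
	// doSubscribe and read by consumers, hence atomic.Uint32 rather than a
	// plain field guarded only by convention.
	CurrentPhase atomic.Uint32
}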
@@ -95,7 +93,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) {
 	defer func(t time.Time) {
 		log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet.Load())
-		o.CurrentPhase = DoOnlineStatusOver
+		o.CurrentPhase.Store(DoOnlineStatusOver)
 		o.Cond.Broadcast()
 	}(time.Now())
@@ -138,7 +136,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) {
 func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) {
 	o.Lock.Lock()
 	ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel()
-	for o.CurrentPhase < DoOnlineStatusOver {
+	for o.CurrentPhase.Load() < DoOnlineStatusOver {
 		o.Cond.Wait()
 	}
 	o.Lock.Unlock()
@@ -168,13 +166,13 @@ func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient
 		}
 	}
-	if o.CurrentPhase == DoOnlineStatusOver {
+	if o.CurrentPhase.Load() == DoOnlineStatusOver {
 		for done := false; !done; {
 			select {
 			case message := <-ch:
 				doMessage(message)
 			default:
-				o.CurrentPhase = DoSubscribeOver
+				o.CurrentPhase.Store(DoSubscribeOver)
 				o.Cond.Broadcast()
 				done = true
 			}
