diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index fc5a00210..4ecf20de5 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -116,7 +116,7 @@ func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { c.onlineCache.Lock.Lock() - for c.onlineCache.CurrentPhase < rpccache.DoSubscribeOver { + for c.onlineCache.CurrentPhase.Load() < rpccache.DoSubscribeOver { c.onlineCache.Cond.Wait() } c.onlineCache.Lock.Unlock() diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go index 14fe51beb..e46f08806 100644 --- a/pkg/common/webhook/http_client.go +++ b/pkg/common/webhook/http_client.go @@ -69,7 +69,7 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error { ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) fullURL := c.url + "/" + command - log.ZDebug(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) + log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) operationID, _ := ctx.Value(constant.OperationID).(string) b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout) if err != nil { @@ -81,6 +81,6 @@ func (c *Client) post(ctx context.Context, command string, input interface{}, ou if err := output.Parse(); err != nil { return err } - log.ZDebug(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) + log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) return nil } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index a66c77292..a02a0662d 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -48,7 +48,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) }) - x.CurrentPhase = DoSubscribeOver + x.CurrentPhase.Store(DoSubscribeOver) x.Cond.Broadcast() } @@ -58,10 +58,8 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re return x, nil } -type initPhase int - const ( - Begin initPhase = iota + Begin uint32 = iota DoOnlineStatusOver DoSubscribeOver ) @@ -79,7 +77,7 @@ type OnlineCache struct { Lock *sync.Mutex Cond *sync.Cond - CurrentPhase initPhase + CurrentPhase atomic.Uint32 } func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) { @@ -95,7 +93,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) { defer func(t time.Time) { log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet.Load()) - o.CurrentPhase = DoOnlineStatusOver + o.CurrentPhase.Store(DoOnlineStatusOver) o.Cond.Broadcast() }(time.Now()) @@ -138,7 +136,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) { func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) { o.Lock.Lock() ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() - for o.CurrentPhase < DoOnlineStatusOver { + for o.CurrentPhase.Load() < DoOnlineStatusOver { o.Cond.Wait() } o.Lock.Unlock() @@ -168,13 +166,13 @@ func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient } } - if o.CurrentPhase == DoOnlineStatusOver { + if o.CurrentPhase.Load() == DoOnlineStatusOver { for done := false; !done; { select { case message := <-ch: doMessage(message) default: - o.CurrentPhase = DoSubscribeOver + o.CurrentPhase.Store(DoSubscribeOver) o.Cond.Broadcast() done = true }