From 04f56a9c0070263aa3cb4eecc1650518d173b5f0 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Sat, 7 Sep 2024 17:13:17 +0800 Subject: [PATCH] feat: init --- internal/push/push_handler.go | 1 + pkg/rpccache/online.go | 91 ++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index c1ad98f61..f15f5403f 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -125,6 +125,7 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 4e86560a4..1db31f463 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -2,8 +2,11 @@ package rpccache import ( "context" + "github.com/openimsdk/protocol/user" + "golang.org/x/sync/errgroup" "math/rand" "strconv" + "sync/atomic" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" @@ -23,18 +26,23 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re user: user, group: group, fullUserCache: fullUserCache, + initDone: make(chan struct{}), } switch x.fullUserCache { case true: x.mapCache = cacheutil.NewCache[string, []int32]() - if err := x.initUsersOnlineStatus(mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))); err != nil { - return nil, err - } + go func() { + ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10)) + if err := x.initUsersOnlineStatus(ctx); err != nil { + log.ZError(ctx, "initUsersOnlineStatus failed", err) + } + }() case false: x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) }) + close(x.initDone) } go func() { @@ -77,42 +85,81 @@ type OnlineCache struct { lruCache lru.LRU[string, []int32] mapCache *cacheutil.Cache[string, []int32] + initDone chan struct{} } func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error { log.ZDebug(ctx, "init users online status begin") + defer func() { + close(o.initDone) + }() var ( - totalSet int + totalSet atomic.Int64 + gr, _ = errgroup.WithContext(ctx) ) + gr.SetLimit(10) time.Sleep(time.Second * 10) defer func(t time.Time) { - log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet) + log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet.Load()) }(time.Now()) - for page := int32(1); ; page++ { - resp, err := o.user.GetAllUserID(ctx, page, constant.ParamMaxLength) - if err != nil { - return err - } + page := int32(1) + resp, err := o.user.GetAllUserID(ctx, page, constant.ParamMaxLength) + if err != nil { + return err + } + for page = 2; (page-1)*constant.ParamMaxLength < resp.Total; page++ { + page := page - usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, resp.UserIDs) - if err != nil { - return err - } + gr.Go(func() error { + var ( + usersStatus []*user.OnlineStatus + resp *user.GetAllUserIDResp + err error + maxTries = 5 + ) - for _, user := range usersStatus { - if user.Status == constant.Online { - o.setUserOnline(user.UserID, user.PlatformIDs) + for i := 0; i < maxTries; i++ { + resp, err = o.user.GetAllUserID(ctx, page, constant.ParamMaxLength) + if err != nil { + log.ZWarn(ctx, "initUsersOnlineStatus: GetAllUserID failed", err, "page", page, "retries", i+1) + time.Sleep(time.Second * 5) + } else { + break + } + } + if err != nil { + return err } - totalSet++ - } - if len(resp.UserIDs) < constant.ParamMaxLength { - break - } + for i := 0; i < maxTries; i++ { + usersStatus, err = o.user.GetUsersOnlinePlatform(ctx, resp.UserIDs) + if err != nil { + log.ZError(ctx, "initUsersOnlineStatus: GetAllUserID failed", err, "page", page, "retries", i+1) + time.Sleep(time.Second * 5) + } else { + break + } + } + if err != nil { + return err + } + + for _, u := range usersStatus { + if u.Status == constant.Online { + o.setUserOnline(u.UserID, u.PlatformIDs) + } + totalSet.Add(1) + } + return nil + }) + + } + if err = gr.Wait(); err != nil { + return err } return nil }