From 95d2227da26750a39d64e645410a7be8ab1cfa79 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Mon, 9 Sep 2024 16:40:57 +0800 Subject: [PATCH] feat: get all online users and init push --- go.mod | 2 +- go.sum | 4 +- internal/push/push_handler.go | 4 +- internal/rpc/user/online.go | 20 +++ pkg/common/storage/cache/online.go | 1 + pkg/common/storage/cache/redis/online.go | 31 +++++ pkg/rpccache/online.go | 163 ++++++++++++----------- pkg/rpcclient/user.go | 4 + 8 files changed, 150 insertions(+), 79 deletions(-) diff --git a/go.mod b/go.mod index a1de58f3a..bbff40635 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.15 + github.com/openimsdk/protocol v0.0.72-alpha.18 github.com/openimsdk/tools v0.0.50-alpha.12 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 1f18de864..1c878dd6a 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.15 h1:m7WwSVLjuXGnk6AzduCHyzNh5rqxduvakeicKf6UMLE= -github.com/openimsdk/protocol v0.0.72-alpha.15/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.18 h1:EytTtgZuXMG1cgTlJryqXXSO1J3t3wrLIn3Os2PRBEE= +github.com/openimsdk/protocol v0.0.72-alpha.18/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo= github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 50f35448d..59649e4da 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -125,7 +125,9 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - + for c.onlineCache.CurrentPhase < rpccache.DoSubscribeOver { + c.onlineCache.Cond.Wait() + } for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) diff --git a/internal/rpc/user/online.go b/internal/rpc/user/online.go index 5ac4085a9..4e7823306 100644 --- a/internal/rpc/user/online.go +++ b/internal/rpc/user/online.go @@ -2,6 +2,7 @@ package user import ( "context" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/protocol/constant" pbuser "github.com/openimsdk/protocol/user" @@ -81,3 +82,22 @@ func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUse } return &pbuser.SetUserOnlineStatusResp{}, nil } + +func (s *userServer) GetAllOnlineUsers(ctx context.Context, req *pbuser.GetAllOnlineUsersReq) (*pbuser.GetAllOnlineUsersResp, error) { + resMap, nextCursor, err := s.online.GetAllOnlineUsers(ctx, req.Cursor) + if err != nil { + return nil, err + } + resp := &pbuser.GetAllOnlineUsersResp{ + StatusList: make([]*pbuser.OnlineStatus, 0, len(resMap)), + NextCursor: nextCursor, + } + for userID, plats := range resMap { + resp.StatusList = append(resp.StatusList, &pbuser.OnlineStatus{ + UserID: userID, + Status: int32(datautil.If(len(plats) > 0, constant.Online, constant.Offline)), + PlatformIDs: plats, + }) + } + return resp, nil +} diff --git a/pkg/common/storage/cache/online.go b/pkg/common/storage/cache/online.go index 7669c8a11..d21ae616a 100644 --- a/pkg/common/storage/cache/online.go +++ b/pkg/common/storage/cache/online.go @@ -5,4 +5,5 @@ import "context" type OnlineCache interface { GetOnline(ctx context.Context, userID string) ([]int32, error) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error + GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error) } diff --git a/pkg/common/storage/cache/redis/online.go b/pkg/common/storage/cache/redis/online.go index ee1db7e23..8b2efa160 100644 --- a/pkg/common/storage/cache/redis/online.go +++ b/pkg/common/storage/cache/redis/online.go @@ -2,8 +2,10 @@ package redis import ( "context" + "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" @@ -49,6 +51,35 @@ func (s *userOnline) GetOnline(ctx context.Context, userID string) ([]int32, err return platformIDs, nil } +func (s *userOnline) GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error) { + result := make(map[string][]int32) + + keys, nextCursor, err := s.rdb.Scan(ctx, cursor, fmt.Sprintf("%s*", cachekey.OnlineKey), constant.ParamMaxLength).Result() + if err != nil { + return nil, 0, err + } + + for _, key := range keys { + strValues, err := s.rdb.ZRange(ctx, key, 0, -1).Result() + if err != nil { + return nil, 0, err + } + + values := make([]int32, 0, len(strValues)) + for _, value := range strValues { + intValue, err := strconv.Atoi(value) + if err != nil { + return nil, 0, errs.Wrap(err) + } + values = append(values, int32(intValue)) + } + + result[key] = values + } + + return result, nextCursor, nil +} + func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error { script := ` local key = KEYS[1] diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 9b3a27e31..ac2a211f8 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -3,10 +3,11 @@ package rpccache import ( "context" "fmt" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/user" - "golang.org/x/sync/errgroup" "math/rand" "strconv" + "sync" "sync/atomic" "time" @@ -15,7 +16,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/util/useronline" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/cacheutil" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" @@ -27,7 +27,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re user: user, group: group, fullUserCache: fullUserCache, - initDone: make(chan struct{}), + Cond: sync.NewCond(&sync.Mutex{}), } switch x.fullUserCache { @@ -43,39 +43,24 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) }) - close(x.initDone) + x.CurrentPhase = DoSubscribeOver + x.Cond.Broadcast() } go func() { - ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) - for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { - userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) - if err != nil { - log.ZError(ctx, "OnlineCache setHasUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) - continue - } - - switch x.fullUserCache { - case true: - if len(platformIDs) == 0 { - // offline - x.mapCache.Delete(userID) - } else { - x.mapCache.Store(userID, platformIDs) - } - case false: - storageCache := x.setHasUserOnline(userID, platformIDs) - log.ZDebug(ctx, "OnlineCache setHasUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache) - if fn != nil { - fn(ctx, userID, platformIDs) - } - } - - } + x.doSubscribe(rdb, fn) }() return x, nil } +type initPhase int + +const ( + Begin initPhase = iota + DoOnlineStatusOver + DoSubscribeOver +) + type OnlineCache struct { user rpcclient.UserRpcClient group *GroupLocalCache @@ -86,83 +71,111 @@ type OnlineCache struct { lruCache lru.LRU[string, []int32] mapCache *cacheutil.Cache[string, []int32] - initDone chan struct{} + + Cond *sync.Cond + CurrentPhase initPhase } -func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error { +func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) { log.ZDebug(ctx, "init users online status begin") defer func() { - close(o.initDone) + o.CurrentPhase = DoOnlineStatusOver + o.Cond.Broadcast() }() var ( totalSet atomic.Int64 maxTries = 5 retryInterval = time.Second * 5 - gr, _ = errgroup.WithContext(ctx) - ) - gr.SetLimit(10) - time.Sleep(time.Second * 10) + resp *user.GetAllOnlineUsersResp + ) defer func(t time.Time) { log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet.Load()) }(time.Now()) - page := int32(1) - resp, err := o.user.GetAllUserID(ctx, page, constant.ParamMaxLength) - if err != nil { + retryOperation := func(operation func() error, operationName string) error { + for i := 0; i < maxTries; i++ { + if err = operation(); err != nil { + log.ZWarn(ctx, fmt.Sprintf("initUsersOnlineStatus: %s failed", operationName), err) + time.Sleep(retryInterval) + } else { + return nil + } + } return err } - for page = 2; (page-1)*constant.ParamMaxLength < resp.Total; page++ { - page := page - - gr.Go(func() error { - var ( - usersStatus []*user.OnlineStatus - resp *user.GetAllUserIDResp - err error - retryOperation = func(operation func() error, operationName string) error { - for i := 0; i < maxTries; i++ { - if err = operation(); err != nil { - log.ZWarn(ctx, fmt.Sprintf("initUsersOnlineStatus: %s failed", operationName), err, "page", page, "retries", i+1) - time.Sleep(retryInterval) - } else { - return nil - } - } - return err - } - ) - - if err = retryOperation(func() error { - resp, err = o.user.GetAllUserID(ctx, page, constant.ParamMaxLength) - return err - }, "GetAllUserID"); err != nil { - return err - } - if err = retryOperation(func() error { - usersStatus, err = o.user.GetUsersOnlinePlatform(ctx, resp.UserIDs) - return err - }, "GetUsersOnlinePlatform"); err != nil { + for resp == nil || resp.NextCursor != 0 { + if err = retryOperation(func() error { + resp, err = o.user.GetAllOnlineUsers(ctx, 0) + if err != nil { return err } - for _, u := range usersStatus { + for _, u := range resp.StatusList { if u.Status == constant.Online { o.setUserOnline(u.UserID, u.PlatformIDs) } totalSet.Add(1) } return nil - }) + }, "getAllOnlineUsers"); err != nil { + return err + } + } + + return nil +} +func (o *OnlineCache) doSubscribe(rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) { + ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) + ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() + for o.CurrentPhase < DoOnlineStatusOver { + o.Cond.Wait() } - if err = gr.Wait(); err != nil { - return err + + doMessage := func(message *redis.Message) { + userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) + if err != nil { + log.ZError(ctx, "OnlineCache setHasUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) + return + } + + switch o.fullUserCache { + case true: + if len(platformIDs) == 0 { + // offline + o.mapCache.Delete(userID) + } else { + o.mapCache.Store(userID, platformIDs) + } + case false: + storageCache := o.setHasUserOnline(userID, platformIDs) + log.ZDebug(ctx, "OnlineCache setHasUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache) + if fn != nil { + fn(ctx, userID, platformIDs) + } + } + } + + if o.CurrentPhase == DoOnlineStatusOver { + for done := false; !done; { + select { + case message := <-ch: + doMessage(message) + default: + o.CurrentPhase = DoSubscribeOver + o.Cond.Broadcast() + done = true + } + } + } + + for message := range ch { + doMessage(message) } - return nil } func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index a02873ac5..375cc993c 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -224,3 +224,7 @@ func (u *UserRpcClient) GetUserOnlinePlatform(ctx context.Context, userID string } return resp[0].PlatformIDs, nil } + +func (u *UserRpcClient) GetAllOnlineUsers(ctx context.Context, cursor uint64) (*user.GetAllOnlineUsersResp, error) { + return u.Client.GetAllOnlineUsers(ctx, &user.GetAllOnlineUsersReq{Cursor: cursor}) +}