From 8920b487181eb61a7f5402d8aae5dba6366486d8 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Thu, 5 Sep 2024 17:39:05 +0800 Subject: [PATCH] feat: push err --- internal/msggateway/init.go | 2 +- internal/push/push_handler.go | 5 ++++- pkg/rpccache/online.go | 7 +++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 58e626082..50da06097 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -58,7 +58,7 @@ func Start(ctx context.Context, index int, conf *Config) error { ) hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error { - longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) + longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) return nil }) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 4b621361e..00f9c01fb 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -76,7 +76,10 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config - consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil) + consumerHandler.onlineCache, err = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil) + if err != nil { + return nil, err + } return &consumerHandler, nil } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 0b01b5e51..3b6eda4ce 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -17,7 +17,7 @@ import ( "time" ) -func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache { +func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) { x := &OnlineCache{ user: user, group: group, @@ -27,6 +27,9 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re switch x.fullUserCache { case true: x.mapCache = cacheutil.NewCache[string, []int32]() + if err := x.initUsersOnlineStatus(context.TODO()); err != nil { + return nil, err + } case false: x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) @@ -60,7 +63,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re } }() - return x + return x, nil } type OnlineCache struct {