From 721a89cdfaf959dd5f18ea5340df545acbfd397d Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 20 Aug 2025 16:12:02 +0800 Subject: [PATCH] feat: improve auth localcache. --- internal/rpc/auth/auth.go | 8 +-- pkg/common/storage/cache/redis/token.go | 79 +++++++++++++++++++++++-- pkg/common/storage/controller/group.go | 1 - pkg/rpccache/auth.go | 12 ++-- 4 files changed, 79 insertions(+), 21 deletions(-) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 381a00a63..a78a714ca 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -83,7 +83,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) } else { - token = redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire) + token = redis2.NewTokenCacheModel(rdb, &config.LocalCacheConfig, config.RpcConfig.TokenPolicy.Expire) } userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { @@ -135,9 +135,6 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke prommetrics.UserLoginCounter.Inc() - // Remove local cache for the token - s.AuthLocalCache.RemoveLocalTokenCache(ctx, req.UserID, int(constant.AdminPlatformID)) - resp.Token = token resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil @@ -169,9 +166,6 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR return nil, err } - // Remove local cache for the token - s.AuthLocalCache.RemoveLocalTokenCache(ctx, req.UserID, int(req.PlatformID)) - resp.Token = token resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index c74ccce66..9ad4c319f 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -2,13 +2,16 @@ package redis import ( "context" + "encoding/json" "strconv" "sync" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" ) @@ -16,16 +19,26 @@ import ( type tokenCache struct { rdb redis.UniversalClient accessExpire time.Duration + localCache *config.LocalCache } -func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { - c := &tokenCache{rdb: rdb} +func NewTokenCacheModel(rdb redis.UniversalClient, localCache *config.LocalCache, accessExpire int64) cache.TokenModel { + c := &tokenCache{rdb: rdb, localCache: localCache} c.accessExpire = c.getExpireTime(accessExpire) return c } func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { - return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil { + return errs.Wrap(err) + } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } + + return nil } // SetTokenFlagEx set token and flag with expire time @@ -37,6 +50,11 @@ func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platform if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { return errs.Wrap(err) } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } + return nil } @@ -106,7 +124,17 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla for k, v := range m { mm[k] = v } - return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()) + + err := c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err() + if err != nil { + return errs.Wrap(err) + } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, cachekey.GetTokenKey(userID, platformID)) + } + + return nil } func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { @@ -124,11 +152,23 @@ func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[st }); err != nil { return err } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, keys...) + } return nil } func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { - return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { + return errs.Wrap(err) + } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } + return nil } func (c *tokenCache) getExpireTime(t int64) time.Duration { @@ -161,6 +201,11 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t return err } + // Remove local cache for the token + if c.localCache != nil { + c.removeLocalTokenCache(ctx, keys...) + } + return nil } @@ -175,5 +220,29 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { return errs.Wrap(err) } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } return nil } + +func (c *tokenCache) removeLocalTokenCache(ctx context.Context, keys ...string) { + if len(keys) == 0 { + return + } + + topic := c.localCache.Auth.Topic + if topic == "" { + return + } + + data, err := json.Marshal(keys) + if err != nil { + log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys) + } else { + if err := c.rdb.Publish(ctx, topic, string(data)).Err(); err != nil { + log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys) + } + } +} diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 037ab1c39..24209cd6c 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -194,7 +194,6 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group, } for _, group := range groups { c = c.DelGroupsInfo(group.GroupID). - DelGroupMembersHash(group.GroupID). DelGroupMembersHash(group.GroupID). DelGroupsMemberNum(group.GroupID). DelGroupMemberIDs(group.GroupID). diff --git a/pkg/rpccache/auth.go b/pkg/rpccache/auth.go index 94e718d0c..dd8c18627 100644 --- a/pkg/rpccache/auth.go +++ b/pkg/rpccache/auth.go @@ -2,6 +2,7 @@ package rpccache import ( "context" + "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" @@ -49,12 +50,13 @@ func (a *AuthLocalCache) GetExistingToken(ctx context.Context, userID string, pl } func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, platformID int) (val *auth.GetExistingTokenResp, err error) { + start := time.Now() log.ZDebug(ctx, "AuthLocalCache GetExistingToken req", "userID", userID, "platformID", platformID) defer func() { if err != nil { - log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "userID", userID, "platformID", platformID) + log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "cost", time.Since(start), "userID", userID, "platformID", platformID) } else { - log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "userID", userID, "platformID", platformID, "val", val) + log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "cost", time.Since(start), "userID", userID, "platformID", platformID, "val", val) } }() @@ -65,9 +67,3 @@ func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, pl return cache.Marshal(a.client.AuthClient.GetExistingToken(ctx, &auth.GetExistingTokenReq{UserID: userID, PlatformID: int32(platformID)})) })) } - -func (a *AuthLocalCache) RemoveLocalTokenCache(ctx context.Context, userID string, platformID int) { - key := cachekey.GetTokenKey(userID, platformID) - a.local.DelLocal(ctx, key) - log.ZDebug(ctx, "AuthLocalCache RemoveLocalTokenCache", "userID", userID, "platformID", platformID, "key", key) -}