diff --git a/config/local-cache.yml b/config/local-cache.yml index 06e211ebb..036dfaa17 100644 --- a/config/local-cache.yml +++ b/config/local-cache.yml @@ -1,3 +1,10 @@ +auth: + topic: DELETE_CACHE_AUTH + slotNum: 100 + slotSize: 2000 + successExpire: 300 + failedExpire: 5 + user: topic: DELETE_CACHE_USER slotNum: 100 diff --git a/go.mod b/go.mod index 775765706..7b45b0048 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.12 + github.com/openimsdk/protocol v0.0.73-alpha.14 github.com/openimsdk/tools v0.0.50-alpha.97 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 329a916ec..354f80189 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= -github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= +github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 7a8607164..a78a714ca 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -18,10 +18,13 @@ import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" + "github.com/openimsdk/open-im-server/v3/pkg/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -46,6 +49,7 @@ import ( type authServer struct { pbauth.UnimplementedAuthServer authDatabase controller.AuthDatabase + AuthLocalCache *rpccache.AuthLocalCache RegisterCenter discovery.Conn config *Config userClient *rpcli.UserClient @@ -53,11 +57,12 @@ type authServer struct { } type Config struct { - RpcConfig config.Auth - RedisConfig config.Redis - MongoConfig config.Mongo - Share config.Share - Discovery config.Discovery + RpcConfig config.Auth + RedisConfig config.Redis + MongoConfig config.Mongo + Share config.Share + LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { @@ -78,12 +83,19 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) } else { - token = redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire) + token = redis2.NewTokenCacheModel(rdb, &config.LocalCacheConfig, config.RpcConfig.TokenPolicy.Expire) } userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return err } + authConn, err := client.GetConn(ctx, config.Discovery.RpcService.Auth) + if err != nil { + return err + } + + localcache.InitLocalCache(&config.LocalCacheConfig) + pbauth.RegisterAuthServer(server, &authServer{ RegisterCenter: client, authDatabase: controller.NewAuthDatabase( @@ -93,9 +105,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg config.Share.MultiLogin, config.Share.IMAdminUser.UserIDs, ), - config: config, - userClient: rpcli.NewUserClient(userConn), - adminUserIDs: config.Share.IMAdminUser.UserIDs, + AuthLocalCache: rpccache.NewAuthLocalCache(rpcli.NewAuthClient(authConn), &config.LocalCacheConfig, rdb), + config: config, + userClient: rpcli.NewUserClient(userConn), + adminUserIDs: config.Share.IMAdminUser.UserIDs, }) return nil } @@ -121,6 +134,7 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke } prommetrics.UserLoginCounter.Inc() + resp.Token = token resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil @@ -151,20 +165,34 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR if err != nil { return nil, err } + resp.Token = token resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil } +func (s *authServer) GetExistingToken(ctx context.Context, req *pbauth.GetExistingTokenReq) (*pbauth.GetExistingTokenResp, error) { + m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID)) + if err != nil { + return nil, err + } + + return &pbauth.GetExistingTokenResp{ + TokenStates: convert.TokenMapDB2Pb(m), + }, nil +} + func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret)) if err != nil { return nil, err } - m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID) + + m, err := s.AuthLocalCache.GetExistingToken(ctx, claims.UserID, claims.PlatformID) if err != nil { return nil, err } + if len(m) == 0 { isAdmin := authverify.CheckUserIsAdmin(ctx, claims.UserID) if isAdmin { diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 6e59b6dc6..d624e9dae 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -40,6 +40,7 @@ func NewAuthRpcCmd() *AuthRpcCmd { config.RedisConfigFileName: &authConfig.RedisConfig, config.MongodbConfigFileName: &authConfig.MongoConfig, config.ShareFileName: &authConfig.Share, + config.LocalCacheConfigFileName: &authConfig.LocalCacheConfig, config.DiscoveryConfigFilename: &authConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 63716838a..c5000a6e5 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -43,6 +43,7 @@ type CacheConfig struct { } type LocalCache struct { + Auth CacheConfig `yaml:"auth"` User CacheConfig `yaml:"user"` Group CacheConfig `yaml:"group"` Friend CacheConfig `yaml:"friend"` diff --git a/pkg/common/convert/auth.go b/pkg/common/convert/auth.go new file mode 100644 index 000000000..a0f00c8fe --- /dev/null +++ b/pkg/common/convert/auth.go @@ -0,0 +1,25 @@ +package convert + +func TokenMapDB2Pb(tokenMapDB map[string]int) map[string]int32 { + if tokenMapDB == nil { + return nil + } + + tokenMapPB := make(map[string]int32, len(tokenMapDB)) + for k, v := range tokenMapDB { + tokenMapPB[k] = int32(v) + } + return tokenMapPB +} + +func TokenMapPb2DB(tokenMapPB map[string]int32) map[string]int { + if tokenMapPB == nil { + return nil + } + + tokenMapDB := make(map[string]int, len(tokenMapPB)) + for k, v := range tokenMapPB { + tokenMapDB[k] = int(v) + } + return tokenMapDB +} \ No newline at end of file diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index c74ccce66..9ad4c319f 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -2,13 +2,16 @@ package redis import ( "context" + "encoding/json" "strconv" "sync" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" ) @@ -16,16 +19,26 @@ import ( type tokenCache struct { rdb redis.UniversalClient accessExpire time.Duration + localCache *config.LocalCache } -func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { - c := &tokenCache{rdb: rdb} +func NewTokenCacheModel(rdb redis.UniversalClient, localCache *config.LocalCache, accessExpire int64) cache.TokenModel { + c := &tokenCache{rdb: rdb, localCache: localCache} c.accessExpire = c.getExpireTime(accessExpire) return c } func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { - return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil { + return errs.Wrap(err) + } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } + + return nil } // SetTokenFlagEx set token and flag with expire time @@ -37,6 +50,11 @@ func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platform if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { return errs.Wrap(err) } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } + return nil } @@ -106,7 +124,17 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla for k, v := range m { mm[k] = v } - return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()) + + err := c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err() + if err != nil { + return errs.Wrap(err) + } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, cachekey.GetTokenKey(userID, platformID)) + } + + return nil } func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { @@ -124,11 +152,23 @@ func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[st }); err != nil { return err } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, keys...) + } return nil } func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { - return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { + return errs.Wrap(err) + } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } + return nil } func (c *tokenCache) getExpireTime(t int64) time.Duration { @@ -161,6 +201,11 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t return err } + // Remove local cache for the token + if c.localCache != nil { + c.removeLocalTokenCache(ctx, keys...) + } + return nil } @@ -175,5 +220,29 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { return errs.Wrap(err) } + + if c.localCache != nil { + c.removeLocalTokenCache(ctx, key) + } return nil } + +func (c *tokenCache) removeLocalTokenCache(ctx context.Context, keys ...string) { + if len(keys) == 0 { + return + } + + topic := c.localCache.Auth.Topic + if topic == "" { + return + } + + data, err := json.Marshal(keys) + if err != nil { + log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys) + } else { + if err := c.rdb.Publish(ctx, topic, string(data)).Err(); err != nil { + log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys) + } + } +} diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 037ab1c39..24209cd6c 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -194,7 +194,6 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group, } for _, group := range groups { c = c.DelGroupsInfo(group.GroupID). - DelGroupMembersHash(group.GroupID). DelGroupMembersHash(group.GroupID). DelGroupsMemberNum(group.GroupID). DelGroupMemberIDs(group.GroupID). diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go index 92695c05d..07d36cf46 100644 --- a/pkg/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] { if opt.localSlotNum > 0 && opt.localSlotSize > 0 { createSimpleLRU := func() lru.LRU[string, V] { if opt.expirationEvict { - return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } else { - return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } } if opt.localSlotNum == 1 { c.local = createSimpleLRU() } else { - c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU) + c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU) } if opt.linkSlotNum > 0 { c.link = link.New(opt.linkSlotNum) @@ -71,6 +71,8 @@ type cache[V any] struct { } func (c *cache[V]) onEvict(key string, value V) { + _ = value + if c.link != nil { lks := c.link.Del(key) for k := range lks { diff --git a/pkg/localcache/init.go b/pkg/localcache/init.go index d1c16f675..ad339da7c 100644 --- a/pkg/localcache/init.go +++ b/pkg/localcache/init.go @@ -15,10 +15,11 @@ package localcache import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "strings" "sync" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" ) var ( @@ -32,6 +33,10 @@ func InitLocalCache(localCache *config.LocalCache) { Local config.CacheConfig Keys []string }{ + { + Local: localCache.Auth, + Keys: []string{cachekey.UidPidToken}, + }, { Local: localCache.User, Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, diff --git a/pkg/rpccache/auth.go b/pkg/rpccache/auth.go new file mode 100644 index 000000000..dd8c18627 --- /dev/null +++ b/pkg/rpccache/auth.go @@ -0,0 +1,69 @@ +package rpccache + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/tools/log" + "github.com/redis/go-redis/v9" +) + +func NewAuthLocalCache(client *rpcli.AuthClient, localCache *config.LocalCache, cli redis.UniversalClient) *AuthLocalCache { + lc := localCache.Auth + log.ZDebug(context.Background(), "AuthLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) + x := &AuthLocalCache{ + client: client, + local: localcache.New[[]byte]( + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), + localcache.WithLinkSlotNum(lc.SlotNum), + localcache.WithLocalSuccessTTL(lc.Success()), + localcache.WithLocalFailedTTL(lc.Failed()), + ), + } + if lc.Enable() { + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + } + return x +} + +type AuthLocalCache struct { + client *rpcli.AuthClient + local localcache.Cache[[]byte] +} + +func (a *AuthLocalCache) GetExistingToken(ctx context.Context, userID string, platformID int) (val map[string]int, err error) { + resp, err := a.getExistingToken(ctx, userID, platformID) + if err != nil { + return nil, err + } + + res := convert.TokenMapPb2DB(resp.TokenStates) + + return res, nil +} + +func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, platformID int) (val *auth.GetExistingTokenResp, err error) { + start := time.Now() + log.ZDebug(ctx, "AuthLocalCache GetExistingToken req", "userID", userID, "platformID", platformID) + defer func() { + if err != nil { + log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "cost", time.Since(start), "userID", userID, "platformID", platformID) + } else { + log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "cost", time.Since(start), "userID", userID, "platformID", platformID, "val", val) + } + }() + + var cache cacheProto[auth.GetExistingTokenResp] + + return cache.Unmarshal(a.local.Get(ctx, cachekey.GetTokenKey(userID, platformID), func(ctx context.Context) ([]byte, error) { + log.ZDebug(ctx, "AuthLocalCache GetExistingToken call rpc", "userID", userID, "platformID", platformID) + return cache.Marshal(a.client.AuthClient.GetExistingToken(ctx, &auth.GetExistingTokenReq{UserID: userID, PlatformID: int32(platformID)})) + })) +}