refactor: kafka update.

pull/2148/head
Gordon 1 year ago
parent 0267fec106
commit f80e8bab3c

@ -15,13 +15,12 @@
package main
import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program"
)
func main() {
if err := cmd.NewAuthRpcCmd(auth.Start).Exec(); err != nil {
if err := cmd.NewAuthRpcCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -15,16 +15,12 @@
package main
import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program"
)
func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcConversationServer, conversation.Start)
rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil {
if err := cmd.NewConversationRpcCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -9,5 +9,8 @@ rpcRegisterName:
auth: Auth
conversation: Conversation
third: Third
imAdmin:
userID: [ "imAdmin" ]
nickname: [ "imAdmin" ]

@ -48,7 +48,7 @@ func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDisc
if err != nil {
return err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.ZookeeperConfig.RpcRegisterName.User, &config.Manager, &config.IMAdmin)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin)
pbauth.RegisterAuthServer(server, &authServer{
userRpcClient: &userRpcClient,
RegisterCenter: client,
@ -64,7 +64,7 @@ func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDisc
func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*pbauth.UserTokenResp, error) {
resp := pbauth.UserTokenResp{}
if req.Secret != s.config.Secret {
if req.Secret != s.config.RpcConfig.Secret {
return nil, errs.ErrNoPermission.WrapMsg("secret invalid")
}
if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil {
@ -76,17 +76,17 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*
}
prommetrics.UserLoginCounter.Inc()
resp.Token = token
resp.ExpireTimeSeconds = s.config.TokenPolicy.Expire * 24 * 60 * 60
resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60
return &resp, nil
}
func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenReq) (*pbauth.GetUserTokenResp, error) {
if err := authverify.CheckAdmin(ctx, &s.config.Manager, &s.config.IMAdmin); err != nil {
if err := authverify.CheckAdmin(ctx, &s.config.Share.IMAdmin); err != nil {
return nil, err
}
resp := pbauth.GetUserTokenResp{}
if authverify.IsManagerUserID(req.UserID, &s.config.Manager, &s.config.IMAdmin) {
if authverify.IsManagerUserID(req.UserID, &s.config.Share.IMAdmin) {
return nil, errs.ErrNoPermission.WrapMsg("don't get Admin token")
}
if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil {
@ -97,12 +97,12 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
return nil, err
}
resp.Token = token
resp.ExpireTimeSeconds = s.config.TokenPolicy.Expire * 24 * 60 * 60
resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60
return &resp, nil
}
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Secret))
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.RpcConfig.Secret))
if err != nil {
return nil, errs.Wrap(err)
}
@ -142,7 +142,7 @@ func (s *authServer) ParseToken(
}
func (s *authServer) ForceLogout(ctx context.Context, req *pbauth.ForceLogoutReq) (*pbauth.ForceLogoutResp, error) {
if err := authverify.CheckAdmin(ctx, &s.config.Manager, &s.config.IMAdmin); err != nil {
if err := authverify.CheckAdmin(ctx, &s.config.Share.IMAdmin); err != nil {
return nil, err
}
if err := s.forceKickOff(ctx, req.UserID, req.PlatformID, mcontext.GetOperationID(ctx)); err != nil {
@ -152,7 +152,7 @@ func (s *authServer) ForceLogout(ctx context.Context, req *pbauth.ForceLogoutReq
}
func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error {
conns, err := s.RegisterCenter.GetConns(ctx, s.config.RpcRegisterName.OpenImMessageGatewayName)
conns, err := s.RegisterCenter.GetConns(ctx, s.config.Share.RpcRegisterName.MessageGateway)
if err != nil {
return err
}

@ -16,10 +16,10 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"sort"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@ -45,14 +45,15 @@ type conversationServer struct {
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
conversationNotificationSender *notification.ConversationNotificationSender
config *cmd.ConversationConfig
}
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
func Start(ctx context.Context, config *cmd.ConversationConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
@ -60,13 +61,13 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
if err != nil {
return err
}
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName)
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin)
pbconversation.RegisterConversationServer(server, &conversationServer{
msgRpcClient: &msgRpcClient,
user: &userRpcClient,
conversationNotificationSender: notification.NewConversationNotificationSender(&config.Notification, &msgRpcClient),
conversationNotificationSender: notification.NewConversationNotificationSender(&config.NotificationConfig, &msgRpcClient),
groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()),
})

@ -32,11 +32,8 @@ func Secret(secret string) jwt.Keyfunc {
}
}
func CheckAccessV3(ctx context.Context, ownerUserID string, manager *config.Manager, imAdmin *config.IMAdmin) (err error) {
func CheckAccessV3(ctx context.Context, ownerUserID string, imAdmin *config.IMAdmin) (err error) {
opUserID := mcontext.GetOpUserID(ctx)
if len(manager.UserID) > 0 && datautil.Contain(opUserID, manager.UserID...) {
return nil
}
if datautil.Contain(opUserID, imAdmin.UserID...) {
return nil
}
@ -46,15 +43,12 @@ func CheckAccessV3(ctx context.Context, ownerUserID string, manager *config.Mana
return servererrs.ErrNoPermission.WrapMsg("ownerUserID", ownerUserID)
}
func IsAppManagerUid(ctx context.Context, manager *config.Manager, imAdmin *config.IMAdmin) bool {
return (len(manager.UserID) > 0 && datautil.Contain(mcontext.GetOpUserID(ctx), manager.UserID...)) ||
datautil.Contain(mcontext.GetOpUserID(ctx), imAdmin.UserID...)
func IsAppManagerUid(ctx context.Context, imAdmin *config.IMAdmin) bool {
return datautil.Contain(mcontext.GetOpUserID(ctx), imAdmin.UserID...)
}
func CheckAdmin(ctx context.Context, manager *config.Manager, imAdmin *config.IMAdmin) error {
if len(manager.UserID) > 0 && datautil.Contain(mcontext.GetOpUserID(ctx), manager.UserID...) {
return nil
}
func CheckAdmin(ctx context.Context, imAdmin *config.IMAdmin) error {
if datautil.Contain(mcontext.GetOpUserID(ctx), imAdmin.UserID...) {
return nil
}
@ -65,9 +59,6 @@ func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error {
if datautil.Contain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID...) {
return nil
}
if len(config.Manager.UserID) > 0 && datautil.Contain(mcontext.GetOpUserID(ctx), config.Manager.UserID...) {
return nil
}
return servererrs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx)))
}
@ -75,8 +66,8 @@ func ParseRedisInterfaceToken(redisToken any, secret string) (*tokenverify.Claim
return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret(secret))
}
func IsManagerUserID(opUserID string, manager *config.Manager, imAdmin *config.IMAdmin) bool {
return (len(manager.UserID) > 0 && datautil.Contain(opUserID, manager.UserID...)) || datautil.Contain(opUserID, imAdmin.UserID...)
func IsManagerUserID(opUserID string, imAdmin *config.IMAdmin) bool {
return datautil.Contain(opUserID, imAdmin.UserID...)
}
func WsVerifyToken(token, userID, secret string, platformID int) error {

@ -16,18 +16,15 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/system/program"
"google.golang.org/grpc"
"github.com/spf13/cobra"
)
type rpcInitFuc func(ctx context.Context, config *AuthConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error
type AuthRpcCmd struct {
*RootCmd
initFunc rpcInitFuc
ctx context.Context
configMap map[string]StructEnvPrefix
authConfig AuthConfig
@ -36,19 +33,24 @@ type AuthConfig struct {
RpcConfig config2.Auth
RedisConfig config2.Redis
ZookeeperConfig config2.ZooKeeper
Share config2.Share
}
func NewAuthRpcCmd(initFunc rpcInitFuc) *AuthRpcCmd {
func NewAuthRpcCmd() *AuthRpcCmd {
var authConfig AuthConfig
ret := &AuthRpcCmd{initFunc: initFunc, authConfig: authConfig}
ret := &AuthRpcCmd{authConfig: authConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCAuthCfgFileName: {EnvPrefix: authEnvPrefix, ConfigStruct: &authConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &authConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &authConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &authConfig.Share},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.RunE()
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
@ -56,8 +58,8 @@ func (a *AuthRpcCmd) Exec() error {
return a.Execute()
}
func (a *AuthRpcCmd) RunE() error {
func (a *AuthRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.ZookeeperConfig.RpcRegisterName.Auth, &a.authConfig, a.initFunc)
a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig, auth.Start)
}

@ -31,6 +31,7 @@ const (
const (
FileName = "config.yaml"
NotificationFileName = "notification.yaml"
ShareFileName = "share.yaml"
KafkaConfigFileName = "kafka.yml"
RedisConfigFileName = "redis.yml"
WebhooksConfigFileName = "webhooks.yml"
@ -53,11 +54,14 @@ const (
)
const (
logEnvPrefix = "openim-log"
redisEnvPrefix = "openim-redis"
mongodbEnvPrefix = "openim-mongodb"
zoopkeeperEnvPrefix = "openim-zookeeper"
authEnvPrefix = "openim-auth"
notificationEnvPrefix = "openim-notification"
shareEnvPrefix = "openim-share"
logEnvPrefix = "openim-log"
redisEnvPrefix = "openim-redis"
mongodbEnvPrefix = "openim-mongodb"
zoopkeeperEnvPrefix = "openim-zookeeper"
authEnvPrefix = "openim-auth"
conversationEnvPrefix = "openim-conversation"
)
const (

@ -0,0 +1,68 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type ConversationRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
conversationConfig ConversationConfig
}
type ConversationConfig struct {
RpcConfig config2.Conversation
RedisConfig config2.Redis
MongodbConfig config2.Mongo
ZookeeperConfig config2.ZooKeeper
NotificationConfig config2.Notification
Share config2.Share
}
func NewConversationRpcCmd() *ConversationRpcCmd {
var conversationConfig ConversationConfig
ret := &ConversationRpcCmd{conversationConfig: conversationConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCConversationCfgFileName: {EnvPrefix: conversationEnvPrefix, ConfigStruct: &conversationConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &conversationConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &conversationConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &conversationConfig.MongodbConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &conversationConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &conversationConfig.NotificationConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (a *ConversationRpcCmd) Exec() error {
return a.Execute()
}
func (a *ConversationRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Share.RpcRegisterName.Auth, &a.conversationConfig, conversation.Start)
}

@ -29,7 +29,7 @@ type RootCmd struct {
processName string
port int
prometheusPort int
log *config2.Log
log config2.Log
index int
}
@ -100,6 +100,8 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
if err != nil {
return err
}
// Load common configuration file
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
for configFileName, structEnvPrefix := range opts.configMap {
err := config2.LoadConfig(filepath.Join(configDirectory, configFileName),
structEnvPrefix.EnvPrefix, structEnvPrefix.ConfigStruct)
@ -107,8 +109,9 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
return err
}
}
// Load common log configuration file
return config2.LoadConfig(filepath.Join(configDirectory, LogConfigFileName),
logEnvPrefix, r.log)
logEnvPrefix, &r.log)
}
func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {

@ -336,6 +336,11 @@ type Share struct {
Conversation string `mapstructure:"conversation"`
Third string `mapstructure:"third"`
} `mapstructure:"rpcRegisterName"`
IMAdmin IMAdmin `mapstructure:"imAdmin"`
}
type IMAdmin struct {
UserID []string `mapstructure:"userID"`
Nickname []string `mapstructure:"nickname"`
}
type Webhooks struct {

@ -35,13 +35,12 @@ type User struct {
Client user.UserClient
Discov discovery.SvcDiscoveryRegistry
MessageGateWayRpcName string
manager *config.Manager
imAdmin *config.IMAdmin
}
// NewUser initializes and returns a User instance based on the provided service discovery registry.
func NewUser(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, messageGateWayRpcName string,
manager *config.Manager, imAdmin *config.IMAdmin) *User {
imAdmin *config.IMAdmin) *User {
conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil {
program.ExitWithError(err)
@ -50,7 +49,6 @@ func NewUser(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, messageGate
return &User{Discov: discov, Client: client,
conn: conn,
MessageGateWayRpcName: messageGateWayRpcName,
manager: manager,
imAdmin: imAdmin}
}
@ -65,8 +63,8 @@ func NewUserRpcClientByUser(user *User) *UserRpcClient {
// NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry.
func NewUserRpcClient(client discovery.SvcDiscoveryRegistry, rpcRegisterName string,
manager *config.Manager, imAdmin *config.IMAdmin) UserRpcClient {
return UserRpcClient(*NewUser(client, rpcRegisterName, "", manager, imAdmin))
imAdmin *config.IMAdmin) UserRpcClient {
return UserRpcClient(*NewUser(client, rpcRegisterName, "", imAdmin))
}
// GetUsersInfo retrieves information for multiple users based on their user IDs.
@ -169,7 +167,7 @@ func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error {
if err != nil {
return err
}
return authverify.CheckAccessV3(ctx, ownerUserID, u.manager, u.imAdmin)
return authverify.CheckAccessV3(ctx, ownerUserID, u.imAdmin)
}
// GetAllUserIDs retrieves all user IDs with pagination options.

Loading…
Cancel
Save