refactor: api update

pull/2148/head
Gordon 1 year ago
parent 8c10e4f532
commit 9b3aedc795

@ -22,10 +22,7 @@ import (
)
func main() {
apiCmd := cmd.NewApiCmd(cmd.ApiServer)
//apiCmd.AddPortFlag()
//apiCmd.AddPrometheusPortFlag()
if err := apiCmd.Execute(); err != nil {
if err := cmd.NewApiCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -20,8 +20,7 @@ import (
)
func main() {
cronTaskCmd := cmd.NewCronTaskCmd(cmd.CronTaskServer)
if err := cronTaskCmd.Exec(); err != nil {
if err := cmd.NewCronTaskCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -40,15 +40,13 @@ type MessageApi struct {
*rpcclient.Message
validate *validator.Validate
userRpcClient *rpcclient.UserRpcClient
manager *config.Manager
imAdmin *config.IMAdmin
}
func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.User, manager *config.Manager,
func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.User,
imAdmin *config.IMAdmin) MessageApi {
return MessageApi{Message: msgRpcClient, validate: validator.New(),
userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient),
manager: manager, imAdmin: imAdmin}
userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), imAdmin: imAdmin}
}
func (MessageApi) SetOptions(options map[string]bool, value bool) {
@ -206,7 +204,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
}
// Check if the user has the app manager role.
if !authverify.IsAppManagerUid(c, m.manager, m.imAdmin) {
if !authverify.IsAppManagerUid(c, m.imAdmin) {
// Respond with a permission error if the user is not an app manager.
apiresp.GinError(c, errs.ErrNoPermission.WrapMsg("only app manager can send message"))
return
@ -261,7 +259,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
return
}
if !authverify.IsAppManagerUid(c, m.manager, m.imAdmin) {
if !authverify.IsAppManagerUid(c, m.imAdmin) {
apiresp.GinError(c, errs.ErrNoPermission.WrapMsg("only app manager can send message"))
return
}
@ -280,7 +278,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
SessionType: constant.SingleChatType,
CreateTime: timeutil.GetCurrentTimestampByMill(),
ClientMsgID: idutil.GetMsgIDByMD5(mcontext.GetOpUserID(c)),
Options: config.GetOptionsByNotification(config.NotificationConf{
Options: config.GetOptionsByNotification(config.NotificationConfig{
IsSendMsg: false,
ReliabilityLevel: 1,
UnreadCount: false,
@ -304,7 +302,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
if err := authverify.CheckAdmin(c, m.manager, m.imAdmin); err != nil {
if err := authverify.CheckAdmin(c, m.imAdmin); err != nil {
apiresp.GinError(c, errs.ErrNoPermission.WrapMsg("only app manager can send message"))
return
}

@ -17,7 +17,10 @@ package api
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
"net"
"net/http"
"os"
@ -51,11 +54,16 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort int) error {
if port == 0 || proPort == 0 {
return errs.New("port or proPort is empty", "port", port, "proPort", proPort).Wrap()
func Start(ctx context.Context, index int, config *cmd.ApiConfig) error {
apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index)
if err != nil {
return err
}
prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
@ -63,46 +71,36 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i
var client discovery.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config)
client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
if err = client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
if err = client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil {
return errs.WrapMsg(err, "failed to create RPC root nodes")
}
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil {
return errs.WrapMsg(err, "failed to register configuration to registry")
}
var (
netDone = make(chan struct{}, 1)
netErr error
)
router := newGinRouter(client, rdb, config)
if config.Prometheus.Enable {
if config.RpcConfig.Prometheus.Enable {
go func() {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
if err = p.Use(router); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", proPort))
netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort))
netDone <- struct{}{}
}
}()
}
var address string
if config.Api.ListenIP != "" {
address = net.JoinHostPort(config.Api.ListenIP, strconv.Itoa(port))
} else {
address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
}
address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort))
server := http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", port, "prometheusPort", proPort)
log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
@ -131,8 +129,9 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i
return nil
}
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *config.GlobalConfig) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *cmd.ApiConfig) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
gin.SetMode(gin.ReleaseMode)
r := gin.New()
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
@ -140,17 +139,17 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClie
}
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
// init rpc client here
userRpc := rpcclient.NewUser(disCov, config.RpcRegisterName.OpenImUserName, config.RpcRegisterName.OpenImMessageGatewayName,
&config.Manager, &config.IMAdmin)
groupRpc := rpcclient.NewGroup(disCov, config.RpcRegisterName.OpenImGroupName)
friendRpc := rpcclient.NewFriend(disCov, config.RpcRegisterName.OpenImFriendName)
messageRpc := rpcclient.NewMessage(disCov, config.RpcRegisterName.OpenImMsgName)
conversationRpc := rpcclient.NewConversation(disCov, config.RpcRegisterName.OpenImConversationName)
authRpc := rpcclient.NewAuth(disCov, config.RpcRegisterName.OpenImAuthName)
thirdRpc := rpcclient.NewThird(disCov, config.RpcRegisterName.OpenImThirdName, config.Prometheus.GrafanaUrl)
userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway,
&config.Share.IMAdmin)
groupRpc := rpcclient.NewGroup(disCov, config.Share.RpcRegisterName.Group)
friendRpc := rpcclient.NewFriend(disCov, config.Share.RpcRegisterName.Friend)
messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg)
conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation)
authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL)
u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc, &config.Manager, &config.IMAdmin)
m := NewMessageApi(messageRpc, userRpc, &config.Share.IMAdmin)
ParseToken := GinParseToken(rdb, config)
userRouterGroup := r.Group("/user")
{

@ -16,46 +16,49 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/api"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type ApiCmd struct {
*RootCmd
ctx context.Context
ctx context.Context
configMap map[string]StructEnvPrefix
apiConfig ApiConfig
}
func NewApiCmd(name string) *ApiCmd {
ret := &ApiCmd{RootCmd: NewRootCmd(program.GetProcessName(), name)}
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.SetRootCmdPt(ret)
ret.addPreRun()
ret.addRunE()
return ret
type ApiConfig struct {
RpcConfig config.API
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
}
func (a *ApiCmd) addPreRun() {
a.Command.PreRun = func(cmd *cobra.Command, args []string) {
a.port = a.getPortFlag(cmd)
a.prometheusPort = a.getPrometheusPortFlag(cmd)
func NewApiCmd() *ApiCmd {
var apiConfig ApiConfig
ret := &ApiCmd{apiConfig: apiConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMAPICfgFileName: {EnvPrefix: apiEnvPrefix, ConfigStruct: &apiConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &apiConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &apiConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &apiConfig.Share},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (a *ApiCmd) addRunE() {
a.Command.RunE = func(cmd *cobra.Command, args []string) error {
return api.Start(a.ctx, a.config, a.port, a.prometheusPort)
}
func (a *ApiCmd) Exec() error {
return a.Execute()
}
func (a *ApiCmd) GetPortFromConfig(portType string) int {
if portType == constant.FlagPort {
return a.config.Api.OpenImApiPort[0]
} else if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.ApiPrometheusPort[0]
}
return 0
func (a *ApiCmd) preRunE() error {
return api.Start(a.ctx, a.Index(), &a.apiConfig)
}

@ -63,6 +63,7 @@ const (
minioEnvPrefix = "openim-minio"
kafkaEnvPrefix = "openim-kafka"
zoopkeeperEnvPrefix = "openim-zookeeper"
apiEnvPrefix = "openim-api"
authEnvPrefix = "openim-auth"
conversationEnvPrefix = "openim-conversation"
friendEnvPrefix = "openim-friend"

@ -325,20 +325,36 @@ type WebhookConfig struct {
}
type Share struct {
Env string `mapstructure:"env"`
RpcRegisterName struct {
User string `mapstructure:"user"`
Friend string `mapstructure:"friend"`
Msg string `mapstructure:"msg"`
Push string `mapstructure:"push"`
MessageGateway string `mapstructure:"messageGateway"`
Group string `mapstructure:"group"`
Auth string `mapstructure:"auth"`
Conversation string `mapstructure:"conversation"`
Third string `mapstructure:"third"`
} `mapstructure:"rpcRegisterName"`
IMAdmin IMAdmin `mapstructure:"imAdmin"`
Env string `mapstructure:"env"`
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
IMAdmin IMAdmin `mapstructure:"imAdmin"`
}
type RpcRegisterName struct {
User string `mapstructure:"user"`
Friend string `mapstructure:"friend"`
Msg string `mapstructure:"msg"`
Push string `mapstructure:"push"`
MessageGateway string `mapstructure:"messageGateway"`
Group string `mapstructure:"group"`
Auth string `mapstructure:"auth"`
Conversation string `mapstructure:"conversation"`
Third string `mapstructure:"third"`
}
func (r *RpcRegisterName) GetServiceNames() []string {
return []string{
r.User,
r.Friend,
r.Msg,
r.Push,
r.MessageGateway,
r.Group,
r.Auth,
r.Conversation,
r.Third,
}
}
type IMAdmin struct {
UserID []string `mapstructure:"userID"`
Nickname []string `mapstructure:"nickname"`

@ -16,14 +16,19 @@ package discoveryregister
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"time"
)
const (
zookeeper = "zoopkeeper"
kubenetes = "k8s"
direct = "direct"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper) (discovery.SvcDiscoveryRegistry, error) {

@ -18,6 +18,7 @@ import (
"context"
"fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
"github.com/prometheus/client_golang/prometheus"
"net"
"net/http"
@ -47,11 +48,11 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome
registerIP string, rpcPorts []int, index int, rpcRegisterName string, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
rpcPort, err := getElemByIndex(rpcPorts, index)
rpcPort, err := datautil.GetElemByIndex(rpcPorts, index)
if err != nil {
return err
}
prometheusPort, err := getElemByIndex(prometheusConfig.Ports, index)
prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return err
}
@ -173,11 +174,3 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error {
return nil
}
}
func getElemByIndex(array []int, index int) (int, error) {
if index < 0 || index >= len(array) {
return 0, errs.New("index out of range", "index", index, "array", array).Wrap()
}
return array[index], nil
}

Loading…
Cancel
Save