optimization: change the configuration file from being read globally to being read independently.

pull/1960/head
Gordon 2 years ago
parent fe0116a811
commit 27e472bf89

@ -15,118 +15,17 @@
package main package main
import ( import (
"context"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"syscall"
"time"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
_ "net/http/pprof"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
) )
func main() { func main() {
apiCmd := cmd.NewApiCmd() apiCmd := cmd.NewApiCmd()
apiCmd.AddPortFlag() apiCmd.AddPortFlag()
apiCmd.AddPrometheusPortFlag() apiCmd.AddPrometheusPortFlag()
apiCmd.AddApi(run)
if err := apiCmd.Execute(); err != nil { if err := apiCmd.Execute(); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }
func run(port int, proPort int) error {
if port == 0 || proPort == 0 {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
return errs.Wrap(fmt.Errorf(err))
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
var client discoveryregistry.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil {
return errs.Wrap(err, "register discovery err")
}
if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
return errs.Wrap(err, "create rpc root nodes error")
}
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil {
return err
}
var (
netDone = make(chan struct{}, 1)
netErr error
)
router := api.NewGinRouter(client, rdb)
if config.Config.Prometheus.Enable {
go func() {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
if err = p.Use(router); err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", proPort))
netDone <- struct{}{}
}
}()
}
var address string
if config.Config.Api.ListenIP != "" {
address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port))
} else {
address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
}
server := http.Server{Addr: address, Handler: router}
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}
}
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
select {
case <-sigs:
util.SIGUSR1Exit()
err := server.Shutdown(ctx)
if err != nil {
return errs.Wrap(err, "shutdown err")
}
case <-netDone:
close(netDone)
return netErr
}
return nil
}

@ -15,14 +15,13 @@
package main package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
cronTaskCmd := cmd.NewCronTaskCmd() cronTaskCmd := cmd.NewCronTaskCmd()
if err := cronTaskCmd.Exec(tools.StartTask); err != nil { if err := cronTaskCmd.Exec(); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -24,7 +24,6 @@ func main() {
msgGatewayCmd.AddWsPortFlag() msgGatewayCmd.AddWsPortFlag()
msgGatewayCmd.AddPortFlag() msgGatewayCmd.AddPortFlag()
msgGatewayCmd.AddPrometheusPortFlag() msgGatewayCmd.AddPrometheusPortFlag()
if err := msgGatewayCmd.Exec(); err != nil { if err := msgGatewayCmd.Exec(); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/push" "github.com/openimsdk/open-im-server/v3/internal/push"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
pushCmd := cmd.NewRpcCmd(cmd.RpcPushServer) pushCmd := cmd.NewRpcCmd(cmd.RpcPushServer, push.Start)
pushCmd.AddPortFlag() pushCmd.AddPortFlag()
pushCmd.AddPrometheusPortFlag() pushCmd.AddPrometheusPortFlag()
if err := pushCmd.Exec(); err != nil { if err := pushCmd.Exec(); err != nil {
panic(err.Error())
}
if err := pushCmd.StartSvr(config.Config.RpcRegisterName.OpenImPushName, push.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,19 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/auth" "github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
authCmd := cmd.NewRpcCmd(cmd.RpcAuthServer) authCmd := cmd.NewRpcCmd(cmd.RpcAuthServer, auth.Start)
authCmd.AddPortFlag() authCmd.AddPortFlag()
authCmd.AddPrometheusPortFlag() authCmd.AddPrometheusPortFlag()
if err := authCmd.Exec(); err != nil { if err := authCmd.Exec(); err != nil {
panic(err.Error())
}
if err := authCmd.StartSvr(config.Config.RpcRegisterName.OpenImAuthName, auth.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation" "github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcConversationServer) rpcCmd := cmd.NewRpcCmd(cmd.RpcConversationServer, conversation.Start)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImConversationName, conversation.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcFriendServer) rpcCmd := cmd.NewRpcCmd(cmd.RpcFriendServer, friend.Start)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImFriendName, friend.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/group" "github.com/openimsdk/open-im-server/v3/internal/rpc/group"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcGroupServer) rpcCmd := cmd.NewRpcCmd(cmd.RpcGroupServer, group.Start)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImGroupName, group.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg" "github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcMsgServer) rpcCmd := cmd.NewRpcCmd(cmd.RpcMsgServer, msg.Start)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImMsgName, msg.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcThirdServer) rpcCmd := cmd.NewRpcCmd(cmd.RpcThirdServer, third.Start)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImThirdName, third.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,18 +17,14 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/user" "github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer) rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer, user.Start)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {
panic(err.Error())
}
if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
} }

@ -17,7 +17,17 @@ package api
import ( import (
"context" "context"
"fmt" "fmt"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"net"
"net/http" "net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/apiresp" "github.com/OpenIMSDK/tools/apiresp"
@ -43,8 +53,87 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine { func Start(config *config.GlobalConfig, port int, proPort int) error {
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // 默认RPC中间件 if port == 0 || proPort == 0 {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
return errs.Wrap(fmt.Errorf(err))
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
var client discoveryregistry.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config.Envs.Discovery)
if err != nil {
return errs.Wrap(err, "register discovery err")
}
if err = client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return errs.Wrap(err, "create rpc root nodes error")
}
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil {
return errs.Wrap(err)
}
var (
netDone = make(chan struct{}, 1)
netErr error
)
router := newGinRouter(client, rdb)
if config.Prometheus.Enable {
go func() {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
if err = p.Use(router); err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", proPort))
netDone <- struct{}{}
}
}()
}
var address string
if config.Api.ListenIP != "" {
address = net.JoinHostPort(config.Api.ListenIP, strconv.Itoa(port))
} else {
address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
}
server := http.Server{Addr: address, Handler: router}
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}
}
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
select {
case <-sigs:
util.SIGUSR1Exit()
err := server.Shutdown(ctx)
if err != nil {
return errs.Wrap(err, "shutdown err")
}
case <-netDone:
close(netDone)
return netErr
}
return nil
}
func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // 默认RPC中间件
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
r := gin.New() r := gin.New()
if v, ok := binding.Validator.Engine().(*validator.Validate); ok { if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
@ -53,13 +142,13 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
log.ZInfo(context.Background(), "load config", "config", config.Config) log.ZInfo(context.Background(), "load config", "config", config.Config)
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
// init rpc client here // init rpc client here
userRpc := rpcclient.NewUser(discov) userRpc := rpcclient.NewUser(disCov)
groupRpc := rpcclient.NewGroup(discov) groupRpc := rpcclient.NewGroup(disCov)
friendRpc := rpcclient.NewFriend(discov) friendRpc := rpcclient.NewFriend(disCov)
messageRpc := rpcclient.NewMessage(discov) messageRpc := rpcclient.NewMessage(disCov)
conversationRpc := rpcclient.NewConversation(discov) conversationRpc := rpcclient.NewConversation(disCov)
authRpc := rpcclient.NewAuth(discov) authRpc := rpcclient.NewAuth(disCov)
thirdRpc := rpcclient.NewThird(discov) thirdRpc := rpcclient.NewThird(disCov)
u := NewUserApi(*userRpc) u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc) m := NewMessageApi(messageRpc, userRpc)

@ -33,7 +33,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
) )
func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -46,11 +46,12 @@ func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, serve
return nil return nil
} }
func (s *Server) Start() error { func (s *Server) Start(conf *config.GlobalConfig) error {
return startrpc.Start( return startrpc.Start(
s.rpcPort, s.rpcPort,
config.Config.RpcRegisterName.OpenImMessageGatewayName, config.Config.RpcRegisterName.OpenImMessageGatewayName,
s.prometheusPort, s.prometheusPort,
conf,
s.InitServer, s.InitServer,
) )
} }

@ -22,7 +22,7 @@ import (
) )
// RunWsAndServer run ws server. // RunWsAndServer run ws server.
func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { func RunWsAndServer(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort int) error {
fmt.Println( fmt.Println(
"start rpc/msg_gateway server, port: ", "start rpc/msg_gateway server, port: ",
rpcPort, rpcPort,
@ -33,10 +33,10 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
) )
longServer, err := NewWsServer( longServer, err := NewWsServer(
WithPort(wsPort), WithPort(wsPort),
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)), WithMaxConnNum(int64(conf.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second), WithHandshakeTimeout(time.Duration(conf.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen), WithMessageMaxMsgLength(conf.LongConnSvr.WebsocketMaxMsgLen),
WithWriteBufferSize(config.Config.LongConnSvr.WebsocketWriteBufferSize), WithWriteBufferSize(conf.LongConnSvr.WebsocketWriteBufferSize),
) )
if err != nil { if err != nil {
return err return err
@ -45,7 +45,7 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
hubServer := NewServer(rpcPort, prometheusPort, longServer) hubServer := NewServer(rpcPort, prometheusPort, longServer)
netDone := make(chan error) netDone := make(chan error)
go func() { go func() {
err = hubServer.Start() err = hubServer.Start(conf)
if err != nil { if err != nil {
netDone <- err netDone <- err
} }

@ -52,7 +52,7 @@ type MsgTransfer struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
func StartTransfer(prometheusPort int) error { func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -66,12 +66,12 @@ func StartTransfer(prometheusPort int) error {
if err = mongo.CreateMsgIndex(); err != nil { if err = mongo.CreateMsgIndex(); err != nil {
return err return err
} }
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) client, err := kdisc.NewDiscoveryRegister(config.Envs.Discovery)
if err != nil { if err != nil {
return err return err
} }
if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { if err := client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return err return err
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))

@ -16,6 +16,7 @@ package push
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
@ -34,9 +35,10 @@ import (
type pushServer struct { type pushServer struct {
pusher *Pusher pusher *Pusher
config *config.GlobalConfig
} }
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -60,6 +62,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
pbpush.RegisterPushMsgServiceServer(server, &pushServer{ pbpush.RegisterPushMsgServiceServer(server, &pushServer{
pusher: pusher, pusher: pusher,
config: config,
}) })
consumer, err := NewConsumer(pusher) consumer, err := NewConsumer(pusher)

@ -42,9 +42,10 @@ type authServer struct {
authDatabase controller.AuthDatabase authDatabase controller.AuthDatabase
userRpcClient *rpcclient.UserRpcClient userRpcClient *rpcclient.UserRpcClient
RegisterCenter discoveryregistry.SvcDiscoveryRegistry RegisterCenter discoveryregistry.SvcDiscoveryRegistry
config *config.GlobalConfig
} }
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -55,9 +56,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
RegisterCenter: client, RegisterCenter: client,
authDatabase: controller.NewAuthDatabase( authDatabase: controller.NewAuthDatabase(
cache.NewMsgCacheModel(rdb), cache.NewMsgCacheModel(rdb),
config.Config.Secret, config.Secret,
config.Config.TokenPolicy.Expire, config.TokenPolicy.Expire,
), ),
config: config,
}) })
return nil return nil
} }

@ -17,6 +17,7 @@ package conversation
import ( import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"sort" "sort"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
@ -49,6 +50,7 @@ type conversationServer struct {
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase conversationDatabase controller.ConversationDatabase
conversationNotificationSender *notification.ConversationNotificationSender conversationNotificationSender *notification.ConversationNotificationSender
config *config.GlobalConfig
} }
func (c *conversationServer) GetConversationNotReceiveMessageUserIDs( func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(
@ -59,7 +61,7 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(
panic("implement me") panic("implement me")
} }
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -81,6 +83,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient), conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient),
groupRpcClient: &groupRpcClient, groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewMongo(mongo.GetClient())), conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewMongo(mongo.GetClient())),
config: config,
}) })
return nil return nil
} }

@ -16,6 +16,7 @@ package friend
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/tools/tx"
@ -51,9 +52,10 @@ type friendServer struct {
notificationSender *notification.FriendNotificationSender notificationSender *notification.FriendNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient conversationRpcClient rpcclient.ConversationRpcClient
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
config *config.GlobalConfig
} }
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
// Initialize MongoDB // Initialize MongoDB
mongo, err := unrelation.NewMongo() mongo, err := unrelation.NewMongo()
if err != nil { if err != nil {
@ -106,6 +108,7 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
notificationSender: notificationSender, notificationSender: notificationSender,
RegisterCenter: client, RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client), conversationRpcClient: rpcclient.NewConversationRpcClient(client),
config: config,
}) })
return nil return nil
@ -113,13 +116,12 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
// ok. // ok.
func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) { func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
resp = &pbfriend.ApplyToAddFriendResp{} resp = &pbfriend.ApplyToAddFriendResp{}
if err := authverify.CheckAccessV3(ctx, req.FromUserID); err != nil { if err := authverify.CheckAccessV3(ctx, req.FromUserID); err != nil {
return nil, err return nil, err
} }
if req.ToUserID == req.FromUserID { if req.ToUserID == req.FromUserID {
return nil, errs.ErrCanNotAddYourself.Wrap() return nil, errs.ErrCanNotAddYourself.Wrap("req.ToUserID", req.ToUserID)
} }
if err = CallbackBeforeAddFriend(ctx, req); err != nil && err != errs.ErrCallbackContinue { if err = CallbackBeforeAddFriend(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err return nil, err

@ -17,6 +17,7 @@ package group
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"math/big" "math/big"
"math/rand" "math/rand"
"strconv" "strconv"
@ -59,7 +60,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
) )
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongo() mongo, err := unrelation.NewMongo()
if err != nil { if err != nil {
return err return err
@ -96,6 +97,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}) })
gs.conversationRpcClient = conversationRpcClient gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient gs.msgRpcClient = msgRpcClient
gs.config = config
pbgroup.RegisterGroupServer(server, &gs) pbgroup.RegisterGroupServer(server, &gs)
return nil return nil
} }
@ -106,6 +108,7 @@ type groupServer struct {
Notification *notification.GroupNotificationSender Notification *notification.GroupNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient conversationRpcClient rpcclient.ConversationRpcClient
msgRpcClient rpcclient.MessageRpcClient msgRpcClient rpcclient.MessageRpcClient
config *config.GlobalConfig
} }
func (s *groupServer) GetJoinedGroupIDs(ctx context.Context, req *pbgroup.GetJoinedGroupIDsReq) (*pbgroup.GetJoinedGroupIDsResp, error) { func (s *groupServer) GetJoinedGroupIDs(ctx context.Context, req *pbgroup.GetJoinedGroupIDsReq) (*pbgroup.GetJoinedGroupIDsResp, error) {

@ -16,6 +16,7 @@ package msg
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -44,6 +45,7 @@ type (
ConversationLocalCache *localcache.ConversationLocalCache ConversationLocalCache *localcache.ConversationLocalCache
Handlers MessageInterceptorChain Handlers MessageInterceptorChain
notificationSender *rpcclient.NotificationSender notificationSender *rpcclient.NotificationSender
config *config.GlobalConfig
} }
) )
@ -62,7 +64,7 @@ func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsg
return nil return nil
} }
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -93,6 +95,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: &friendRpcClient, friend: &friendRpcClient,
config: config,
} }
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled) s.addInterceptorHandler(MessageHasReadEnabled)

@ -39,7 +39,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongo() mongo, err := unrelation.NewMongo()
if err != nil { if err != nil {
return err return err
@ -52,11 +52,11 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil { if err != nil {
return err return err
} }
apiURL := config.Config.Object.ApiURL apiURL := config.Object.ApiURL
if apiURL == "" { if apiURL == "" {
return fmt.Errorf("api url is empty") return fmt.Errorf("api url is empty")
} }
if _, err := url.Parse(config.Config.Object.ApiURL); err != nil { if _, err := url.Parse(config.Object.ApiURL); err != nil {
return err return err
} }
if apiURL[len(apiURL)-1] != '/' { if apiURL[len(apiURL)-1] != '/' {
@ -68,9 +68,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
return err return err
} }
// 根据配置文件策略选择 oss 方式 // 根据配置文件策略选择 oss 方式
enable := config.Config.Object.Enable enable := config.Object.Enable
var o s3.Interface var o s3.Interface
switch config.Config.Object.Enable { switch config.Object.Enable {
case "minio": case "minio":
o, err = minio.NewMinio(cache.NewMinioCache(rdb)) o, err = minio.NewMinio(cache.NewMinioCache(rdb))
case "cos": case "cos":
@ -89,6 +89,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
userRpcClient: rpcclient.NewUserRpcClient(client), userRpcClient: rpcclient.NewUserRpcClient(client),
s3dataBase: controller.NewS3Database(rdb, o, s3db), s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7, defaultExpire: time.Hour * 24 * 7,
config: config,
}) })
return nil return nil
} }
@ -99,6 +100,7 @@ type thirdServer struct {
s3dataBase controller.S3Database s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient userRpcClient rpcclient.UserRpcClient
defaultExpire time.Duration defaultExpire time.Duration
config *config.GlobalConfig
} }
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) { func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {

@ -59,6 +59,7 @@ type userServer struct {
friendRpcClient *rpcclient.FriendRpcClient friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
config *config.GlobalConfig
} }
func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGroupOnlineUserReq) (*pbuser.GetGroupOnlineUserResp, error) { func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGroupOnlineUserReq) (*pbuser.GetGroupOnlineUserResp, error) {
@ -66,7 +67,7 @@ func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGrou
panic("implement me") panic("implement me")
} }
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
@ -76,11 +77,11 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
return err return err
} }
users := make([]*tablerelation.UserModel, 0) users := make([]*tablerelation.UserModel, 0)
if len(config.Config.IMAdmin.UserID) != len(config.Config.IMAdmin.Nickname) { if len(config.IMAdmin.UserID) != len(config.IMAdmin.Nickname) {
return errors.New("len(config.Config.AppNotificationAdmin.AppManagerUid) != len(config.Config.AppNotificationAdmin.Nickname)") return errors.New("len(config.Config.AppNotificationAdmin.AppManagerUid) != len(config.Config.AppNotificationAdmin.Nickname)")
} }
for k, v := range config.Config.IMAdmin.UserID { for k, v := range config.IMAdmin.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin}) users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
} }
userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
if err != nil { if err != nil {
@ -99,6 +100,7 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
groupRpcClient: &groupRpcClient, groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)), friendNotificationSender: notification.NewFriendNotificationSender(&msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)), userNotificationSender: notification.NewUserNotificationSender(&msgRpcClient, notification.WithUserFunc(database.FindWithError)),
config: config,
} }
pbuser.RegisterUserServer(server, u) pbuser.RegisterUserServer(server, u)
return u.UserDatabase.InitOnce(context.Background(), users) return u.UserDatabase.InitOnce(context.Background(), users)

@ -31,8 +31,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
) )
func StartTask() error { func StartTask(config *config.GlobalConfig) error {
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) fmt.Println("cron task start, config", config.ChatRecordsClearTime)
msgTool, err := InitMsgTool() msgTool, err := InitMsgTool()
if err != nil { if err != nil {
@ -48,14 +48,14 @@ func StartTask() error {
// register cron tasks // register cron tasks
var crontab = cron.New() var crontab = cron.New()
fmt.Println("start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime) fmt.Println("start chatRecordsClearTime cron task", "cron config", config.ChatRecordsClearTime)
_, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq)) _, err = crontab.AddFunc(config.ChatRecordsClearTime, cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
fmt.Println("start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) fmt.Println("start msgDestruct cron task", "cron config", config.MsgDestructTime)
_, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs)) _, err = crontab.AddFunc(config.MsgDestructTime, cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
@ -93,8 +93,8 @@ func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool {
return ok return ok
} }
func cronWrapFunc(rdb redis.UniversalClient, key string, fn func()) func() { func cronWrapFunc(config *config.GlobalConfig, rdb redis.UniversalClient, key string, fn func()) func() {
enableCronLocker := config.Config.EnableCronLocker enableCronLocker := config.EnableCronLocker
return func() { return func() {
// if don't enable cron-locker, call fn directly. // if don't enable cron-locker, call fn directly.
if !enableCronLocker { if !enableCronLocker {

@ -44,7 +44,7 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) {
if opUserID == ownerUserID { if opUserID == ownerUserID {
return nil return nil
} }
return errs.ErrNoPermission.Wrap(utils.GetSelfFuncName()) return errs.ErrNoPermission.Wrap("ownerUserID", ownerUserID)
} }
func IsAppManagerUid(ctx context.Context) bool { func IsAppManagerUid(ctx context.Context) bool {

@ -16,33 +16,43 @@ package cmd
import ( import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/spf13/cobra" "github.com/spf13/cobra"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
type ApiCmd struct { type ApiCmd struct {
*RootCmd *RootCmd
initFunc func(config *config.GlobalConfig, port int, promPort int) error
} }
func NewApiCmd() *ApiCmd { func NewApiCmd() *ApiCmd {
ret := &ApiCmd{NewRootCmd("api")} ret := &ApiCmd{RootCmd: NewRootCmd("api"), initFunc: api.Start}
ret.SetRootCmdPt(ret) ret.SetRootCmdPt(ret)
ret.addPreRun()
ret.addRunE()
return ret return ret
} }
func (a *ApiCmd) AddApi(f func(port int, promPort int) error) { func (a *ApiCmd) addPreRun() {
a.Command.PreRun = func(cmd *cobra.Command, args []string) {
a.port = a.getPortFlag(cmd)
a.prometheusPort = a.getPrometheusPortFlag(cmd)
}
}
func (a *ApiCmd) addRunE() {
a.Command.RunE = func(cmd *cobra.Command, args []string) error { a.Command.RunE = func(cmd *cobra.Command, args []string) error {
return f(a.getPortFlag(cmd), a.getPrometheusPortFlag(cmd)) return a.initFunc(a.config, a.port, a.prometheusPort)
} }
} }
func (a *ApiCmd) GetPortFromConfig(portType string) int { func (a *ApiCmd) GetPortFromConfig(portType string) int {
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.Api.OpenImApiPort[0] return a.config.Api.OpenImApiPort[0]
} else if portType == constant.FlagPrometheusPort { } else if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ApiPrometheusPort[0] return a.config.Prometheus.ApiPrometheusPort[0]
} }
return 0 return 0
} }

@ -14,25 +14,30 @@
package cmd package cmd
import "github.com/spf13/cobra" import (
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
)
type CronTaskCmd struct { type CronTaskCmd struct {
*RootCmd *RootCmd
initFunc func(config *config.GlobalConfig) error
} }
func NewCronTaskCmd() *CronTaskCmd { func NewCronTaskCmd() *CronTaskCmd {
ret := &CronTaskCmd{NewRootCmd("cronTask", WithCronTaskLogName())} ret := &CronTaskCmd{RootCmd: NewRootCmd("cronTask", WithCronTaskLogName()),
initFunc: tools.StartTask}
ret.SetRootCmdPt(ret) ret.SetRootCmdPt(ret)
return ret return ret
} }
func (c *CronTaskCmd) addRunE(f func() error) { func (c *CronTaskCmd) addRunE() {
c.Command.RunE = func(cmd *cobra.Command, args []string) error { c.Command.RunE = func(cmd *cobra.Command, args []string) error {
return f() return c.initFunc(c.config)
} }
} }
func (c *CronTaskCmd) Exec(f func() error) error { func (c *CronTaskCmd) Exec() error {
c.addRunE(f)
return c.Execute() return c.Execute()
} }

@ -31,6 +31,7 @@ type MsgGatewayCmd struct {
func NewMsgGatewayCmd() *MsgGatewayCmd { func NewMsgGatewayCmd() *MsgGatewayCmd {
ret := &MsgGatewayCmd{NewRootCmd("msgGateway")} ret := &MsgGatewayCmd{NewRootCmd("msgGateway")}
ret.addRunE()
ret.SetRootCmdPt(ret) ret.SetRootCmdPt(ret)
return ret return ret
} }
@ -52,12 +53,11 @@ func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int {
func (m *MsgGatewayCmd) addRunE() { func (m *MsgGatewayCmd) addRunE() {
m.Command.RunE = func(cmd *cobra.Command, args []string) error { m.Command.RunE = func(cmd *cobra.Command, args []string) error {
return msggateway.RunWsAndServer(m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd)) return msggateway.RunWsAndServer(m.config, m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd))
} }
} }
func (m *MsgGatewayCmd) Exec() error { func (m *MsgGatewayCmd) Exec() error {
m.addRunE()
return m.Execute() return m.Execute()
} }

@ -20,8 +20,6 @@ import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/spf13/cobra" "github.com/spf13/cobra"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer" "github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
) )
@ -31,18 +29,18 @@ type MsgTransferCmd struct {
func NewMsgTransferCmd() *MsgTransferCmd { func NewMsgTransferCmd() *MsgTransferCmd {
ret := &MsgTransferCmd{NewRootCmd("msgTransfer")} ret := &MsgTransferCmd{NewRootCmd("msgTransfer")}
ret.addRunE()
ret.SetRootCmdPt(ret) ret.SetRootCmdPt(ret)
return ret return ret
} }
func (m *MsgTransferCmd) addRunE() { func (m *MsgTransferCmd) addRunE() {
m.Command.RunE = func(cmd *cobra.Command, args []string) error { m.Command.RunE = func(cmd *cobra.Command, args []string) error {
return msgtransfer.StartTransfer(m.getPrometheusPortFlag(cmd)) return msgtransfer.StartTransfer(m.config, m.getPrometheusPortFlag(cmd))
} }
} }
func (m *MsgTransferCmd) Exec() error { func (m *MsgTransferCmd) Exec() error {
m.addRunE()
return m.Execute() return m.Execute()
} }
@ -51,7 +49,7 @@ func (m *MsgTransferCmd) GetPortFromConfig(portType string) int {
return 0 return 0
} else if portType == constant.FlagPrometheusPort { } else if portType == constant.FlagPrometheusPort {
n := m.getTransferProgressFlagValue() n := m.getTransferProgressFlagValue()
return config2.Config.Prometheus.MessageTransferPrometheusPort[n] return m.config.Prometheus.MessageTransferPrometheusPort[n]
} }
return 0 return 0
} }
@ -61,10 +59,10 @@ func (m *MsgTransferCmd) AddTransferProgressFlag() {
} }
func (m *MsgTransferCmd) getTransferProgressFlagValue() int { func (m *MsgTransferCmd) getTransferProgressFlagValue() int {
nindex, err := m.Command.Flags().GetInt(constant.FlagTransferProgressIndex) nIndex, err := m.Command.Flags().GetInt(constant.FlagTransferProgressIndex)
if err != nil { if err != nil {
fmt.Println("get transfercmd error,make sure it is k8s env or not") fmt.Println("get transfer cmd error,make sure it is k8s env or not")
return 0 return 0
} }
return nindex return nIndex
} }

@ -36,6 +36,11 @@ type RootCmd struct {
port int port int
prometheusPort int prometheusPort int
cmdItf RootCmdPt cmdItf RootCmdPt
config *config.GlobalConfig
}
func (rc *RootCmd) Port() int {
return rc.port
} }
type CmdOpts struct { type CmdOpts struct {
@ -55,7 +60,7 @@ func WithLogName(logName string) func(*CmdOpts) {
} }
func NewRootCmd(name string, opts ...func(*CmdOpts)) *RootCmd { func NewRootCmd(name string, opts ...func(*CmdOpts)) *RootCmd {
rootCmd := &RootCmd{Name: name} rootCmd := &RootCmd{Name: name, config: config.NewGlobalConfig()}
cmd := cobra.Command{ cmd := cobra.Command{
Use: "Start openIM application", Use: "Start openIM application",
Short: fmt.Sprintf(`Start %s `, name), Short: fmt.Sprintf(`Start %s `, name),
@ -97,7 +102,7 @@ func (rc *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
} }
func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
logConfig := config.Config.Log logConfig := rc.config.Log
return log.InitFromConfig( return log.InitFromConfig(
@ -164,7 +169,7 @@ func (r *RootCmd) GetPrometheusPortFlag() int {
func (r *RootCmd) getConfFromCmdAndInit(cmdLines *cobra.Command) error { func (r *RootCmd) getConfFromCmdAndInit(cmdLines *cobra.Command) error {
configFolderPath, _ := cmdLines.Flags().GetString(constant.FlagConf) configFolderPath, _ := cmdLines.Flags().GetString(constant.FlagConf)
fmt.Println("The directory of the configuration file to start the process:", configFolderPath) fmt.Println("The directory of the configuration file to start the process:", configFolderPath)
return config2.InitConfig(configFolderPath) return config2.InitConfig(r.config, configFolderPath)
} }
func (r *RootCmd) Execute() error { func (r *RootCmd) Execute() error {

@ -16,6 +16,7 @@ package cmd
import ( import (
"errors" "errors"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -28,89 +29,131 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
) )
type rpcInitFuc func(config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error
type RpcCmd struct { type RpcCmd struct {
*RootCmd *RootCmd
RpcRegisterName string
initFunc rpcInitFuc
} }
func NewRpcCmd(name string) *RpcCmd { func NewRpcCmd(name string, initFunc rpcInitFuc) *RpcCmd {
ret := &RpcCmd{NewRootCmd(name)} ret := &RpcCmd{RootCmd: NewRootCmd(name), initFunc: initFunc}
ret.addPreRun()
ret.addRunE()
ret.SetRootCmdPt(ret) ret.SetRootCmdPt(ret)
return ret return ret
} }
func (a *RpcCmd) Exec() error { func (a *RpcCmd) addPreRun() {
a.Command.Run = func(cmd *cobra.Command, args []string) { a.Command.PreRun = func(cmd *cobra.Command, args []string) {
a.port = a.getPortFlag(cmd) a.port = a.getPortFlag(cmd)
a.prometheusPort = a.getPrometheusPortFlag(cmd) a.prometheusPort = a.getPrometheusPortFlag(cmd)
} }
}
func (a *RpcCmd) addRunE() {
a.Command.RunE = func(cmd *cobra.Command, args []string) error {
rpcRegisterName, err := a.GetRpcRegisterNameFromConfig()
if err != nil {
return err
} else {
return a.StartSvr(rpcRegisterName, a.initFunc)
}
}
}
func (a *RpcCmd) Exec() error {
return a.Execute() return a.Execute()
} }
func (a *RpcCmd) StartSvr(name string, rpcFn func(discov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error { func (a *RpcCmd) StartSvr(name string, rpcFn func(config *config2.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
if a.GetPortFlag() == 0 { if a.GetPortFlag() == 0 {
return errors.New("port is required") return errs.Wrap(errors.New("port is required"))
} }
return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn) return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), a.config, rpcFn)
} }
func (a *RpcCmd) GetPortFromConfig(portType string) int { func (a *RpcCmd) GetPortFromConfig(portType string) int {
switch a.Name { switch a.Name {
case RpcPushServer: case RpcPushServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImPushPort[0] return a.config.RpcPort.OpenImPushPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.PushPrometheusPort[0] return a.config.Prometheus.PushPrometheusPort[0]
} }
case RpcAuthServer: case RpcAuthServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImAuthPort[0] return a.config.RpcPort.OpenImAuthPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.AuthPrometheusPort[0] return a.config.Prometheus.AuthPrometheusPort[0]
} }
case RpcConversationServer: case RpcConversationServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImConversationPort[0] return a.config.RpcPort.OpenImConversationPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ConversationPrometheusPort[0] return a.config.Prometheus.ConversationPrometheusPort[0]
} }
case RpcFriendServer: case RpcFriendServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImFriendPort[0] return a.config.RpcPort.OpenImFriendPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.FriendPrometheusPort[0] return a.config.Prometheus.FriendPrometheusPort[0]
} }
case RpcGroupServer: case RpcGroupServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImGroupPort[0] return a.config.RpcPort.OpenImGroupPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.GroupPrometheusPort[0] return a.config.Prometheus.GroupPrometheusPort[0]
} }
case RpcMsgServer: case RpcMsgServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImMessagePort[0] return a.config.RpcPort.OpenImMessagePort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.MessagePrometheusPort[0] return a.config.Prometheus.MessagePrometheusPort[0]
} }
case RpcThirdServer: case RpcThirdServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImThirdPort[0] return a.config.RpcPort.OpenImThirdPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ThirdPrometheusPort[0] return a.config.Prometheus.ThirdPrometheusPort[0]
} }
case RpcUserServer: case RpcUserServer:
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImUserPort[0] return a.config.RpcPort.OpenImUserPort[0]
} }
if portType == constant.FlagPrometheusPort { if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.UserPrometheusPort[0] return a.config.Prometheus.UserPrometheusPort[0]
} }
} }
return 0 return 0
} }
func (a *RpcCmd) GetRpcRegisterNameFromConfig() (string, error) {
switch a.Name {
case RpcPushServer:
return a.config.RpcRegisterName.OpenImPushName, nil
case RpcAuthServer:
return a.config.RpcRegisterName.OpenImAuthName, nil
case RpcConversationServer:
return a.config.RpcRegisterName.OpenImConversationName, nil
case RpcFriendServer:
return a.config.RpcRegisterName.OpenImFriendName, nil
case RpcGroupServer:
return a.config.RpcRegisterName.OpenImGroupName, nil
case RpcMsgServer:
return a.config.RpcRegisterName.OpenImMsgName, nil
case RpcThirdServer:
return a.config.RpcRegisterName.OpenImThirdName, nil
case RpcUserServer:
return a.config.RpcRegisterName.OpenImUserName, nil
}
return "", errs.Wrap(errors.New("can not get rpc register name"), a.Name)
}

@ -21,7 +21,7 @@ import (
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
var Config configStruct var Config GlobalConfig
const ConfKey = "conf" const ConfKey = "conf"
@ -57,7 +57,7 @@ type MYSQL struct {
SlowThreshold int `yaml:"slowThreshold"` SlowThreshold int `yaml:"slowThreshold"`
} }
type configStruct struct { type GlobalConfig struct {
Envs struct { Envs struct {
Discovery string `yaml:"discovery"` Discovery string `yaml:"discovery"`
} }
@ -331,6 +331,10 @@ type configStruct struct {
Notification notification `yaml:"notification"` Notification notification `yaml:"notification"`
} }
func NewGlobalConfig() *GlobalConfig {
return &GlobalConfig{}
}
type notification struct { type notification struct {
GroupCreated NotificationConf `yaml:"groupCreated"` GroupCreated NotificationConf `yaml:"groupCreated"`
GroupInfoSet NotificationConf `yaml:"groupInfoSet"` GroupInfoSet NotificationConf `yaml:"groupInfoSet"`
@ -370,7 +374,7 @@ type notification struct {
ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
} }
func (c *configStruct) GetServiceNames() []string { func (c *GlobalConfig) GetServiceNames() []string {
return []string{ return []string{
c.RpcRegisterName.OpenImUserName, c.RpcRegisterName.OpenImUserName,
c.RpcRegisterName.OpenImFriendName, c.RpcRegisterName.OpenImFriendName,
@ -384,7 +388,7 @@ func (c *configStruct) GetServiceNames() []string {
} }
} }
func (c *configStruct) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error { func (c *GlobalConfig) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error {
data, err := yaml.Marshal(c) data, err := yaml.Marshal(c)
if err != nil { if err != nil {
return err return err
@ -392,11 +396,11 @@ func (c *configStruct) RegisterConf2Registry(registry discoveryregistry.SvcDisco
return registry.RegisterConf2Registry(ConfKey, data) return registry.RegisterConf2Registry(ConfKey, data)
} }
func (c *configStruct) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) { func (c *GlobalConfig) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) {
return registry.GetConfFromRegistry(ConfKey) return registry.GetConfFromRegistry(ConfKey)
} }
func (c *configStruct) EncodeConfig() []byte { func (c *GlobalConfig) EncodeConfig() []byte {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
if err := yaml.NewEncoder(buf).Encode(c); err != nil { if err := yaml.NewEncoder(buf).Encode(c); err != nil {
panic(err) panic(err)

@ -106,7 +106,7 @@ func initConfig(config any, configName, configFolderPath string) error {
return nil return nil
} }
func InitConfig(configFolderPath string) error { func InitConfig(config *GlobalConfig, configFolderPath string) error {
if configFolderPath == "" { if configFolderPath == "" {
envConfigPath := os.Getenv("OPENIMCONFIG") envConfigPath := os.Getenv("OPENIMCONFIG")
if envConfigPath != "" { if envConfigPath != "" {
@ -116,9 +116,9 @@ func InitConfig(configFolderPath string) error {
} }
} }
if err := initConfig(&Config, FileName, configFolderPath); err != nil { if err := initConfig(config, FileName, configFolderPath); err != nil {
return err return err
} }
return initConfig(&Config.Notification, NotificationFileName, configFolderPath) return initConfig(config.Notification, NotificationFileName, configFolderPath)
} }

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -34,7 +35,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
@ -53,36 +54,37 @@ func Start(
rpcPort int, rpcPort int,
rpcRegisterName string, rpcRegisterName string,
prometheusPort int, prometheusPort int,
rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, config *config2.GlobalConfig,
rpcFn func(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption, options ...grpc.ServerOption,
) error { ) error {
fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n", fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n",
rpcRegisterName, rpcPort, prometheusPort, config.Version) rpcRegisterName, rpcPort, prometheusPort, config2.Version)
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)) rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Rpc.ListenIP), strconv.Itoa(rpcPort))
listener, err := net.Listen( listener, err := net.Listen(
"tcp", "tcp",
rpcTcpAddr, rpcTcpAddr,
) )
if err != nil { if err != nil {
return errs.Wrap(err, "rpc start err", rpcTcpAddr) return errs.Wrap(err, "listen err", rpcTcpAddr)
} }
defer listener.Close() defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) client, err := kdisc.NewDiscoveryRegister(config.Envs.Discovery)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
defer client.Close() defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) registerIP, err := network.GetRpcRegisterIP(config.Rpc.RegisterIP)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
var reg *prometheus.Registry var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics var metric *grpcprometheus.ServerMetrics
if config.Config.Prometheus.Enable { if config.Prometheus.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName) cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
@ -97,7 +99,7 @@ func Start(
once.Do(srv.GracefulStop) once.Do(srv.GracefulStop)
}() }()
err = rpcFn(client, srv) err = rpcFn(config, client, srv)
if err != nil { if err != nil {
return err return err
} }
@ -117,7 +119,7 @@ func Start(
httpServer *http.Server httpServer *http.Server
) )
go func() { go func() {
if config.Config.Prometheus.Enable && prometheusPort != 0 { if config.Prometheus.Enable && prometheusPort != 0 {
metric.InitializeMetrics(srv) metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus. // Create a HTTP server for prometheus.
httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}

Loading…
Cancel
Save