refactor: all module update.

pull/2148/head
Gordon 1 year ago
parent 4956e21807
commit bc0809321d

@ -17,7 +17,7 @@ package api
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
@ -53,7 +53,17 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
func Start(ctx context.Context, index int, config *cmd.ApiConfig) error {
type Config struct {
RpcConfig config.API
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
}
func Start(ctx context.Context, index int, config *Config) error {
apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index)
if err != nil {
return err
@ -128,7 +138,7 @@ func Start(ctx context.Context, index int, config *cmd.ApiConfig) error {
return nil
}
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *cmd.ApiConfig) *gin.Engine {
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *Config) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
gin.SetMode(gin.ReleaseMode)
@ -311,7 +321,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClie
return r
}
func GinParseToken(rdb redis.UniversalClient, config *cmd.ApiConfig) gin.HandlerFunc {
func GinParseToken(rdb redis.UniversalClient, config *Config) gin.HandlerFunc {
//todo TokenPolicy
dataBase := controller.NewAuthDatabase(
cache.NewTokenCacheModel(rdb),
@ -371,8 +381,9 @@ func GinParseToken(rdb redis.UniversalClient, config *cmd.ApiConfig) gin.Handler
}
// // handleGinError logs and returns an error response through Gin context.
// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) {
// wrappedErr := errType.Wrap(detail)
// apiresp.GinError(c, wrappedErr)
// c.Abort()
// }
//
// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) {
// wrappedErr := errType.Wrap(detail)
// apiresp.GinError(c, wrappedErr)
// c.Abort()
// }

@ -16,7 +16,6 @@ package msggateway
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -32,7 +31,7 @@ import (
"google.golang.org/grpc"
)
func (s *Server) InitServer(ctx context.Context, config *cmd.MsgGatewayConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
@ -45,7 +44,7 @@ func (s *Server) InitServer(ctx context.Context, config *cmd.MsgGatewayConfig, d
return nil
}
func (s *Server) Start(ctx context.Context, index int, conf *cmd.MsgGatewayConfig) error {
func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.Ports, index,
@ -59,7 +58,7 @@ type Server struct {
rpcPort int
prometheusPort int
LongConnServer LongConnServer
config *cmd.MsgGatewayConfig
config *Config
pushTerminal map[int]struct{}
}
@ -67,7 +66,7 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
s.LongConnServer = LongConnServer
}
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *cmd.MsgGatewayConfig) *Server {
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *Config) *Server {
s := &Server{
rpcPort: rpcPort,
prometheusPort: proPort,

@ -16,15 +16,23 @@ package msggateway
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
"time"
"github.com/openimsdk/tools/log"
)
type Config struct {
MsgGateway config.MsgGateway
RedisConfig config.Redis
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
}
// Start run ws server.
func Start(ctx context.Context, index int, conf *cmd.MsgGatewayConfig) error {
func Start(ctx context.Context, index int, conf *Config) error {
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)

@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"net/http"
"strconv"
"sync"
@ -49,7 +48,7 @@ type LongConnServer interface {
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
Validate(s any) error
SetCacheHandler(cache cache.TokenModel)
SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *cmd.MsgGatewayConfig)
SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *Config)
KickUserConn(client *Client) error
UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler)
@ -59,7 +58,7 @@ type LongConnServer interface {
}
type WsServer struct {
msgGatewayConfig *cmd.MsgGatewayConfig
msgGatewayConfig *Config
port int
wsMaxConnNum int64
registerChan chan *Client
@ -86,7 +85,7 @@ type kickHandler struct {
newClient *Client
}
func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *cmd.MsgGatewayConfig) {
func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Share.RpcRegisterName)
u := rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, &config.Share.IMAdmin)
ws.userClient = &u
@ -132,7 +131,7 @@ func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client,
return ws.clients.Get(userID, platform)
}
func NewWsServer(msgGatewayConfig *cmd.MsgGatewayConfig, opts ...Option) (*WsServer, error) {
func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) {
var config configs
for _, o := range opts {
o(&config)

@ -17,7 +17,6 @@ package msgtransfer
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
@ -56,7 +55,17 @@ type MsgTransfer struct {
cancel context.CancelFunc
}
func Start(ctx context.Context, index int, config *cmd.MsgTransferConfig) error {
type Config struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
}
func Start(ctx context.Context, index int, config *Config) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index)
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
@ -112,7 +121,7 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat
}, nil
}
func (m *MsgTransfer) Start(index int, config *cmd.MsgTransferConfig) error {
func (m *MsgTransfer) Start(index int, config *Config) error {
prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index)
if err != nil {
return err

@ -16,7 +16,7 @@ package push
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@ -35,7 +35,18 @@ type pushServer struct {
pusher *Pusher
}
func Start(ctx context.Context, config *cmd.PushConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
type Config struct {
RpcConfig config.Push
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err

@ -17,7 +17,6 @@ package push
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/errs"
"sync"
@ -49,7 +48,7 @@ import (
)
type Pusher struct {
config *cmd.PushConfig
config *Config
database controller.PushDatabase
discov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
@ -62,7 +61,7 @@ type Pusher struct {
var errNoOfflinePusher = errs.New("no offlinePusher is configured")
func NewPusher(config *cmd.PushConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
) *Pusher {

@ -16,7 +16,7 @@ package auth
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -40,10 +40,17 @@ type authServer struct {
authDatabase controller.AuthDatabase
userRpcClient *rpcclient.UserRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
config *cmd.AuthConfig
config *Config
}
func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
type Config struct {
RpcConfig config.Auth
RedisConfig config.Redis
ZookeeperConfig config.ZooKeeper
Share config.Share
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err

@ -16,7 +16,7 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"sort"
@ -45,10 +45,19 @@ type conversationServer struct {
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
conversationNotificationSender *notification.ConversationNotificationSender
config *cmd.ConversationConfig
config *Config
}
func Start(ctx context.Context, config *cmd.ConversationConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
type Config struct {
RpcConfig config.Conversation
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err

@ -16,7 +16,7 @@ package friend
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -45,10 +45,20 @@ type friendServer struct {
notificationSender *notification.FriendNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
config *cmd.FriendConfig
config *Config
}
func Start(ctx context.Context, config *cmd.FriendConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
type Config struct {
RpcConfig config.Friend
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err

@ -17,7 +17,7 @@ package group
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"math/big"
"math/rand"
"strconv"
@ -58,10 +58,20 @@ type groupServer struct {
notification *notification.GroupNotificationSender
conversationRpcClient rpcclient.ConversationRpcClient
msgRpcClient rpcclient.MessageRpcClient
config *cmd.GroupConfig
config *Config
}
func Start(ctx context.Context, config *cmd.GroupConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
type Config struct {
RpcConfig config.Group
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err

@ -16,7 +16,6 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/constant"
@ -24,9 +23,9 @@ import (
"github.com/openimsdk/protocol/sdkws"
)
type MessageInterceptorFunc func(ctx context.Context, globalConfig *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error)
type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error)
func MessageHasReadEnabled(ctx context.Context, config *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
func MessageHasReadEnabled(ctx context.Context, config *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
switch {
case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType:
if !config.RpcConfig.SingleMessageHasReadReceiptEnable {

@ -16,7 +16,7 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
@ -47,7 +47,18 @@ type (
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
config *cmd.MsgConfig // Global configuration settings.
config *Config // Global configuration settings.
}
Config struct {
RpcConfig config.Msg
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
)
@ -56,7 +67,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
}
func Start(ctx context.Context, config *cmd.MsgConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err

@ -15,7 +15,6 @@
package msg
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
@ -23,7 +22,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
)
func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *cmd.MsgConfig) bool {
func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *Config) bool {
switch {
case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType:
if config.RpcConfig.SingleMessageHasReadReceiptEnable {

@ -17,7 +17,7 @@ package third
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"net/url"
"time"
@ -43,10 +43,19 @@ type thirdServer struct {
s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient
defaultExpire time.Duration
config *cmd.ThirdConfig
config *Config
}
type Config struct {
RpcConfig config.Third
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
}
func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
@ -80,11 +89,11 @@ func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDis
var o s3.Interface
switch enable {
case "minio":
o, err = minio.NewMinio(cache.NewMinioCache(rdb), minio.Config(config.Object.Minio))
o, err = minio.NewMinio(cache.NewMinioCache(rdb), *config.MinioConfig.Build())
case "cos":
o, err = cos.NewCos(cos.Config(config.Object.Cos))
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
case "oss":
o, err = oss.NewOSS(oss.Config(config.Object.Oss))
o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build())
default:
err = fmt.Errorf("invalid object enable: %s", enable)
}
@ -93,8 +102,8 @@ func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDis
}
third.RegisterThirdServer(server, &thirdServer{
apiURL: apiURL,
thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis), logdb),
userRpcClient: rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin),
thirdDatabase: controller.NewThirdDatabase(cache.NewThirdCache(rdb), logdb),
userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin),
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
config: config,

@ -16,7 +16,7 @@ package user
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"math/rand"
"strings"
@ -51,10 +51,21 @@ type userServer struct {
friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient
RegisterCenter registry.SvcDiscoveryRegistry
config *cmd.UserConfig
config *Config
}
func Start(ctx context.Context, config *cmd.UserConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
type Config struct {
RpcConfig config.User
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err

@ -16,7 +16,7 @@ package tools
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"os"
"os/signal"
@ -29,7 +29,16 @@ import (
"github.com/robfig/cron/v3"
)
func Start(ctx context.Context, config *cmd.CronTaskConfig) error {
type CronTaskConfig struct {
CronTask config.CronTask
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
Share config.Share
KafkaConfig config.Kafka
}
func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime",
config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.MsgDestructTime)
@ -94,7 +103,7 @@ func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool {
return ok
}
func cronWrapFunc(config *cmd.CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() {
func cronWrapFunc(config *CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() {
enableCronLocker := config.CronTask.EnableCronLocker
return func() {
// if don't enable cron-locker, call fn directly.

@ -17,7 +17,6 @@ package tools
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"math"
"math/rand"
@ -47,12 +46,12 @@ type MsgTool struct {
userDatabase controller.UserDatabase
groupDatabase controller.GroupDatabase
msgNotificationSender *notification.MsgNotificationSender
config *cmd.CronTaskConfig
config *CronTaskConfig
}
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase,
msgNotificationSender *notification.MsgNotificationSender, config *cmd.CronTaskConfig,
msgNotificationSender *notification.MsgNotificationSender, config *CronTaskConfig,
) *MsgTool {
return &MsgTool{
msgDatabase: msgDatabase,
@ -64,7 +63,7 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle
}
}
func InitMsgTool(ctx context.Context, config *cmd.CronTaskConfig) (*MsgTool, error) {
func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return nil, err

@ -26,20 +26,11 @@ type ApiCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
apiConfig ApiConfig
}
type ApiConfig struct {
RpcConfig config.API
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
apiConfig api.Config
}
func NewApiCmd() *ApiCmd {
var apiConfig ApiConfig
var apiConfig api.Config
ret := &ApiCmd{apiConfig: apiConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMAPICfgFileName: {EnvPrefix: apiEnvPrefix, ConfigStruct: &apiConfig.RpcConfig},

@ -27,17 +27,11 @@ type AuthRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
authConfig AuthConfig
}
type AuthConfig struct {
RpcConfig config.Auth
RedisConfig config.Redis
ZookeeperConfig config.ZooKeeper
Share config.Share
authConfig auth.Config
}
func NewAuthRpcCmd() *AuthRpcCmd {
var authConfig AuthConfig
var authConfig auth.Config
ret := &AuthRpcCmd{authConfig: authConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCAuthCfgFileName: {EnvPrefix: authEnvPrefix, ConfigStruct: &authConfig.RpcConfig},

@ -27,19 +27,11 @@ type ConversationRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
conversationConfig ConversationConfig
}
type ConversationConfig struct {
RpcConfig config.Conversation
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
conversationConfig conversation.Config
}
func NewConversationRpcCmd() *ConversationRpcCmd {
var conversationConfig ConversationConfig
var conversationConfig conversation.Config
ret := &ConversationRpcCmd{conversationConfig: conversationConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCConversationCfgFileName: {EnvPrefix: conversationEnvPrefix, ConfigStruct: &conversationConfig.RpcConfig},

@ -26,19 +26,11 @@ type CronTaskCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
cronTaskConfig CronTaskConfig
}
type CronTaskConfig struct {
CronTask config.CronTask
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
Share config.Share
KafkaConfig config.Kafka
cronTaskConfig tools.CronTaskConfig
}
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig CronTaskConfig
var cronTaskConfig tools.CronTaskConfig
ret := &CronTaskCmd{cronTaskConfig: cronTaskConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMCronTaskCfgFileName: {EnvPrefix: cornTaskEnvPrefix, ConfigStruct: &cronTaskConfig.CronTask},

@ -27,20 +27,11 @@ type FriendRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
friendConfig FriendConfig
}
type FriendConfig struct {
RpcConfig config.Friend
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
friendConfig friend.Config
}
func NewFriendRpcCmd() *FriendRpcCmd {
var friendConfig FriendConfig
var friendConfig friend.Config
ret := &FriendRpcCmd{friendConfig: friendConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCFriendCfgFileName: {EnvPrefix: friendEnvPrefix, ConfigStruct: &friendConfig.RpcConfig},

@ -27,20 +27,11 @@ type GroupRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
groupConfig GroupConfig
}
type GroupConfig struct {
RpcConfig config.Group
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
groupConfig group.Config
}
func NewGroupRpcCmd() *GroupRpcCmd {
var groupConfig GroupConfig
var groupConfig group.Config
ret := &GroupRpcCmd{groupConfig: groupConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCGroupCfgFileName: {EnvPrefix: groupEnvPrefix, ConfigStruct: &groupConfig.RpcConfig},

@ -27,21 +27,11 @@ type MsgRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
msgConfig MsgConfig
}
type MsgConfig struct {
RpcConfig config.Msg
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
msgConfig msg.Config
}
func NewMsgRpcCmd() *MsgRpcCmd {
var msgConfig MsgConfig
var msgConfig msg.Config
ret := &MsgRpcCmd{msgConfig: msgConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCMsgCfgFileName: {EnvPrefix: msgEnvPrefix, ConfigStruct: &msgConfig.RpcConfig},

@ -28,18 +28,11 @@ type MsgGatewayCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
msgGatewayConfig MsgGatewayConfig
}
type MsgGatewayConfig struct {
MsgGateway config.MsgGateway
RedisConfig config.Redis
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
msgGatewayConfig msggateway.Config
}
func NewMsgGatewayCmd() *MsgGatewayCmd {
var msgGatewayConfig MsgGatewayConfig
var msgGatewayConfig msggateway.Config
ret := &MsgGatewayCmd{msgGatewayConfig: msgGatewayConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMMsgGatewayCfgFileName: {EnvPrefix: msgGatewayEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway},

@ -26,20 +26,11 @@ type MsgTransferCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
msgTransferConfig MsgTransferConfig
}
type MsgTransferConfig struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
msgTransferConfig msgtransfer.Config
}
func NewMsgTransferCmd() *MsgTransferCmd {
var msgTransferConfig MsgTransferConfig
var msgTransferConfig msgtransfer.Config
ret := &MsgTransferCmd{msgTransferConfig: msgTransferConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMMsgTransferCfgFileName: {EnvPrefix: msgTransferEnvPrefix, ConfigStruct: &msgTransferConfig.MsgTransfer},

@ -27,21 +27,11 @@ type PushRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
pushConfig PushConfig
}
type PushConfig struct {
RpcConfig config.Push
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
pushConfig push.Config
}
func NewPushRpcCmd() *PushRpcCmd {
var pushConfig PushConfig
var pushConfig push.Config
ret := &PushRpcCmd{pushConfig: pushConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMPushCfgFileName: {EnvPrefix: pushEnvPrefix, ConfigStruct: &pushConfig.RpcConfig},

@ -27,20 +27,11 @@ type ThirdRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
thirdConfig ThirdConfig
}
type ThirdConfig struct {
RpcConfig config.Third
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
thirdConfig third.Config
}
func NewThirdRpcCmd() *ThirdRpcCmd {
var thirdConfig ThirdConfig
var thirdConfig third.Config
ret := &ThirdRpcCmd{thirdConfig: thirdConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCThirdCfgFileName: {EnvPrefix: thridEnvPrefix, ConfigStruct: &thirdConfig.RpcConfig},

@ -27,21 +27,11 @@ type UserRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
userConfig UserConfig
}
type UserConfig struct {
RpcConfig config.User
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
userConfig user.Config
}
func NewUserRpcCmd() *UserRpcCmd {
var userConfig UserConfig
var userConfig user.Config
ret := &UserRpcCmd{userConfig: userConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCUserCfgFileName: {EnvPrefix: userEnvPrefix, ConfigStruct: &userConfig.RpcConfig},

@ -15,10 +15,13 @@
package config
import (
"fmt"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/s3/cos"
"github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/s3/oss"
"time"
)
@ -277,23 +280,9 @@ type Third struct {
Prometheus Prometheus `mapstructure:"prometheus"`
Object struct {
Enable string `mapstructure:"enable"`
Cos struct {
BucketURL string `mapstructure:"bucketURL"`
SecretID string `mapstructure:"secretID"`
SecretKey string `mapstructure:"secretKey"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
} `mapstructure:"cos"`
Oss struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
} `mapstructure:"oss"`
Kodo struct {
Cos Cos `mapstructure:"cos"`
Oss Oss `mapstructure:"oss"`
Kodo struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
@ -312,6 +301,22 @@ type Third struct {
} `mapstructure:"aws"`
} `mapstructure:"object"`
}
type Cos struct {
BucketURL string `mapstructure:"bucketURL"`
SecretID string `mapstructure:"secretID"`
SecretKey string `mapstructure:"secretKey"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
}
type Oss struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
BucketURL string `mapstructure:"bucketURL"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
SessionToken string `mapstructure:"sessionToken"`
PublicRead bool `mapstructure:"publicRead"`
}
type User struct {
RPC struct {
@ -474,15 +479,36 @@ func (k *Kafka) Build() *kafka.Config {
func (m *Minio) Build() *minio.Config {
return &minio.Config{
Bucket: m.Bucket,
Endpoint: "",
Endpoint: fmt.Sprintf("http://%s:%d", m.InternalIP, m.Port),
AccessKeyID: m.AccessKeyID,
SecretAccessKey: m.SecretAccessKey,
SessionToken: m.SessionToken,
SignEndpoint: "",
SignEndpoint: fmt.Sprintf("http://%s:%d", m.ExternalIP, m.Port),
PublicRead: m.PublicRead,
}
}
func (c *Cos) Build() *cos.Config {
return &cos.Config{
BucketURL: c.BucketURL,
SecretID: c.SecretID,
SecretKey: c.SecretKey,
SessionToken: c.SessionToken,
PublicRead: c.PublicRead,
}
}
func (o *Oss) Build() *oss.Config {
return &oss.Config{
Endpoint: o.Endpoint,
Bucket: o.Bucket,
BucketURL: o.BucketURL,
AccessKeyID: o.AccessKeyID,
AccessKeySecret: o.AccessKeySecret,
SessionToken: o.SessionToken,
PublicRead: o.PublicRead,
}
}
func (l *CacheConfig) Failed() time.Duration {
return time.Second * time.Duration(l.FailedExpire)

@ -17,7 +17,7 @@ package controller
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -133,7 +133,7 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M
}, nil
}
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *cmd.CronTaskConfig) (CommonMsgDatabase, error) {
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) {
msgDocModel, err := mgo.NewMsgMongo(database)
if err != nil {
return nil, err

@ -25,7 +25,7 @@ type PushDatabase interface {
}
type pushDataBase struct {
cache cache.MsgModel
cache cache.ThirdCache
}
func NewPushDatabase(cache cache.ThirdCache) PushDatabase {

@ -34,7 +34,7 @@ type ThirdDatabase interface {
}
type thirdDatabase struct {
cache cache.MsgModel
cache cache.ThirdCache
logdb relation.LogInterface
}
@ -58,7 +58,7 @@ func (t *thirdDatabase) UploadLogs(ctx context.Context, logs []*relation.LogMode
return t.logdb.Create(ctx, logs)
}
func NewThirdDatabase(cache cache.MsgModel, logdb relation.LogInterface) ThirdDatabase {
func NewThirdDatabase(cache cache.ThirdCache, logdb relation.LogInterface) ThirdDatabase {
return &thirdDatabase{cache: cache, logdb: logdb}
}

@ -19,39 +19,35 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/zookeeper"
"github.com/openimsdk/tools/errs"
"time"
)
const (
zookeeper = "zoopkeeper"
kubenetes = "k8s"
directT = "direct"
zookeeperConst = "zoopkeeper"
kubenetesConst = "k8s"
directConst = "direct"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper) (discovery.SvcDiscoveryRegistry, error) {
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, env string) (discovery.SvcDiscoveryRegistry, error) {
switch zookeeperConfig.Env {
case "zookeeper":
return zookeeper.NewZookeeperDiscoveryRegister(&config.Zookeeper)
zk, err := zookeeper.NewZkClient(
zookeeperConfig.zkAddr,
schema,
switch env {
case zookeeperConst:
return zookeeper.NewZkClient(
zookeeperConfig.Address,
zookeeperConfig.Schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(username, password),
zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
if err != nil {
return nil, err
}
case "k8s":
case kubenetesConst:
return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName)
case "direct":
case directConst:
return direct.NewConnDirect(config)
default:
return nil, errs.New("unsupported discovery type", "type", config.Envs.Discovery).Wrap()
return nil, errs.New("unsupported discovery type", "type", env).Wrap()
}
}

@ -18,7 +18,6 @@ import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@ -30,7 +29,6 @@ type Conversation struct {
Client pbconversation.ConversationClient
conn grpc.ClientConnInterface
discov discovery.SvcDiscoveryRegistry
Config *config.GlobalConfig
}
func NewConversation(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Conversation {

Loading…
Cancel
Save