Merge branch 'openimsdk:main' into wf/di

pull/3041/head
Monet Lee 9 months ago committed by GitHub
commit 826522e033
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.70
github.com/openimsdk/tools v0.0.50-alpha.63
github.com/openimsdk/protocol v0.0.72-alpha.71
github.com/openimsdk/tools v0.0.50-alpha.65
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0

@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ=
github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY=
github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70=
github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.65 h1:BRtxkyWxDWPHuHphSwEyHZj7kJSR98am/fHOH84naK8=
github.com/openimsdk/tools v0.0.50-alpha.65/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -19,22 +19,29 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
// wait for Restart http call return
waitHttp = time.Millisecond * 200
)
type ConfigManager struct {
imAdminUserID []string
config *config.AllConfig
client *clientv3.Client
configPath string
runtimeEnv string
configPath string
runtimeEnv string
}
func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
return &ConfigManager{
cm := &ConfigManager{
imAdminUserID: IMAdminUserID,
config: cfg,
client: client,
configPath: configPath,
runtimeEnv: runtimeEnv,
}
return cm
}
func (cm *ConfigManager) CheckAdmin(c *gin.Context) {
@ -85,49 +92,49 @@ func (cm *ConfigManager) SetConfig(c *gin.Context) {
var err error
switch req.ConfigName {
case cm.config.Discovery.GetConfigFileName():
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Kafka.GetConfigFileName():
err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.LocalCache.GetConfigFileName():
err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Log.GetConfigFileName():
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Minio.GetConfigFileName():
err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Mongo.GetConfigFileName():
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Notification.GetConfigFileName():
err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.API.GetConfigFileName():
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.CronTask.GetConfigFileName():
err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.MsgGateway.GetConfigFileName():
err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.MsgTransfer.GetConfigFileName():
err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Push.GetConfigFileName():
err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Auth.GetConfigFileName():
err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Conversation.GetConfigFileName():
err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Friend.GetConfigFileName():
err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Group.GetConfigFileName():
err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Msg.GetConfigFileName():
err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Third.GetConfigFileName():
err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.User.GetConfigFileName():
err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Redis.GetConfigFileName():
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Share.GetConfigFileName():
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm)
case cm.config.Webhooks.GetConfigFileName():
err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm)
default:
apiresp.GinError(c, errs.ErrArgs.Wrap())
return
@ -139,7 +146,7 @@ func (cm *ConfigManager) SetConfig(c *gin.Context) {
apiresp.GinSuccess(c, nil)
}
func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error {
conf := new(T)
err := json.Unmarshal([]byte(req.Data), &conf)
if err != nil {
@ -153,7 +160,7 @@ func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq,
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
_, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
if err != nil {
return errs.WrapMsg(err, "save to etcd failed")
}
@ -161,16 +168,19 @@ func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq,
}
func (cm *ConfigManager) ResetConfig(c *gin.Context) {
go cm.resetConfig(c)
go func() {
if err := cm.resetConfig(c, true); err != nil {
log.ZError(c, "reset config err", err)
}
}()
apiresp.GinSuccess(c, nil)
}
func (cm *ConfigManager) resetConfig(c *gin.Context) {
func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error {
txn := cm.client.Txn(c)
type initConf struct {
old any
new any
isChanged bool
old any
new any
}
configMap := map[string]*initConf{
cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)},
@ -210,13 +220,12 @@ func (cm *ConfigManager) resetConfig(c *gin.Context) {
log.ZError(c, "load config failed", err)
continue
}
v.isChanged = reflect.DeepEqual(v.old, v.new)
if !v.isChanged {
equal := reflect.DeepEqual(v.old, v.new)
if !checkChange || !equal {
changedKeys = append(changedKeys, k)
}
}
ops := make([]clientv3.Op, 0)
for _, k := range changedKeys {
data, err := json.Marshal(configMap[k].new)
if err != nil {
@ -229,10 +238,10 @@ func (cm *ConfigManager) resetConfig(c *gin.Context) {
txn.Then(ops...)
_, err := txn.Commit()
if err != nil {
log.ZError(c, "commit etcd txn failed", err)
return
return errs.WrapMsg(err, "commit etcd txn failed")
}
}
return nil
}
func (cm *ConfigManager) Restart(c *gin.Context) {
@ -241,10 +250,63 @@ func (cm *ConfigManager) Restart(c *gin.Context) {
}
func (cm *ConfigManager) restart(c *gin.Context) {
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
time.Sleep(waitHttp) // wait for Restart http call return
t := time.Now().Unix()
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
if err != nil {
log.ZError(c, "restart etcd put key failed", err)
}
}
func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) {
if cm.config.Discovery.Enable != config.ETCD {
apiresp.GinError(c, errs.New("only etcd support config manager").Wrap())
return
}
var req apistruct.SetEnableConfigManagerReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
var enableStr string
if req.Enable {
enableStr = etcd.Enable
} else {
enableStr = etcd.Disable
}
resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey))
if err != nil {
apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed"))
return
}
if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable {
go func() {
time.Sleep(waitHttp) // wait for Restart http call return
err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr))
if err != nil {
log.ZError(c, "resetConfig failed", err)
}
}()
} else {
_, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)
if err != nil {
apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed"))
return
}
}
apiresp.GinSuccess(c, nil)
}
func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) {
resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey))
if err != nil {
apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed"))
return
}
var enable bool
if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable {
enable = true
}
apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable})
}

@ -58,7 +58,9 @@ func Start(ctx context.Context, index int, config *Config) error {
config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv)
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{
config.Discovery.RpcService.MessageGateway,
})
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}

@ -2,7 +2,6 @@ package api
import (
"context"
"fmt"
"net/http"
"strings"
@ -29,8 +28,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
@ -56,8 +53,6 @@ func prommetricsGin() gin.HandlerFunc {
}
func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
if err != nil {
return nil, err
@ -314,6 +309,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
configGroup.POST("/get_config", cm.GetConfig)
configGroup.POST("/set_config", cm.SetConfig)
configGroup.POST("/reset_config", cm.ResetConfig)
configGroup.POST("/set_enable_config_manager", cm.SetEnableConfigManager)
configGroup.POST("/get_enable_config_manager", cm.GetEnableConfigManager)
}
{
r.POST("/restart", cm.CheckAdmin, cm.Restart)

@ -16,9 +16,10 @@ package msggateway
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sync/atomic"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
@ -64,6 +65,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
conf.WebhooksConfig.GetConfigFileName(),
conf.RedisConfig.GetConfigFileName(),
},
[]string{
conf.Discovery.RpcService.MessageGateway,
},
s.InitServer,
)
}

@ -87,7 +87,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv)
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil)
if err != nil {
return err
}

@ -17,6 +17,7 @@ package auth
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -118,9 +119,13 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
if authverify.IsManagerUserID(req.UserID, s.config.Share.IMAdminUserID) {
return nil, errs.ErrNoPermission.WrapMsg("don't get Admin token")
}
if err := s.userClient.CheckUser(ctx, []string{req.UserID}); err != nil {
user, err := s.userClient.GetUserInfo(ctx, req.UserID)
if err != nil {
return nil, err
}
if user.AppMangerLevel >= constant.AppNotificationAdmin {
return nil, errs.ErrArgs.WrapMsg("app account can`t get token")
}
token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID))
if err != nil {
return nil, err

@ -17,7 +17,6 @@ package user
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/rand"
"strings"
"sync"
@ -32,6 +31,7 @@ import (
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/group"
friendpb "github.com/openimsdk/protocol/relation"
"github.com/openimsdk/tools/db/redisutil"
@ -480,7 +480,9 @@ func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.Add
if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.AppMangerLevel < constant.AppNotificationAdmin {
return nil, errs.ErrArgs.WithDetail("app level not supported")
}
if req.UserID == "" {
for i := 0; i < 20; i++ {
userId := s.genUserID()
@ -506,16 +508,17 @@ func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.Add
Nickname: req.NickName,
FaceURL: req.FaceURL,
CreateTime: time.Now(),
AppMangerLevel: constant.AppNotificationAdmin,
AppMangerLevel: req.AppMangerLevel,
}
if err := s.db.Create(ctx, []*tablerelation.User{user}); err != nil {
return nil, err
}
return &pbuser.AddNotificationAccountResp{
UserID: req.UserID,
NickName: req.NickName,
FaceURL: req.FaceURL,
UserID: req.UserID,
NickName: req.NickName,
FaceURL: req.FaceURL,
AppMangerLevel: req.AppMangerLevel,
}, nil
}
@ -595,8 +598,13 @@ func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.Get
if err != nil {
return nil, servererrs.ErrUserIDNotFound.Wrap()
}
if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel == constant.AppNotificationAdmin {
return &pbuser.GetNotificationAccountResp{}, nil
if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel >= constant.AppNotificationAdmin {
return &pbuser.GetNotificationAccountResp{Account: &pbuser.NotificationAccountInfo{
UserID: user.UserID,
FaceURL: user.FaceURL,
NickName: user.Nickname,
AppMangerLevel: user.AppMangerLevel,
}}, nil
}
return nil, errs.ErrNoPermission.WrapMsg("notification messages cannot be sent for this ID")
@ -621,11 +629,12 @@ func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pag
accounts := make([]*pbuser.NotificationAccountInfo, 0)
var total int64
for _, v := range users {
if v.AppMangerLevel == constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) {
if v.AppMangerLevel >= constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdminUserID...) {
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
AppMangerLevel: v.AppMangerLevel,
}
accounts = append(accounts, temp)
total += 1

@ -2,11 +2,14 @@ package tools
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
@ -27,38 +30,47 @@ type CronTaskConfig struct {
runTimeEnv string
}
func Start(ctx context.Context, config *CronTaskConfig) error {
config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
func Start(ctx context.Context, conf *CronTaskConfig) error {
conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", config.runTimeEnv, "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 {
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
if conf.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.runTimeEnv)
client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, conf.runTimeEnv, nil)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0])
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg)
if err != nil {
return err
}
thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third)
thirdConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
conversationConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Conversation)
if err != nil {
return err
}
if conf.Discovery.Enable == config.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
conf.CronTask.GetConfigFileName(),
conf.Share.GetConfigFileName(),
conf.Discovery.GetConfigFileName(),
})
cm.Watch(ctx)
}
srv := &cronServer{
ctx: ctx,
config: config,
config: conf,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
@ -74,7 +86,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerClearUserMsg(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime)
srv.cron.Start()
<-ctx.Done()
return nil

@ -14,3 +14,11 @@ type SetConfigReq struct {
ConfigName string `json:"configName"`
Data string `json:"data"`
}
type SetEnableConfigManagerReq struct {
Enable bool `json:"enable"`
}
type GetEnableConfigManagerResp struct {
Enable bool `json:"enable"`
}

@ -64,5 +64,8 @@ func (a *AuthRpcCmd) runE() error {
a.authConfig.RedisConfig.GetConfigFileName(),
a.authConfig.Discovery.GetConfigFileName(),
},
[]string{
a.authConfig.Discovery.RpcService.MessageGateway,
},
auth.Start)
}

@ -68,6 +68,6 @@ func (a *ConversationRpcCmd) runE() error {
a.conversationConfig.Share.GetConfigFileName(),
a.conversationConfig.LocalCacheConfig.GetConfigFileName(),
a.conversationConfig.Discovery.GetConfigFileName(),
},
}, nil,
conversation.Start)
}

@ -70,6 +70,6 @@ func (a *FriendRpcCmd) runE() error {
a.relationConfig.WebhooksConfig.GetConfigFileName(),
a.relationConfig.LocalCacheConfig.GetConfigFileName(),
a.relationConfig.Discovery.GetConfigFileName(),
},
}, nil,
relation.Start)
}

@ -71,6 +71,6 @@ func (a *GroupRpcCmd) runE() error {
a.groupConfig.WebhooksConfig.GetConfigFileName(),
a.groupConfig.LocalCacheConfig.GetConfigFileName(),
a.groupConfig.Discovery.GetConfigFileName(),
},
}, nil,
group.Start, versionctx.EnableVersionCtx())
}

@ -72,6 +72,6 @@ func (a *MsgRpcCmd) runE() error {
a.msgConfig.WebhooksConfig.GetConfigFileName(),
a.msgConfig.LocalCacheConfig.GetConfigFileName(),
a.msgConfig.Discovery.GetConfigFileName(),
},
}, nil,
msg.Start)
}

@ -72,5 +72,8 @@ func (a *PushRpcCmd) runE() error {
a.pushConfig.LocalCacheConfig.GetConfigFileName(),
a.pushConfig.Discovery.GetConfigFileName(),
},
[]string{
a.pushConfig.Discovery.RpcService.MessageGateway,
},
push.Start)
}

@ -93,7 +93,7 @@ func (r *RootCmd) initEtcd() error {
return err
}
if disConfig.Enable == config.ETCD {
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env)
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil)
r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
return nil
@ -113,7 +113,9 @@ func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) e
if err := r.initializeLogger(cmdOpts); err != nil {
return errs.WrapMsg(err, "failed to initialize logger")
}
if err := r.etcdClient.Close(); err != nil {
return errs.WrapMsg(err, "failed to close etcd client")
}
return nil
}
@ -141,9 +143,24 @@ func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error {
if r.etcdClient == nil {
return nil
}
ctx := context.TODO()
res, err := r.etcdClient.Get(ctx, disetcd.BuildKey(disetcd.EnableConfigCenterKey))
if err != nil {
log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get EnableConfigCenterKey err: %v", errs.Wrap(err))
return nil
}
if res.Count == 0 {
return nil
} else {
if string(res.Kvs[0].Value) == disetcd.Disable {
return nil
} else if string(res.Kvs[0].Value) != disetcd.Enable {
return errs.New("unknown EnableConfigCenter value").Wrap()
}
}
update := func(configFileName string, configStruct any) error {
ctx := context.TODO()
key := disetcd.BuildKey(configFileName)
etcdRes, err := r.etcdClient.Get(ctx, key)
if err != nil {

@ -70,6 +70,6 @@ func (a *ThirdRpcCmd) runE() error {
a.thirdConfig.MinioConfig.GetConfigFileName(),
a.thirdConfig.LocalCacheConfig.GetConfigFileName(),
a.thirdConfig.Discovery.GetConfigFileName(),
},
}, nil,
third.Start)
}

@ -72,6 +72,6 @@ func (a *UserRpcCmd) runE() error {
a.userConfig.WebhooksConfig.GetConfigFileName(),
a.userConfig.LocalCacheConfig.GetConfigFileName(),
a.userConfig.Discovery.GetConfigFileName(),
},
}, nil,
user.Start)
}

@ -28,7 +28,7 @@ import (
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) {
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
if runtimeEnv == config.KUBERNETES {
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
grpc.WithDefaultCallOptions(
@ -42,6 +42,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (disco
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
watchNames,
etcd.WithDialTimeout(10*time.Second),
etcd.WithMaxCallSendMsgSize(20*1024*1024),
etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))

@ -14,11 +14,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
ConfigKeyPrefix = "/open-im/config/"
RestartKey = "restart"
)
var (
ShutDowns []func() error
)

@ -0,0 +1,9 @@
package etcd
const (
ConfigKeyPrefix = "/open-im/config/"
RestartKey = "restart"
EnableConfigCenterKey = "enable-config-center"
Enable = "enable"
Disable = "disable"
)

@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"

@ -49,7 +49,7 @@ import (
// Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string,
watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption) error {
@ -95,7 +95,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
if autoSetPorts && discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
}
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv)
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames)
if err != nil {
return err
}

@ -270,8 +270,8 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
}
}
log.ZInfo(ctx, "get users online", "online users length", len(userIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t))
return userIDs, offlineUserIDs, nil
log.ZInfo(ctx, "get users online", "online users length", len(onlineUserIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t))
return onlineUserIDs, offlineUserIDs, nil
}
func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) {

Loading…
Cancel
Save