diff --git a/config/config.yaml b/config/config.yaml index 2224e914f..f474fc06a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,8 @@ serverversion: 2.3.1 etcd: etcdSchema: openim #默认即可 etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可 + userName: + password: k8sMod: false #开启k8s模式 使用pod里面环境变量请求services调用服务 而并非etcd @@ -46,6 +48,8 @@ redis: enableCluster: false #如果外部redis以集群方式启动,需要打开此开关 kafka: + userName: + password: ws2mschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat" #用于mongo和mysql保存消息 diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 56cc66bb7..e246c76bf 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -158,6 +158,8 @@ type config struct { Etcd struct { EtcdSchema string `yaml:"etcdSchema"` EtcdAddr []string `yaml:"etcdAddr"` + UserName string `yaml:"userName"` + Password string `yaml:"password"` } Log struct { StorageLocation string `yaml:"storageLocation"` @@ -219,6 +221,8 @@ type config struct { } Kafka struct { + UserName string `yaml:"userName"` + Password string `yaml:"password"` Ws2mschat struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 771145906..dd3182fad 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -1,8 +1,10 @@ package kafka import ( - "github.com/Shopify/sarama" + "Open_IM/pkg/common/config" "sync" + + "github.com/Shopify/sarama" ) type Consumer struct { @@ -17,8 +19,13 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { p := Consumer{} p.Topic = topic p.addr = addr - - consumer, err := sarama.NewConsumer(p.addr, nil) + consumerConfig := sarama.NewConfig() + if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" { + consumerConfig.Net.SASL.Enable = true + consumerConfig.Net.SASL.User = config.Config.Kafka.UserName + consumerConfig.Net.SASL.Password = config.Config.Kafka.Password + } + consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { panic(err.Error()) return nil diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 15c08ac04..23dfc3085 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -1,6 +1,7 @@ package kafka import ( + "Open_IM/pkg/common/config" log "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "errors" @@ -25,7 +26,11 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.config.Producer.Return.Errors = true p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - + if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" { + p.config.Net.SASL.Enable = true + p.config.Net.SASL.User = config.Config.Kafka.UserName + p.config.Net.SASL.Password = config.Config.Kafka.Password + } p.addr = addr p.topic = topic diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index db280e040..8f6a383d6 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -39,6 +39,8 @@ var ( func NewResolver(schema, etcdAddr, serviceName string, operationID string) (*Resolver, error) { etcdCli, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ","), + Username: config.Config.Etcd.UserName, + Password: config.Config.Etcd.Password, }) if err != nil { log.Error(operationID, "etcd client v3 failed")