pull/351/head
wangchuxiao 2 years ago
parent c025d0d030
commit 665e2e3e83

@ -7,6 +7,8 @@ serverversion: 2.3.1
etcd:
etcdSchema: openim #默认即可
etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可
userName:
password:
k8sMod: false #开启k8s模式 使用pod里面环境变量请求services调用服务 而并非etcd
@ -46,6 +48,8 @@ redis:
enableCluster: false #如果外部redis以集群方式启动需要打开此开关
kafka:
userName:
password:
ws2mschat:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "ws2ms_chat" #用于mongo和mysql保存消息

@ -158,6 +158,8 @@ type config struct {
Etcd struct {
EtcdSchema string `yaml:"etcdSchema"`
EtcdAddr []string `yaml:"etcdAddr"`
UserName string `yaml:"userName"`
Password string `yaml:"password"`
}
Log struct {
StorageLocation string `yaml:"storageLocation"`
@ -219,6 +221,8 @@ type config struct {
}
Kafka struct {
UserName string `yaml:"userName"`
Password string `yaml:"password"`
Ws2mschat struct {
Addr []string `yaml:"addr"`
Topic string `yaml:"topic"`

@ -1,8 +1,10 @@
package kafka
import (
"github.com/Shopify/sarama"
"Open_IM/pkg/common/config"
"sync"
"github.com/Shopify/sarama"
)
type Consumer struct {
@ -17,8 +19,13 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{}
p.Topic = topic
p.addr = addr
consumer, err := sarama.NewConsumer(p.addr, nil)
consumerConfig := sarama.NewConfig()
if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" {
consumerConfig.Net.SASL.Enable = true
consumerConfig.Net.SASL.User = config.Config.Kafka.UserName
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {
panic(err.Error())
return nil

@ -1,6 +1,7 @@
package kafka
import (
"Open_IM/pkg/common/config"
log "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"errors"
@ -25,7 +26,11 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
p.config.Producer.Return.Errors = true
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" {
p.config.Net.SASL.Enable = true
p.config.Net.SASL.User = config.Config.Kafka.UserName
p.config.Net.SASL.Password = config.Config.Kafka.Password
}
p.addr = addr
p.topic = topic

@ -39,6 +39,8 @@ var (
func NewResolver(schema, etcdAddr, serviceName string, operationID string) (*Resolver, error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ","),
Username: config.Config.Etcd.UserName,
Password: config.Config.Etcd.Password,
})
if err != nil {
log.Error(operationID, "etcd client v3 failed")

Loading…
Cancel
Save