Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

pull/232/head
wangchuxiao 3 years ago
commit b8d95f4b31

@ -38,7 +38,7 @@ func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability
} }
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic},
config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline)

@ -58,7 +58,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability
} }
och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)

@ -26,7 +26,7 @@ type PersistentConsumerHandler struct {
func (pc *PersistentConsumerHandler) Init() { func (pc *PersistentConsumerHandler) Init() {
pc.msgHandle = make(map[string]fcb) pc.msgHandle = make(map[string]fcb)
pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)

@ -26,7 +26,7 @@ type PushConsumerHandler struct {
func (ms *PushConsumerHandler) Init() { func (ms *PushConsumerHandler) Init() {
ms.msgHandle = make(map[string]fcb) ms.msgHandle = make(map[string]fcb)
ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat
ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush) config.Config.Kafka.ConsumerGroupID.MsgToPush)
} }

@ -8,6 +8,7 @@ package kafka
import ( import (
"context" "context"
"fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
) )
@ -23,16 +24,13 @@ type MConsumerGroupConfig struct {
IsReturnErr bool IsReturnErr bool
} }
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup { func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
config := sarama.NewConfig() config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr config.Consumer.Return.Errors = consumerConfig.IsReturnErr
client, err := sarama.NewClient(addr, config) fmt.Println("init address is ", addrs, "topics is ", topics)
if err != nil { consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
panic(err.Error())
}
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())
} }

Loading…
Cancel
Save