From cb0a1a20f1e30fa94731450a4daa0484d7e2a2da Mon Sep 17 00:00:00 2001 From: chenqinghe Date: Wed, 29 Dec 2021 21:45:38 +0800 Subject: [PATCH] refactor: refactor message queue --- .idea/.gitignore | 8 ++ .idea/Open-IM-Server.iml | 9 ++ .idea/modules.xml | 8 ++ .idea/vcs.xml | 6 + go.sum | 3 - .../msg_transfer/logic/history_msg_handler.go | 42 +++---- internal/msg_transfer/logic/init.go | 10 +- .../logic/persistent_msg_handler.go | 36 +++--- pkg/common/kafka/consumer_group.go | 53 --------- pkg/common/mq/consumer.go | 40 +++++++ pkg/common/{ => mq}/kafka/consumer.go | 1 + pkg/common/mq/kafka/consumer_group.go | 110 ++++++++++++++++++ pkg/common/{ => mq}/kafka/producer.go | 11 +- pkg/common/mq/producer.go | 7 ++ 14 files changed, 233 insertions(+), 111 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/Open-IM-Server.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml delete mode 100644 pkg/common/kafka/consumer_group.go create mode 100644 pkg/common/mq/consumer.go rename pkg/common/{ => mq}/kafka/consumer.go (99%) create mode 100644 pkg/common/mq/kafka/consumer_group.go rename pkg/common/{ => mq}/kafka/producer.go (82%) create mode 100644 pkg/common/mq/producer.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 000000000..1c2fda565 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/Open-IM-Server.iml b/.idea/Open-IM-Server.iml new file mode 100644 index 000000000..338a26630 --- /dev/null +++ b/.idea/Open-IM-Server.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 000000000..d81362bc6 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000000000..9661ac713 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/go.sum b/go.sum index 56ac84b4c..5ae8a67e3 100644 --- a/go.sum +++ b/go.sum @@ -141,7 +141,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -181,12 +180,10 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index 9ae3c5a0f..c73a7253a 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -1,16 +1,18 @@ package logic import ( + "Open_IM/pkg/common/mq" + "context" + "strings" + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + kfk "Open_IM/pkg/common/mq/kafka" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" - "context" - "strings" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" @@ -19,27 +21,24 @@ import ( type fcb func(msg []byte, msgKey string) type HistoryConsumerHandler struct { - msgHandle map[string]fcb - historyConsumerGroup *kfk.MConsumerGroup + historyConsumerGroup mq.Consumer } func (mc *HistoryConsumerHandler) Init() { - mc.msgHandle = make(map[string]fcb) - mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, - config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) - + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + mc.historyConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ws2mschat.Topic, mq.MessageHandleFunc(mc.handleChatWs2Mongo)) } -func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { +func (mc *HistoryConsumerHandler) handleChatWs2Mongo(message *mq.Message) error { + msg, msgKey := message.Value, string(message.Key) log.InfoByKv("chat come mongo!!!", "", "chat", string(msg)) time := utils.GetCurrentTimestampByNano() pbData := pbMsg.WSToMsgSvrChatMsg{} err := proto.Unmarshal(msg, &pbData) if err != nil { log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error()) - return + return err } pbSaveData := pbMsg.MsgSvrToPushSvrChatMsg{} pbSaveData.SendID = pbData.SendID @@ -68,14 +67,14 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) err := saveUserChat(pbData.RecvID, &pbSaveData) if err != nil { log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String()) - return + return err } } else if msgKey == pbSaveData.SendID { err := saveUserChat(pbData.SendID, &pbSaveData) if err != nil { log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String()) - return + return err } } @@ -96,7 +95,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) err := saveUserChat(uidAndGroupID[0], &pbSaveData) if err != nil { log.NewError(pbSaveData.OperationID, "group data insert to mongo err", pbSaveData.String(), uidAndGroupID[0], err.Error()) - return + return err } } pbSaveData.Options = pbData.Options @@ -104,22 +103,13 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) go sendMessageToPush(&pbSaveData) default: log.NewError(pbSaveData.OperationID, "SessionType error", pbSaveData.String()) - return + return nil // not retry } log.NewDebug(pbSaveData.OperationID, "msg_transfer handle topic data to database success...", pbSaveData.String()) -} -func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - log.InfoByKv("kafka get info to mongo", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } return nil } + func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) { log.InfoByKv("msg_transfer send message to push", message.OperationID, "message", message.String()) msg := pbPush.PushMsgReq{} diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index cce434d99..2d4c02ccb 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -2,14 +2,15 @@ package logic import ( "Open_IM/pkg/common/config" - "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/mq" + "Open_IM/pkg/common/mq/kafka" ) var ( persistentCH PersistentConsumerHandler historyCH HistoryConsumerHandler - producer *kafka.Producer + producer mq.Producer ) func Init() { @@ -18,8 +19,9 @@ func Init() { historyCH.Init() producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) } + func Run() { //register mysqlConsumerHandler to - go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) - go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) + go persistentCH.persistentConsumerGroup.Start() + go historyCH.historyConsumerGroup.Start() } diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 88039041c..3ad249f40 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -7,38 +7,41 @@ package logic import ( + "Open_IM/pkg/common/mq" + "strings" + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/mysql_model/im_mysql_msg_model" - kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + kfk "Open_IM/pkg/common/mq/kafka" pbMsg "Open_IM/pkg/proto/chat" "Open_IM/pkg/utils" + "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" - "strings" ) type PersistentConsumerHandler struct { - msgHandle map[string]fcb - persistentConsumerGroup *kfk.MConsumerGroup + persistentConsumerGroup mq.Consumer } func (pc *PersistentConsumerHandler) Init() { - pc.msgHandle = make(map[string]fcb) - pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) + pc.persistentConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ws2mschat.Topic, mq.MessageHandleFunc(pc.handleChatWs2Mysql)) } -func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) { + +func (pc *PersistentConsumerHandler) handleChatWs2Mysql(message *mq.Message) error { + msg, msgKey := message.Value, string(message.Key) log.InfoByKv("chat come here mysql!!!", "", "chat", string(msg)) pbData := pbMsg.WSToMsgSvrChatMsg{} err := proto.Unmarshal(msg, &pbData) if err != nil { log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error()) - return + return nil // not retry } Options := utils.JsonStringToMap(pbData.Options) //Control whether to store history messages (mysql) @@ -49,27 +52,18 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey strin log.InfoByKv("msg_transfer chat persisting", pbData.OperationID) if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil { log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String()) - return + return err } } else if pbData.SessionType == constant.GroupChatType && msgKey == "0" { pbData.RecvID = strings.Split(pbData.RecvID, " ")[1] log.InfoByKv("msg_transfer chat persisting", pbData.OperationID) if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil { log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String()) - return + return err } } } -} -func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value)) - pc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } + return nil } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go deleted file mode 100644 index 4c4af5033..000000000 --- a/pkg/common/kafka/consumer_group.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -** description(""). -** copyright('tuoyun,www.tuoyun.net'). -** author("fg,Gordon@tuoyun.net"). -** time(2021/5/11 9:36). - */ -package kafka - -import ( - "context" - "github.com/Shopify/sarama" -) - -type MConsumerGroup struct { - sarama.ConsumerGroup - groupID string - topics []string -} - -type MConsumerGroupConfig struct { - KafkaVersion sarama.KafkaVersion - OffsetsInitial int64 - IsReturnErr bool -} - -func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup { - config := sarama.NewConfig() - config.Version = consumerConfig.KafkaVersion - config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial - config.Consumer.Return.Errors = consumerConfig.IsReturnErr - client, err := sarama.NewClient(addr, config) - if err != nil { - panic(err) - } - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) - if err != nil { - panic(err) - } - return &MConsumerGroup{ - consumerGroup, - groupID, - topics, - } -} -func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) { - ctx := context.Background() - for { - err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) - if err != nil { - panic(err) - } - } -} diff --git a/pkg/common/mq/consumer.go b/pkg/common/mq/consumer.go new file mode 100644 index 000000000..6576acc33 --- /dev/null +++ b/pkg/common/mq/consumer.go @@ -0,0 +1,40 @@ +package mq + +import "time" + +type Consumer interface { + // RegisterMessageHandler is used to register message handler + // any received messages will be passed to handler to process + // once the Consumer started, it is forbidden to register handlers. + RegisterMessageHandler(topic string, handler MessageHandler) + + // Start to consume messages + Start() error +} + +type MessageHandler interface { + // HandleMessage process received messages, + // if returned error is nil, the message will be auto committed. + HandleMessage(msg *Message) error +} + +type MessageHandleFunc func(msg *Message) error + +func (fn MessageHandleFunc) HandleMessage(msg *Message) error { + return fn(msg) +} + +type Message struct { + Key, Value []byte + Topic string + Partition int32 + Offset int64 + Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp + BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp + Headers []*RecordHeader // only set if kafka is version 0.11+ +} + +type RecordHeader struct { + Key []byte + Value []byte +} diff --git a/pkg/common/kafka/consumer.go b/pkg/common/mq/kafka/consumer.go similarity index 99% rename from pkg/common/kafka/consumer.go rename to pkg/common/mq/kafka/consumer.go index eed6ef142..3b45265f9 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/mq/kafka/consumer.go @@ -33,4 +33,5 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { p.PartitionList = partitionList return &p + } diff --git a/pkg/common/mq/kafka/consumer_group.go b/pkg/common/mq/kafka/consumer_group.go new file mode 100644 index 000000000..2585c72c1 --- /dev/null +++ b/pkg/common/mq/kafka/consumer_group.go @@ -0,0 +1,110 @@ +/* +** description(""). +** copyright('tuoyun,www.tuoyun.net'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/5/11 9:36). + */ +package kafka + +import ( + "context" + "fmt" + "sync" + + "Open_IM/pkg/common/mq" + + "github.com/Shopify/sarama" +) + +type kafkaConsumerGroup struct { + sarama.ConsumerGroup + groupID string + + mu *sync.RWMutex + handlers map[string][]mq.MessageHandler +} + +var _ mq.Consumer = (*kafkaConsumerGroup)(nil) + +type MConsumerGroupConfig struct { + KafkaVersion sarama.KafkaVersion + OffsetsInitial int64 + IsReturnErr bool +} + +func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, addr []string, groupID string) *kafkaConsumerGroup { + config := sarama.NewConfig() + config.Version = consumerConfig.KafkaVersion + config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial + config.Consumer.Return.Errors = consumerConfig.IsReturnErr + client, err := sarama.NewClient(addr, config) + if err != nil { + panic(err) + } + consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) + if err != nil { + panic(err) + } + return &kafkaConsumerGroup{ + ConsumerGroup: consumerGroup, + groupID: groupID, + handlers: make(map[string][]mq.MessageHandler), + } +} + +func (mc *kafkaConsumerGroup) RegisterMessageHandler(topic string, handler mq.MessageHandler) { + mc.mu.Lock() + defer mc.mu.Unlock() + + handlers := mc.handlers[topic] + handlers = append(handlers, handler) + mc.handlers[topic] = handlers +} + +func (mc *kafkaConsumerGroup) Start() error { + topics := make([]string, 0, len(mc.handlers)) + for topic := range mc.handlers { + topics = append(topics, topic) + } + + ctx := context.Background() + for { + err := mc.ConsumerGroup.Consume(ctx, topics, mc) + if err != nil { + panic(err) + } + } +} + +func (mc *kafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error { return nil } +func (mc *kafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error { return nil } +func (mc *kafkaConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + + mc.mu.RLock() + handlers, ok := mc.handlers[msg.Topic] + mc.mu.RUnlock() + if !ok { + panic(fmt.Sprintf("no handlers for topic: %s", msg.Topic)) + } + + message := &mq.Message{ + Key: msg.Key, + Value: msg.Value, + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + Timestamp: msg.Timestamp, + } + for _, handler := range handlers { + for { + if err := handler.HandleMessage(message); err == nil { // error is nil, auto commit + sess.MarkMessage(msg, "") + break + } + } + } + } + + return nil +} diff --git a/pkg/common/kafka/producer.go b/pkg/common/mq/kafka/producer.go similarity index 82% rename from pkg/common/kafka/producer.go rename to pkg/common/mq/kafka/producer.go index c82df975f..57987f374 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/mq/kafka/producer.go @@ -2,19 +2,22 @@ package kafka import ( log2 "Open_IM/pkg/common/log" + "Open_IM/pkg/common/mq" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) -type Producer struct { +type kafkaProducer struct { topic string addr []string config *sarama.Config producer sarama.SyncProducer } -func NewKafkaProducer(addr []string, topic string) *Producer { - p := Producer{} +var _ mq.Producer = (*kafkaProducer)(nil) + +func NewKafkaProducer(addr []string, topic string) *kafkaProducer { + p := kafkaProducer{} p.config = sarama.NewConfig() //Instantiate a sarama Config p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all @@ -32,7 +35,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { return &p } -func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) { +func (p *kafkaProducer) SendMessage(m proto.Message, key ...string) (int32, int64, error) { kMsg := &sarama.ProducerMessage{} kMsg.Topic = p.topic if len(key) == 1 { diff --git a/pkg/common/mq/producer.go b/pkg/common/mq/producer.go new file mode 100644 index 000000000..907061e15 --- /dev/null +++ b/pkg/common/mq/producer.go @@ -0,0 +1,7 @@ +package mq + +import "github.com/golang/protobuf/proto" + +type Producer interface { + SendMessage(m proto.Message, key ...string) (int32, int64, error) +}