diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 000000000..1c2fda565
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/Open-IM-Server.iml b/.idea/Open-IM-Server.iml
new file mode 100644
index 000000000..338a26630
--- /dev/null
+++ b/.idea/Open-IM-Server.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 000000000..d81362bc6
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000000000..9661ac713
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/go.sum b/go.sum
index 56ac84b4c..5ae8a67e3 100644
--- a/go.sum
+++ b/go.sum
@@ -141,7 +141,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -181,12 +180,10 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
-github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
-github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go
index 9ae3c5a0f..c73a7253a 100644
--- a/internal/msg_transfer/logic/history_msg_handler.go
+++ b/internal/msg_transfer/logic/history_msg_handler.go
@@ -1,16 +1,18 @@
package logic
import (
+ "Open_IM/pkg/common/mq"
+ "context"
+ "strings"
+
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
- kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
+ kfk "Open_IM/pkg/common/mq/kafka"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbMsg "Open_IM/pkg/proto/chat"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
- "context"
- "strings"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
@@ -19,27 +21,24 @@ import (
type fcb func(msg []byte, msgKey string)
type HistoryConsumerHandler struct {
- msgHandle map[string]fcb
- historyConsumerGroup *kfk.MConsumerGroup
+ historyConsumerGroup mq.Consumer
}
func (mc *HistoryConsumerHandler) Init() {
- mc.msgHandle = make(map[string]fcb)
- mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
- OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
- config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
-
+ OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
+ mc.historyConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ws2mschat.Topic, mq.MessageHandleFunc(mc.handleChatWs2Mongo))
}
-func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
+func (mc *HistoryConsumerHandler) handleChatWs2Mongo(message *mq.Message) error {
+ msg, msgKey := message.Value, string(message.Key)
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
time := utils.GetCurrentTimestampByNano()
pbData := pbMsg.WSToMsgSvrChatMsg{}
err := proto.Unmarshal(msg, &pbData)
if err != nil {
log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error())
-		return
+		return nil // not retry: a message that fails to unmarshal is permanently corrupt (matches the MySQL handler)
}
pbSaveData := pbMsg.MsgSvrToPushSvrChatMsg{}
pbSaveData.SendID = pbData.SendID
@@ -68,14 +67,14 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
err := saveUserChat(pbData.RecvID, &pbSaveData)
if err != nil {
log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
- return
+ return err
}
} else if msgKey == pbSaveData.SendID {
err := saveUserChat(pbData.SendID, &pbSaveData)
if err != nil {
log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
- return
+ return err
}
}
@@ -96,7 +95,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
err := saveUserChat(uidAndGroupID[0], &pbSaveData)
if err != nil {
log.NewError(pbSaveData.OperationID, "group data insert to mongo err", pbSaveData.String(), uidAndGroupID[0], err.Error())
- return
+ return err
}
}
pbSaveData.Options = pbData.Options
@@ -104,22 +103,13 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
go sendMessageToPush(&pbSaveData)
default:
log.NewError(pbSaveData.OperationID, "SessionType error", pbSaveData.String())
- return
+ return nil // not retry
}
log.NewDebug(pbSaveData.OperationID, "msg_transfer handle topic data to database success...", pbSaveData.String())
-}
-func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
-func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
-func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
- claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- log.InfoByKv("kafka get info to mongo", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value))
- mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
- sess.MarkMessage(msg, "")
- }
return nil
}
+
func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) {
log.InfoByKv("msg_transfer send message to push", message.OperationID, "message", message.String())
msg := pbPush.PushMsgReq{}
diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go
index cce434d99..2d4c02ccb 100644
--- a/internal/msg_transfer/logic/init.go
+++ b/internal/msg_transfer/logic/init.go
@@ -2,14 +2,15 @@ package logic
import (
"Open_IM/pkg/common/config"
- "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
+ "Open_IM/pkg/common/mq"
+ "Open_IM/pkg/common/mq/kafka"
)
var (
persistentCH PersistentConsumerHandler
historyCH HistoryConsumerHandler
- producer *kafka.Producer
+ producer mq.Producer
)
func Init() {
@@ -18,8 +19,9 @@ func Init() {
historyCH.Init()
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
}
+
func Run() {
//register mysqlConsumerHandler to
- go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
- go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
+ go persistentCH.persistentConsumerGroup.Start()
+ go historyCH.historyConsumerGroup.Start()
}
diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go
index 88039041c..3ad249f40 100644
--- a/internal/msg_transfer/logic/persistent_msg_handler.go
+++ b/internal/msg_transfer/logic/persistent_msg_handler.go
@@ -7,38 +7,41 @@
package logic
import (
+ "Open_IM/pkg/common/mq"
+ "strings"
+
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/mysql_model/im_mysql_msg_model"
- kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
+ kfk "Open_IM/pkg/common/mq/kafka"
pbMsg "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
+
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
- "strings"
)
type PersistentConsumerHandler struct {
- msgHandle map[string]fcb
- persistentConsumerGroup *kfk.MConsumerGroup
+ persistentConsumerGroup mq.Consumer
}
func (pc *PersistentConsumerHandler) Init() {
- pc.msgHandle = make(map[string]fcb)
- pc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = pc.handleChatWs2Mysql
pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
- OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
+ OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql)
+ pc.persistentConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ws2mschat.Topic, mq.MessageHandleFunc(pc.handleChatWs2Mysql))
}
-func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) {
+
+func (pc *PersistentConsumerHandler) handleChatWs2Mysql(message *mq.Message) error {
+ msg, msgKey := message.Value, string(message.Key)
log.InfoByKv("chat come here mysql!!!", "", "chat", string(msg))
pbData := pbMsg.WSToMsgSvrChatMsg{}
err := proto.Unmarshal(msg, &pbData)
if err != nil {
log.ErrorByKv("msg_transfer Unmarshal chat err", "", "chat", string(msg), "err", err.Error())
- return
+ return nil // not retry
}
Options := utils.JsonStringToMap(pbData.Options)
//Control whether to store history messages (mysql)
@@ -49,27 +52,18 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey strin
log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
- return
+ return err
}
} else if pbData.SessionType == constant.GroupChatType && msgKey == "0" {
pbData.RecvID = strings.Split(pbData.RecvID, " ")[1]
log.InfoByKv("msg_transfer chat persisting", pbData.OperationID)
if err = im_mysql_msg_model.InsertMessageToChatLog(pbData); err != nil {
log.ErrorByKv("Message insert failed", pbData.OperationID, "err", err.Error(), "chat", pbData.String())
- return
+ return err
}
}
}
-}
-func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
-func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
-func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
- claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "chat", string(msg.Value))
- pc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
- sess.MarkMessage(msg, "")
- }
+
return nil
}
diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go
deleted file mode 100644
index 4c4af5033..000000000
--- a/pkg/common/kafka/consumer_group.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-** description("").
-** copyright('tuoyun,www.tuoyun.net').
-** author("fg,Gordon@tuoyun.net").
-** time(2021/5/11 9:36).
- */
-package kafka
-
-import (
- "context"
- "github.com/Shopify/sarama"
-)
-
-type MConsumerGroup struct {
- sarama.ConsumerGroup
- groupID string
- topics []string
-}
-
-type MConsumerGroupConfig struct {
- KafkaVersion sarama.KafkaVersion
- OffsetsInitial int64
- IsReturnErr bool
-}
-
-func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup {
- config := sarama.NewConfig()
- config.Version = consumerConfig.KafkaVersion
- config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
- config.Consumer.Return.Errors = consumerConfig.IsReturnErr
- client, err := sarama.NewClient(addr, config)
- if err != nil {
- panic(err)
- }
- consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
- if err != nil {
- panic(err)
- }
- return &MConsumerGroup{
- consumerGroup,
- groupID,
- topics,
- }
-}
-func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
- ctx := context.Background()
- for {
- err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
- if err != nil {
- panic(err)
- }
- }
-}
diff --git a/pkg/common/mq/consumer.go b/pkg/common/mq/consumer.go
new file mode 100644
index 000000000..6576acc33
--- /dev/null
+++ b/pkg/common/mq/consumer.go
@@ -0,0 +1,40 @@
+package mq
+
+import "time"
+
+type Consumer interface {
+ // RegisterMessageHandler is used to register message handler
+ // any received messages will be passed to handler to process
+ // once the Consumer started, it is forbidden to register handlers.
+ RegisterMessageHandler(topic string, handler MessageHandler)
+
+ // Start to consume messages
+ Start() error
+}
+
+type MessageHandler interface {
+ // HandleMessage process received messages,
+ // if returned error is nil, the message will be auto committed.
+ HandleMessage(msg *Message) error
+}
+
+type MessageHandleFunc func(msg *Message) error
+
+func (fn MessageHandleFunc) HandleMessage(msg *Message) error {
+ return fn(msg)
+}
+
+type Message struct {
+ Key, Value []byte
+ Topic string
+ Partition int32
+ Offset int64
+ Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
+ BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
+ Headers []*RecordHeader // only set if kafka is version 0.11+
+}
+
+type RecordHeader struct {
+ Key []byte
+ Value []byte
+}
diff --git a/pkg/common/kafka/consumer.go b/pkg/common/mq/kafka/consumer.go
similarity index 99%
rename from pkg/common/kafka/consumer.go
rename to pkg/common/mq/kafka/consumer.go
index eed6ef142..3b45265f9 100644
--- a/pkg/common/kafka/consumer.go
+++ b/pkg/common/mq/kafka/consumer.go
@@ -33,4 +33,5 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
p.PartitionList = partitionList
return &p
+
}
diff --git a/pkg/common/mq/kafka/consumer_group.go b/pkg/common/mq/kafka/consumer_group.go
new file mode 100644
index 000000000..2585c72c1
--- /dev/null
+++ b/pkg/common/mq/kafka/consumer_group.go
@@ -0,0 +1,110 @@
+/*
+** description("").
+** copyright('tuoyun,www.tuoyun.net').
+** author("fg,Gordon@tuoyun.net").
+** time(2021/5/11 9:36).
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "Open_IM/pkg/common/mq"
+
+ "github.com/Shopify/sarama"
+)
+
+type kafkaConsumerGroup struct {
+ sarama.ConsumerGroup
+ groupID string
+
+ mu *sync.RWMutex
+ handlers map[string][]mq.MessageHandler
+}
+
+var _ mq.Consumer = (*kafkaConsumerGroup)(nil)
+
+type MConsumerGroupConfig struct {
+ KafkaVersion sarama.KafkaVersion
+ OffsetsInitial int64
+ IsReturnErr bool
+}
+
+func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, addr []string, groupID string) *kafkaConsumerGroup {
+ config := sarama.NewConfig()
+ config.Version = consumerConfig.KafkaVersion
+ config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
+ config.Consumer.Return.Errors = consumerConfig.IsReturnErr
+ client, err := sarama.NewClient(addr, config)
+ if err != nil {
+ panic(err)
+ }
+ consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
+ if err != nil {
+ panic(err)
+ }
+ return &kafkaConsumerGroup{
+ ConsumerGroup: consumerGroup,
+ groupID: groupID,
+ handlers: make(map[string][]mq.MessageHandler),
+ }
+}
+
+func (mc *kafkaConsumerGroup) RegisterMessageHandler(topic string, handler mq.MessageHandler) {
+ mc.mu.Lock()
+ defer mc.mu.Unlock()
+
+ handlers := mc.handlers[topic]
+ handlers = append(handlers, handler)
+ mc.handlers[topic] = handlers
+}
+
+func (mc *kafkaConsumerGroup) Start() error {
+ topics := make([]string, 0, len(mc.handlers))
+ for topic := range mc.handlers {
+ topics = append(topics, topic)
+ }
+
+ ctx := context.Background()
+ for {
+ err := mc.ConsumerGroup.Consume(ctx, topics, mc)
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+func (mc *kafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error { return nil }
+func (mc *kafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error { return nil }
+func (mc *kafkaConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for msg := range claim.Messages() {
+
+ mc.mu.RLock()
+ handlers, ok := mc.handlers[msg.Topic]
+ mc.mu.RUnlock()
+ if !ok {
+ panic(fmt.Sprintf("no handlers for topic: %s", msg.Topic))
+ }
+
+ message := &mq.Message{
+ Key: msg.Key,
+ Value: msg.Value,
+ Topic: msg.Topic,
+ Partition: msg.Partition,
+ Offset: msg.Offset,
+ Timestamp: msg.Timestamp,
+ }
+ for _, handler := range handlers {
+ for {
+ if err := handler.HandleMessage(message); err == nil { // error is nil, auto commit
+ sess.MarkMessage(msg, "")
+ break
+ }
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/pkg/common/kafka/producer.go b/pkg/common/mq/kafka/producer.go
similarity index 82%
rename from pkg/common/kafka/producer.go
rename to pkg/common/mq/kafka/producer.go
index c82df975f..57987f374 100644
--- a/pkg/common/kafka/producer.go
+++ b/pkg/common/mq/kafka/producer.go
@@ -2,19 +2,22 @@ package kafka
import (
log2 "Open_IM/pkg/common/log"
+ "Open_IM/pkg/common/mq"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
-type Producer struct {
+type kafkaProducer struct {
topic string
addr []string
config *sarama.Config
producer sarama.SyncProducer
}
-func NewKafkaProducer(addr []string, topic string) *Producer {
- p := Producer{}
+var _ mq.Producer = (*kafkaProducer)(nil)
+
+func NewKafkaProducer(addr []string, topic string) *kafkaProducer {
+ p := kafkaProducer{}
p.config = sarama.NewConfig() //Instantiate a sarama Config
p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
@@ -32,7 +35,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p
}
-func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
+func (p *kafkaProducer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic
if len(key) == 1 {
diff --git a/pkg/common/mq/producer.go b/pkg/common/mq/producer.go
new file mode 100644
index 000000000..907061e15
--- /dev/null
+++ b/pkg/common/mq/producer.go
@@ -0,0 +1,7 @@
+package mq
+
+import "github.com/golang/protobuf/proto"
+
+type Producer interface {
+ SendMessage(m proto.Message, key ...string) (int32, int64, error)
+}