msg transfer handle change

pull/232/head
Gordon 3 years ago
parent 29dbb36961
commit 903c39206f

@ -55,8 +55,8 @@ kafka:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "ms2ps_chat"
consumergroupid:
msgToRedis: redis
msgToMongo: mongo
msgToMongoOffline: mongo_offline
msgToMySql: mysql
msgToPush: push

@ -4,7 +4,6 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
"Open_IM/pkg/statistics"
"fmt"
"sync"
@ -20,7 +19,8 @@ const ChannelNum = 100
var (
persistentCH PersistentConsumerHandler
historyCH OnlineHistoryConsumerHandler
historyCH OnlineHistoryRedisConsumerHandler
historyMongoCH OnlineHistoryMongoConsumerHandler
producer *kafka.Producer
producerToMongo *kafka.Producer
cmdCh chan Cmd2Value
@ -39,7 +39,6 @@ func Init() {
persistentCH.Init()
historyCH.Init(cmdCh)
onlineTopicStatus = OnlineTopicVacancy
log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic)
//offlineHistoryCH.Init(cmdCh)
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
@ -54,6 +53,7 @@ func Run() {
fmt.Println("not start mysql consumer")
}
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
}
func SetOnlineTopicStatus(status int) {

@ -9,10 +9,8 @@ import (
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbMsg "Open_IM/pkg/proto/chat"
pbPush "Open_IM/pkg/proto/push"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"errors"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"hash/crc32"
@ -36,28 +34,21 @@ type Cmd2Value struct {
Cmd int
Value interface{}
}
type OnlineHistoryConsumerHandler struct {
type OnlineHistoryRedisConsumerHandler struct {
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
cmdCh chan Cmd2Value
chArrays [ChannelNum]chan Cmd2Value
chMongoArrays [ChannelNum]chan Cmd2Value
msgDistributionCh chan Cmd2Value
}
func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
func (och *OnlineHistoryRedisConsumerHandler) Init(cmdCh chan Cmd2Value) {
och.msgHandle = make(map[string]fcb)
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
go och.MessagesDistributionHandle()
och.cmdCh = cmdCh
for i := 0; i < ChannelNum; i++ {
och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i)
}
for i := 0; i < ChannelNum; i++ {
och.chMongoArrays[i] = make(chan Cmd2Value, 10000)
go och.MongoMessageRun(i)
}
if config.Config.ReliableStorage {
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
} else {
@ -66,34 +57,10 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
}
och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
}
func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) {
operationID := utils.OperationIDGenerator()
err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1)
if err != nil {
log.Error(operationID, "TriggerCmd failed ", err.Error(), status)
return
}
log.Debug(operationID, "TriggerCmd success", status)
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
}
func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error {
var flag = 0
select {
case ch <- value:
flag = 1
case <-time.After(time.Second * time.Duration(timeout)):
flag = 2
}
if flag == 1 {
return nil
} else {
return errors.New("send cmd timeout")
}
}
func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
for {
select {
case cmd := <-och.chArrays[channelID]:
@ -154,7 +121,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
}
}
}
func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
@ -167,47 +134,48 @@ func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID stri
////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
//och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
}
func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
for {
select {
case cmd := <-och.chMongoArrays[channelID]:
switch cmd.Cmd {
case MongoMessages:
msgChannelValue := cmd.Value.(MsgChannelValue)
msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID
aggregationID := msgChannelValue.aggregationID
lastSeq := msgChannelValue.lastSeq
err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
if err != nil {
log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
}
for _, v := range msgList {
if v.MsgData.ContentType == constant.DeleteMessageNotification {
tips := server_api_params.TipsComm{}
DeleteMessageTips := server_api_params.DeleteMessageTips{}
err := proto.Unmarshal(v.MsgData.Content, &tips)
if err != nil {
log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
continue
}
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
if err != nil {
log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
}
}
}
}
}
}
}
//func (och *OnlineHistoryRedisConsumerHandler) MongoMessageRun(channelID int) {
// for {
// select {
// case cmd := <-och.chMongoArrays[channelID]:
// switch cmd.Cmd {
// case MongoMessages:
// msgChannelValue := cmd.Value.(MsgChannelValue)
// msgList := msgChannelValue.msgList
// triggerID := msgChannelValue.triggerID
// aggregationID := msgChannelValue.aggregationID
// lastSeq := msgChannelValue.lastSeq
// err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
// if err != nil {
// log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
// }
// for _, v := range msgList {
// if v.MsgData.ContentType == constant.DeleteMessageNotification {
// tips := server_api_params.TipsComm{}
// DeleteMessageTips := server_api_params.DeleteMessageTips{}
// err := proto.Unmarshal(v.MsgData.Content, &tips)
// if err != nil {
// log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
// continue
// }
// err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
// if err != nil {
// log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
// continue
// }
// if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
// log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
// }
//
// }
// }
// }
// }
// }
//}
func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
for {
aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
select {
@ -253,7 +221,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
}
}
func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{}
@ -325,7 +293,7 @@ func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consumer
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
}
func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
@ -365,10 +333,10 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
}
}
func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
// for msg := range claim.Messages() {
@ -385,7 +353,7 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error
// return nil
//}
func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
for {
@ -480,7 +448,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
return nil
}
//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
//
// for {

@ -0,0 +1,79 @@
package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
type OnlineHistoryMongoConsumerHandler struct {
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
}
func (och *OnlineHistoryMongoConsumerHandler) Init(cmdCh chan Cmd2Value) {
och.msgHandle = make(map[string]fcb)
och.msgHandle[config.Config.Kafka.MsgToMongo.Topic] = och.handleChatWs2Mongo
och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
}
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList)
}
for _, v := range msgFromMQ.MessageList {
if v.MsgData.ContentType == constant.DeleteMessageNotification {
tips := server_api_params.TipsComm{}
DeleteMessageTips := server_api_params.DeleteMessageTips{}
err := proto.Unmarshal(v.MsgData.Content, &tips)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "tips unmarshal err:", err.Error(), v.String())
continue
}
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
}
}
}
}
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (och *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
och.msgHandle[msg.Topic](msg, string(msg.Key), sess)
} else {
log.Error("", "mongo msg get from kafka but is nil", msg.Key)
}
sess.MarkMessage(msg, "")
}
return nil
}

@ -213,8 +213,8 @@ type config struct {
Topic string `yaml:"topic"`
}
ConsumerGroupID struct {
MsgToRedis string `yaml:"msgToRedis"`
MsgToMongo string `yaml:"msgToMongo"`
MsgToMongoOffline string `yaml:"msgToMongoOffline"`
MsgToMySql string `yaml:"msgToMySql"`
MsgToPush string `yaml:"msgToPush"`
}

Loading…
Cancel
Save