|
|
|
@ -39,6 +39,7 @@ type OnlineHistoryConsumerHandler struct {
|
|
|
|
|
cmdCh chan Cmd2Value
|
|
|
|
|
msgCh chan Cmd2Value
|
|
|
|
|
chArrays [ChannelNum]chan Cmd2Value
|
|
|
|
|
chMongoArrays [ChannelNum]chan Cmd2Value
|
|
|
|
|
msgDistributionCh chan Cmd2Value
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -52,6 +53,10 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
|
|
|
|
|
och.chArrays[i] = make(chan Cmd2Value, 1000)
|
|
|
|
|
go och.Run(i)
|
|
|
|
|
}
|
|
|
|
|
for i := 0; i < ChannelNum; i++ {
|
|
|
|
|
och.chMongoArrays[i] = make(chan Cmd2Value, 1000)
|
|
|
|
|
go och.MongoMessageRun(i)
|
|
|
|
|
}
|
|
|
|
|
if config.Config.ReliableStorage {
|
|
|
|
|
och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
|
|
|
|
|
} else {
|
|
|
|
@ -146,6 +151,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
|
|
|
|
|
hashCode := getHashCode(userID)
|
|
|
|
|
channelID := hashCode % ChannelNum
|
|
|
|
|
log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
|
|
|
|
|
//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
|
|
|
|
|
och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}}
|
|
|
|
|
}
|
|
|
|
|
func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case cmd := <-och.chArrays[channelID]:
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
|
|
|
|
|
case MongoMessages:
|
|
|
|
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
|
|
|
|
msgList := msgChannelValue.msgList
|
|
|
|
|
triggerID := msgChannelValue.triggerID
|
|
|
|
|
userID := msgChannelValue.userID
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
|
|
|
|
|
// msg := cMsg.Value
|
|
|
|
@ -216,7 +243,6 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
|
|
|
|
// sess.MarkMessage(cMsg, "")
|
|
|
|
|
// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
|
for {
|
|
|
|
|
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
|
|
|
|
|