Merge remote-tracking branch 'origin/tuoyun' into tuoyun

pull/232/head
skiffer-git 3 years ago
commit 024f454702

@ -15,7 +15,7 @@ const OnlineTopicVacancy = 0
const Msg = 2 const Msg = 2
const ConsumerMsgs = 3 const ConsumerMsgs = 3
const UserMessages = 4 const UserMessages = 4
const ChannelNum = 11 const ChannelNum = 100
var ( var (
persistentCH PersistentConsumerHandler persistentCH PersistentConsumerHandler
@ -52,7 +52,7 @@ func Run() {
fmt.Println("not start mysql consumer") fmt.Println("not start mysql consumer")
} }
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
} }
func SetOnlineTopicStatus(status int) { func SetOnlineTopicStatus(status int) {
w.Lock() w.Lock()

@ -380,7 +380,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
} }
return err return err
case constant.OfflineStatus: case constant.OfflineStatus:
pid, offset, err := rpc.offlineProducer.SendMessage(m, key) pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} }

Loading…
Cancel
Save