pull/232/head
Gordon 3 years ago
parent 31e0a8a88b
commit b864f9c6a9

@ -15,7 +15,7 @@ const OnlineTopicVacancy = 0
const Msg = 2 const Msg = 2
const ConsumerMsgs = 3 const ConsumerMsgs = 3
const UserMessages = 4 const UserMessages = 4
const ChannelNum = 11 const ChannelNum = 100
var ( var (
persistentCH PersistentConsumerHandler persistentCH PersistentConsumerHandler
@ -52,7 +52,7 @@ func Run() {
fmt.Println("not start mysql consumer") fmt.Println("not start mysql consumer")
} }
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
} }
func SetOnlineTopicStatus(status int) { func SetOnlineTopicStatus(status int) {
w.Lock() w.Lock()

@ -377,7 +377,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
} }
return err return err
case constant.OfflineStatus: case constant.OfflineStatus:
pid, offset, err := rpc.offlineProducer.SendMessage(m, key) pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} }

Loading…
Cancel
Save