|
|
@ -242,7 +242,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
|
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
|
|
|
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID}
|
|
|
|
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID}
|
|
|
|
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.SourceID)
|
|
|
|
pid, offset, err := och.producerToPush.SendMessage(ctx, mqPushMsg.SourceID, &mqPushMsg)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
|
|
|
log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -251,7 +251,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Co
|
|
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
|
|
|
|
if len(messages) > 0 {
|
|
|
|
if len(messages) > 0 {
|
|
|
|
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
|
|
|
|
pid, offset, err := och.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
|
|
|
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -260,7 +260,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.
|
|
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
|
|
|
|
if len(messages) > 0 {
|
|
|
|
if len(messages) > 0 {
|
|
|
|
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
|
|
|
|
pid, offset, err := och.producerToMongo.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
|
|
|
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|