@ -1,6 +1,7 @@
package msgtransfer
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@ -22,20 +23,24 @@ const ChannelNum = 100
type MsgChannelValue struct {
aggregationID string //maybe userID or super groupID
triggerID string
msgList [ ] * pbMsg . MsgDataToMQ
ctx context . Context
ctxMsgList [ ] * ContextMsg
lastSeq uint64
}
type TriggerChannelValue struct {
triggerID string
cMsgList [ ] * sarama . ConsumerMessage
ctx context . Context
cMsgList [ ] * sarama . ConsumerMessage
}
type Cmd2Value struct {
Cmd int
Value interface { }
}
type ContextMsg struct {
message * pbMsg . MsgDataToMQ
ctx context . Context
}
type OnlineHistoryRedisConsumerHandler struct {
historyConsumerGroup * kafka . MConsumerGroup
@ -80,38 +85,39 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
switch cmd . Cmd {
case AggregationMessages :
msgChannelValue := cmd . Value . ( MsgChannelValue )
msgList := msgChannelValue . m sgList
triggerID := msgChannelValue . triggerID
ctxMsgList := msgChannelValue . ctxM sgList
ctx := msgChannelValue . ctx
storageMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 80 )
notStoragePushMsgList := make ( [ ] * pbMsg . MsgDataToMQ , 0 , 80 )
log . Debug ( triggerID , "msg arrived channel" , "channel id" , channelID , msgList , msgChannelValue . aggregationID , len ( msgList ) )
storagePushMsgList := make ( [ ] * ContextMsg , 0 , 80 )
notStoragePushMsgList := make ( [ ] * ContextMsg , 0 , 80 )
log . ZDebug ( ctx , "msg arrived channel" , "channel id" , channelID , "msgList length" , len ( ctxMsgList ) , "aggregationID" , msgChannelValue . aggregationID )
var modifyMsgList [ ] * pbMsg . MsgDataToMQ
ctx := mcontext . NewCtx ( "redis consumer" )
mcontext . SetOperationID ( ctx , triggerID )
for _ , v := range m sgList {
log . Debug( triggerID , "msg come to storage center" , v . String ( ) )
isHistory := utils . GetSwitchFromOptions ( v . MsgData. Options , constant . IsHistory )
isSenderSync := utils . GetSwitchFromOptions ( v . MsgData. Options , constant . IsSenderSync )
//ctx := mcontext.NewCtx("redis consumer")
//mcontext.SetOperationID(ctx, triggerID)
for _ , v := range ctxM sgList {
log . ZDebug( ctx , "msg come to storage center" , "message" , v . message . String ( ) )
isHistory := utils . GetSwitchFromOptions ( v . message. MsgData. Options , constant . IsHistory )
isSenderSync := utils . GetSwitchFromOptions ( v . message. MsgData. Options , constant . IsSenderSync )
if isHistory {
storageMsgList = append ( storageMsgList , v )
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
storageMsgList = append ( storageMsgList , v .message )
storagePushMsgList = append ( storagePushMsgList , v )
} else {
if ! ( ! isSenderSync && msgChannelValue . aggregationID == v . MsgData. SendID ) {
if ! ( ! isSenderSync && msgChannelValue . aggregationID == v . message. MsgData. SendID ) {
notStoragePushMsgList = append ( notStoragePushMsgList , v )
}
}
if v . MsgData. ContentType == constant . ReactionMessageModifier || v . MsgData . ContentType == constant . ReactionMessageDeleter {
modifyMsgList = append ( modifyMsgList , v )
if v . message. MsgData. ContentType == constant . ReactionMessageModifier || v . message . MsgData . ContentType == constant . ReactionMessageDeleter {
modifyMsgList = append ( modifyMsgList , v .message )
}
}
if len ( modifyMsgList ) > 0 {
och . msgDatabase . MsgToModifyMQ ( ctx , msgChannelValue . aggregationID , triggerID , modifyMsgList )
och . msgDatabase . MsgToModifyMQ ( ctx , msgChannelValue . aggregationID , "" , modifyMsgList )
}
log . Debug( triggerID , "msg storage length ", len ( storageMsgList ) , "push length" , len ( notStoragePushMsgList ) )
log . ZDebug( ctx , "msg storage length" , "storageMsgList ", len ( storageMsgList ) , "push length" , len ( notStoragePushMsgList ) )
if len ( storageMsgList ) > 0 {
lastSeq , err := och . msgDatabase . BatchInsertChat2Cache ( ctx , msgChannelValue . aggregationID , storageMsgList )
if err != nil {
log . NewError( triggerID , "single data insert to redis err" , err . Error ( ) , storageMsgList )
log . ZError( ctx , "batch data insert to redis err" , err , "storageMsgList" , storageMsgList )
och . singleMsgFailedCountMutex . Lock ( )
och . singleMsgFailedCount += uint64 ( len ( storageMsgList ) )
och . singleMsgFailedCountMutex . Unlock ( )
@ -119,18 +125,20 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
och . singleMsgSuccessCountMutex . Lock ( )
och . singleMsgSuccessCount += uint64 ( len ( storageMsgList ) )
och . singleMsgSuccessCountMutex . Unlock ( )
och . msgDatabase . MsgToMongoMQ ( ctx , msgChannelValue . aggregationID , triggerID , storageMsgList , lastSeq )
for _ , v := range storage MsgList {
och . msgDatabase . MsgToPushMQ ( ctx, msgChannelValue . aggregationID , v )
och . msgDatabase . MsgToMongoMQ ( ctx , msgChannelValue . aggregationID , "" , storageMsgList , lastSeq )
for _ , v := range storage Push MsgList {
och . msgDatabase . MsgToPushMQ ( v. ctx, msgChannelValue . aggregationID , v . message )
}
for _ , v := range notStoragePushMsgList {
och . msgDatabase . MsgToPushMQ ( ctx, msgChannelValue . aggregationID , v )
och . msgDatabase . MsgToPushMQ ( v. ctx, msgChannelValue . aggregationID , v . message )
}
}
} else {
for _ , v := range notStoragePushMsgList {
och . msgDatabase . MsgToPushMQ ( ctx , msgChannelValue . aggregationID , v )
p , o , err := och . msgDatabase . MsgToPushMQ ( v . ctx , msgChannelValue . aggregationID , v . message )
if err != nil {
log . ZError ( v . ctx , "kafka send failed" , err , "msg" , v . message . String ( ) , "pid" , p , "offset" , o )
}
}
}
}
@ -140,40 +148,43 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
func ( och * OnlineHistoryRedisConsumerHandler ) MessagesDistributionHandle ( ) {
for {
aggregationMsgs := make ( map [ string ] [ ] * pbMsg. MsgDataToMQ , ChannelNum )
aggregationMsgs := make ( map [ string ] [ ] * ContextMsg , ChannelNum )
select {
case cmd := <- och . msgDistributionCh :
switch cmd . Cmd {
case ConsumerMsgs :
triggerChannelValue := cmd . Value . ( TriggerChannelValue )
triggerID := triggerChannelValue . triggerID
ctx := triggerChannelValue . ctx
consumerMessages := triggerChannelValue . cMsgList
//Aggregation map[userid]message list
log . Debug( triggerID , "batch messages come to distribution center ", len ( consumerMessages ) )
log . ZDebug( ctx , "batch messages come to distribution center ", "length ", len ( consumerMessages ) )
for i := 0 ; i < len ( consumerMessages ) ; i ++ {
ctxMsg := & ContextMsg { }
msgFromMQ := pbMsg . MsgDataToMQ { }
err := proto . Unmarshal ( consumerMessages [ i ] . Value , & msgFromMQ )
if err != nil {
log . Error( triggerID , "msg_transfer Unmarshal msg err" , "msg" , string ( consumerMessages [ i ] . Value ) , "err" , err . Error ( ) )
log . ZError( ctx , "msg_transfer Unmarshal msg err" , err , string ( consumerMessages [ i ] . Value ) )
return
}
log . Debug ( triggerID , "single msg come to distribution center" , msgFromMQ . String ( ) , string ( consumerMessages [ i ] . Key ) )
ctxMsg . ctx = kafka . GetContextWithMQHeader ( consumerMessages [ i ] . Headers )
ctxMsg . message = & msgFromMQ
log . ZDebug ( ctx , "single msg come to distribution center" , msgFromMQ . String ( ) , string ( consumerMessages [ i ] . Key ) )
if oldM , ok := aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] ; ok {
oldM = append ( oldM , & msgFromMQ )
oldM = append ( oldM , ctxMsg )
aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = oldM
} else {
m := make ( [ ] * pbMsg. MsgDataToMQ , 0 , 100 )
m = append ( m , & msgFromMQ )
m := make ( [ ] * ContextMsg , 0 , 100 )
m = append ( m , ctxMsg )
aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = m
}
}
log . Debug( triggerID , "generate map list users len ", len ( aggregationMsgs ) )
log . ZDebug( ctx , "generate map list users len" , "length ", len ( aggregationMsgs ) )
for aggregationID , v := range aggregationMsgs {
if len ( v ) >= 0 {
hashCode := utils . GetHashCode ( aggregationID )
channelID := hashCode % ChannelNum
log . Debug( triggerID , "generate channelID" , hashCode , channelID , aggregationID )
och . chArrays [ channelID ] <- Cmd2Value { Cmd : AggregationMessages , Value : MsgChannelValue { aggregationID : aggregationID , msgList: v , triggerID : triggerID } }
log . ZDebug( ctx , "generate channelID" , "hashCode" , hashCode , "channelID" , channelID , "aggregationID" , aggregationID )
och . chArrays [ channelID ] <- Cmd2Value { Cmd : AggregationMessages , Value : MsgChannelValue { aggregationID : aggregationID , ctxMsgList: v , ctx : ctx } }
}
}
}
@ -181,8 +192,10 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
}
}
func ( OnlineHistoryRedisConsumerHandler ) Setup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( OnlineHistoryRedisConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( och * OnlineHistoryRedisConsumerHandler ) Setup ( _ sarama . ConsumerGroupSession ) error { return nil }
func ( och * OnlineHistoryRedisConsumerHandler ) Cleanup ( _ sarama . ConsumerGroupSession ) error {
return nil
}
func ( och * OnlineHistoryRedisConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
for {
@ -194,10 +207,10 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
}
}
rwLock := new ( sync . RWMutex )
log . NewDebug ( "" , "online new session msg come" , claim . HighWaterMarkOffset ( ) , claim . Topic ( ) , claim . Partition ( ) )
log . ZDebug ( context . Background ( ) , "online new session msg come" , "highWaterMarkOffset" ,
claim . HighWaterMarkOffset ( ) , "topic" , claim . Topic ( ) , "partition" , claim . Partition ( ) )
cMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
t := time . NewTicker ( time . Duration ( 100 ) * time . Millisecond )
var triggerID string
go func ( ) {
for {
select {
@ -211,18 +224,18 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
rwLock . Unlock ( )
split := 1000
triggerID = utils . OperationIDGenerator ( )
log . Debug( triggerID , "timer trigger msg consumer start ", len ( ccMsg ) )
ctx := mcontext . WithTriggerIDContext ( context . Background ( ) , utils . OperationIDGenerator ( ) )
log . ZDebug( ctx , "timer trigger msg consumer start ", "length ", len ( ccMsg ) )
for i := 0 ; i < len ( ccMsg ) / split ; i ++ {
//log.Debug()
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
triggerID: triggerID , cMsgList : ccMsg [ i * split : ( i + 1 ) * split ] } }
ctx: ctx , cMsgList : ccMsg [ i * split : ( i + 1 ) * split ] } }
}
if ( len ( ccMsg ) % split ) > 0 {
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
triggerID: triggerID , cMsgList : ccMsg [ split * ( len ( ccMsg ) / split ) : ] } }
ctx: ctx , cMsgList : ccMsg [ split * ( len ( ccMsg ) / split ) : ] } }
}
log . Debug( triggerID , "timer trigger msg consumer end" , len ( cMsg) )
log . ZDebug( ctx , "timer trigger msg consumer end" , "length" , len ( c cMsg) )
}
}
}