@ -8,8 +8,6 @@ package kafka
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/Shopify/sarama"
)
@ -42,16 +40,9 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
}
}
func ( mc * MConsumerGroup ) GetContextFromMsg ( cMsg * sarama . ConsumerMessage , rootFuncName string ) context . Context {
ctx := mcontext . NewCtx ( rootFuncName )
var operationID string
for _ , v := range cMsg . Headers {
if string ( v . Key ) == constant . OperationID {
operationID = string ( v . Value )
}
}
mcontext . SetOperationID ( ctx , operationID )
return ctx
func ( mc * MConsumerGroup ) GetContextFromMsg ( cMsg * sarama . ConsumerMessage ) context . Context {
return GetContextWithMQHeader ( cMsg . Headers )
}
func ( mc * MConsumerGroup ) RegisterHandleAndConsumer ( handler sarama . ConsumerGroupHandler ) {