@ -29,6 +29,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/mqbuild"
"github.com/openimsdk/open-im-server/v3/pkg/mqbuild"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/tools/utils/network"
@ -52,16 +53,16 @@ import (
)
)
type MsgTransfer struct {
type MsgTransfer struct {
historyConsumer mq . Consumer
historyMongoConsumer mq . Consumer
// This consumer aggregated messages, subscribed to the topic:toRedis,
// This consumer aggregated messages, subscribed to the topic:toRedis,
// the message is stored in redis, Incr Redis, and then the message is sent to toPush topic for push,
// the message is stored in redis, Incr Redis, and then the message is sent to toPush topic for push,
// and the message is sent to toMongo topic for persistence
// and the message is sent to toMongo topic for persistence
history C H * OnlineHistoryRedisConsumerHandler
history Handler * OnlineHistoryRedisConsumerHandler
//This consumer handle message to mongo
//This consumer handle message to mongo
historyMongoCH * OnlineHistoryMongoConsumerHandler
historyMongoHandler * OnlineHistoryMongoConsumerHandler
ctx context . Context
ctx context . Context
cancel context . CancelFunc
cancel context . CancelFunc
runTimeEnv string
}
}
type Config struct {
type Config struct {
@ -75,11 +76,10 @@ type Config struct {
}
}
func Start ( ctx context . Context , index int , config * Config ) error {
func Start ( ctx context . Context , index int , config * Config ) error {
runTimeEnv := runtimeenv . PrintRuntimeEnvironment ( )
builder := mqbuild . NewBuilder ( & config . Discovery , & config . KafkaConfig )
builder := mqbuild . NewBuilder ( & config . Discovery , & config . KafkaConfig )
log . CInfo ( ctx , "MSG-TRANSFER server is initializing" , "runTimeEnv" , run TimeEnv , "prometheusPorts" ,
log . CInfo ( ctx , "MSG-TRANSFER server is initializing" , "runTimeEnv" , run timeenv. RuntimeEnvironment ( ) , "prometheusPorts" ,
config . MsgTransfer . Prometheus . Ports , "index" , index )
config . MsgTransfer . Prometheus . Ports , "index" , index )
mgocli , err := mongoutil . NewMongoDB ( ctx , config . MongodbConfig . Build ( ) )
mgocli , err := mongoutil . NewMongoDB ( ctx , config . MongodbConfig . Build ( ) )
@ -90,7 +90,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
if err != nil {
return err
return err
}
}
client , err := discRegister . NewDiscoveryRegister ( & config . Discovery , runTimeEnv , nil )
client , err := discRegister . NewDiscoveryRegister ( & config . Discovery , nil )
if err != nil {
if err != nil {
return err
return err
}
}
@ -137,19 +137,25 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
if err != nil {
return err
return err
}
}
historyCH , err := NewOnlineHistoryRedisConsumerHandler ( ctx , client , config , msgTransferDatabase )
historyConsumer , err := builder . GetTopicConsumer ( ctx , config . KafkaConfig . ToRedisTopic )
if err != nil {
return err
}
historyMongoConsumer , err := builder . GetTopicConsumer ( ctx , config . KafkaConfig . ToMongoTopic )
if err != nil {
if err != nil {
return err
return err
}
}
historyMongoCH , err := NewOnlineHistoryMongoConsumerHandler ( & config . KafkaConfig , msgTransferDatabase )
history Handler , err := NewOnlineHistory RedisConsumerHandler( ctx , client , c onfig, msgTransferDatabase )
if err != nil {
if err != nil {
return err
return err
}
}
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler ( msgTransferDatabase )
msgTransfer := & MsgTransfer {
msgTransfer := & MsgTransfer {
historyCH : historyCH ,
historyConsumer : historyConsumer ,
historyMongoCH : historyMongoCH ,
historyMongoConsumer : historyMongoConsumer ,
runTimeEnv : runTimeEnv ,
historyHandler : historyHandler ,
historyMongoHandler : historyMongoHandler ,
}
}
return msgTransfer . Start ( index , config , client )
return msgTransfer . Start ( index , config , client )
@ -162,10 +168,30 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco
netErr error
netErr error
)
)
go m . historyCH . historyConsumerGroup . RegisterHandleAndConsumer ( m . ctx , m . historyCH )
go func ( ) {
go m . historyMongoCH . historyConsumerGroup . RegisterHandleAndConsumer ( m . ctx , m . historyMongoCH )
for {
go m . historyCH . HandleUserHasReadSeqMessages ( m . ctx )
if err := m . historyConsumer . Subscribe ( m . ctx , m . historyHandler . HandlerRedisMessage ) ; err != nil {
err := m . historyCH . redisMessageBatches . Start ( )
log . ZError ( m . ctx , "historyConsumer err" , err )
return
}
}
} ( )
go func ( ) {
fn := func ( ctx context . Context , key string , value [ ] byte ) error {
m . historyMongoHandler . HandleChatWs2Mongo ( ctx , key , value )
return nil
}
for {
if err := m . historyMongoConsumer . Subscribe ( m . ctx , fn ) ; err != nil {
log . ZError ( m . ctx , "historyMongoConsumer err" , err )
return
}
}
} ( )
go m . historyHandler . HandleUserHasReadSeqMessages ( m . ctx )
err := m . historyHandler . redisMessageBatches . Start ( )
if err != nil {
if err != nil {
return err
return err
}
}
@ -237,18 +263,18 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco
case <- sigs :
case <- sigs :
program . SIGTERMExit ( )
program . SIGTERMExit ( )
// graceful close kafka client.
// graceful close kafka client.
_ = m . historyConsumer . Close ( )
_ = m . historyMongoConsumer . Close ( )
m . cancel ( )
m . cancel ( )
m . historyCH . redisMessageBatches . Close ( )
m . historyHandler . redisMessageBatches . Close ( )
m . historyCH . Close ( )
m . historyHandler . Close ( )
m . historyCH . historyConsumerGroup . Close ( )
m . historyMongoCH . historyConsumerGroup . Close ( )
return nil
return nil
case <- netDone :
case <- netDone :
_ = m . historyConsumer . Close ( )
_ = m . historyMongoConsumer . Close ( )
m . cancel ( )
m . cancel ( )
m . historyCH . redisMessageBatches . Close ( )
m . historyHandler . redisMessageBatches . Close ( )
m . historyCH . Close ( )
m . historyHandler . Close ( )
m . historyCH . historyConsumerGroup . Close ( )
m . historyMongoCH . historyConsumerGroup . Close ( )
close ( netDone )
close ( netDone )
return netErr
return netErr
}
}