@ -18,23 +18,11 @@ import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mw"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/OpenIMSDK/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@ -42,12 +30,22 @@ import (
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net/http"
"os"
"os/signal"
"syscall"
)
type MsgTransfer struct {
historyCH * OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic: ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
historyMongoCH * OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息, 以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
// modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
ctx context . Context
cancel context . CancelFunc
}
func StartTransfer ( prometheusPort int ) error {
@ -65,10 +63,6 @@ func StartTransfer(prometheusPort int) error {
return err
}
client , err := kdisc . NewDiscoveryRegister ( config . Config . Envs . Discovery )
/ *
client , err := openkeeper . NewClient ( config . Config . Zookeeper . ZkAddr , config . Config . Zookeeper . Schema ,
openkeeper . WithFreq ( time . Hour ) , openkeeper . WithRoundRobin ( ) , openkeeper . WithUserNameAndPassword ( config . Config . Zookeeper . Username ,
config . Config . Zookeeper . Password ) , openkeeper . WithTimeout ( 10 ) , openkeeper . WithLogger ( log . NewZkLogger ( ) ) ) * /
if err != nil {
return err
}
@ -109,27 +103,22 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli
}
func ( m * MsgTransfer ) Start ( prometheusPort int ) error {
ctx := context . Background ( )
fmt . Println ( "start msg transfer" , "prometheusPort:" , prometheusPort )
if prometheusPort <= 0 {
return errs . Wrap ( errors . New ( "prometheusPort not correct" ) )
}
m . ctx , m . cancel = context . WithCancel ( context . Background ( ) )
var wg sync . WaitGroup
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
m . historyCH . historyConsumerGroup . RegisterHandleAndConsumer ( ctx , m . historyCH )
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
var (
netDone = make ( chan struct { } , 1 )
netErr error
)
m . historyMongoCH . historyConsumerGroup . RegisterHandleAndConsumer ( ctx , m . historyMongoCH )
} ( )
onError := func ( ctx context . Context , err error , errInfo string ) {
log . ZWarn ( ctx , errInfo , err )
}
go m . historyCH . historyConsumerGroup . RegisterHandleAndConsumer ( m . ctx , m . historyCH , onError )
go m . historyMongoCH . historyConsumerGroup . RegisterHandleAndConsumer ( m . ctx , m . historyMongoCH , onError )
if config . Config . Prometheus . Enable {
go func ( ) {
@ -141,30 +130,28 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
http . Handle ( "/metrics" , promhttp . HandlerFor ( proreg , promhttp . HandlerOpts { Registry : proreg } ) )
err := http . ListenAndServe ( fmt . Sprintf ( ":%d" , prometheusPort ) , nil )
if err != nil && err != http . ErrServerClosed {
panic ( err )
netErr = errs . Wrap ( err , fmt . Sprintf ( "prometheus start err: %d" , prometheusPort ) )
netDone <- struct { } { }
}
} ( )
}
sigs := make ( chan os . Signal , 1 )
signal . Notify ( sigs , syscall . SIGINT , syscall . SIGTERM , syscall . SIGQUIT )
<- sigs
// graceful close kafka client.
go m . historyCH . historyConsumerGroup . Close ( )
go m . historyMongoCH . historyConsumerGroup . Close ( )
done := make ( chan struct { } , 1 )
go func ( ) {
wg . Wait ( )
close ( done )
} ( )
signal . Notify ( sigs , syscall . SIGTERM )
select {
case <- done :
log . ZInfo ( context . Background ( ) , "msgtrasfer exit successfully" )
case <- time . After ( 15 * time . Second ) :
log . ZError ( context . Background ( ) , "msgtransfer force to exit, timeout 15s" , nil )
case <- sigs :
util . SIGUSR1Exit ( )
// graceful close kafka client.
m . cancel ( )
m . historyCH . historyConsumerGroup . Close ( )
m . historyMongoCH . historyConsumerGroup . Close ( )
case <- netDone :
m . cancel ( )
m . historyCH . historyConsumerGroup . Close ( )
m . historyMongoCH . historyConsumerGroup . Close ( )
close ( netDone )
return netErr
}
return nil