You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/internal/msg_transfer/logic/init.go

70 lines
2.4 KiB

package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/statistics"
"fmt"
"sync"
)
3 years ago
const OnlineTopicBusy = 1
const OnlineTopicVacancy = 0
const Msg = 2
const ConsumerMsgs = 3
3 years ago
const AggregationMessages = 4
const MongoMessages = 5
3 years ago
const ChannelNum = 100
var (
persistentCH PersistentConsumerHandler
historyCH OnlineHistoryRedisConsumerHandler
historyMongoCH OnlineHistoryMongoConsumerHandler
producer *kafka.Producer
2 years ago
producerToMongo *kafka.Producer
cmdCh chan Cmd2Value
3 years ago
onlineTopicStatus int
w *sync.Mutex
singleMsgSuccessCount uint64
groupMsgCount uint64
singleMsgFailedCount uint64
3 years ago
singleMsgSuccessCountMutex sync.Mutex
)
func Init() {
cmdCh = make(chan Cmd2Value, 10000)
w = new(sync.Mutex)
2 years ago
persistentCH.Init() // 订阅ws2mschat 消费到 mysql
historyCH.Init(cmdCh) // 订阅ws2mschat 如果可靠性存储 消费到 incrseq 再存入mongo 再push || 非可靠性 直接incr再push 初始化ws2mschat
historyMongoCH.Init()
3 years ago
onlineTopicStatus = OnlineTopicVacancy
3 years ago
//offlineHistoryCH.Init(cmdCh)
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
2 years ago
producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
}
func Run() {
//register mysqlConsumerHandler to
if config.Config.ChatPersistenceMysql {
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
} else {
fmt.Println("not start mysql consumer")
}
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
3 years ago
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
}
3 years ago
func SetOnlineTopicStatus(status int) {
w.Lock()
defer w.Unlock()
onlineTopicStatus = status
}
3 years ago
func GetOnlineTopicStatus() int {
w.Lock()
defer w.Unlock()
return onlineTopicStatus
}