|
|
@ -2,7 +2,10 @@ package controller
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
relation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
|
|
|
|
"gorm.io/gorm"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
|
|
@ -95,12 +98,14 @@ type CommonMsgDatabase interface {
|
|
|
|
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error)
|
|
|
|
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*unRelationTb.UserCount, dateCount map[string]int64, err error)
|
|
|
|
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
|
|
|
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
|
|
|
SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (msgData []*sdkws.MsgData, err error)
|
|
|
|
SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (msgData []*sdkws.MsgData, err error)
|
|
|
|
|
|
|
|
GetChatLog(ctx context.Context, req *pbMsg.GetChatLogsReq, number int32, number2 int32, int32s []int32) (int64, []relation.ChatLogModel, error)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
|
|
|
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel, msgMyqModel relation.ChatLogModelInterface) CommonMsgDatabase {
|
|
|
|
return &commonMsgDatabase{
|
|
|
|
return &commonMsgDatabase{
|
|
|
|
msgDocDatabase: msgDocModel,
|
|
|
|
msgDocDatabase: msgDocModel,
|
|
|
|
cache: cacheModel,
|
|
|
|
cache: cacheModel,
|
|
|
|
|
|
|
|
msgMyq: msgMyqModel,
|
|
|
|
producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic),
|
|
|
|
producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic),
|
|
|
|
producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic),
|
|
|
|
producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic),
|
|
|
|
producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic),
|
|
|
|
producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic),
|
|
|
@ -108,10 +113,11 @@ func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheMo
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) CommonMsgDatabase {
|
|
|
|
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, dbGrom *gorm.DB) CommonMsgDatabase {
|
|
|
|
cacheModel := cache.NewMsgCacheModel(rdb)
|
|
|
|
cacheModel := cache.NewMsgCacheModel(rdb)
|
|
|
|
msgDocModel := unrelation.NewMsgMongoDriver(database)
|
|
|
|
msgDocModel := unrelation.NewMsgMongoDriver(database)
|
|
|
|
CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel)
|
|
|
|
msgMyqModel := relation2.NewChatLogGorm(dbGrom)
|
|
|
|
|
|
|
|
CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel, msgMyqModel)
|
|
|
|
return CommonMsgDatabase
|
|
|
|
return CommonMsgDatabase
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -120,6 +126,7 @@ type commonMsgDatabase struct {
|
|
|
|
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
|
|
|
|
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
|
|
|
|
extendMsgSetModel unRelationTb.ExtendMsgSetModel
|
|
|
|
extendMsgSetModel unRelationTb.ExtendMsgSetModel
|
|
|
|
msg unRelationTb.MsgDocModel
|
|
|
|
msg unRelationTb.MsgDocModel
|
|
|
|
|
|
|
|
msgMyq relation.ChatLogModelInterface
|
|
|
|
cache cache.MsgModel
|
|
|
|
cache cache.MsgModel
|
|
|
|
producer *kafka.Producer
|
|
|
|
producer *kafka.Producer
|
|
|
|
producerToMongo *kafka.Producer
|
|
|
|
producerToMongo *kafka.Producer
|
|
|
@ -127,6 +134,22 @@ type commonMsgDatabase struct {
|
|
|
|
producerToPush *kafka.Producer
|
|
|
|
producerToPush *kafka.Producer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) GetChatLog(ctx context.Context, req *pbMsg.GetChatLogsReq, pageNumber int32, showNumber int32, int32s []int32) (int64, []relation.ChatLogModel, error) {
|
|
|
|
|
|
|
|
chatLog := &relation.ChatLogModel{
|
|
|
|
|
|
|
|
ContentType: req.ContentType,
|
|
|
|
|
|
|
|
RecvID: req.RecvID,
|
|
|
|
|
|
|
|
SendID: req.SendID,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if req.SendTime != "" {
|
|
|
|
|
|
|
|
sendTime, err := utils.TimeStringToTime(req.SendTime)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
chatLog.SendTime = sendTime
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return db.msgMyq.GetChatLog(ctx, chatLog, pageNumber, showNumber, int32s)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (msgData []*sdkws.MsgData, err error) {
|
|
|
|
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (msgData []*sdkws.MsgData, err error) {
|
|
|
|
var totalMsgs []*sdkws.MsgData
|
|
|
|
var totalMsgs []*sdkws.MsgData
|
|
|
|
msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
|
|
|
|
msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
|
|
|
|