|
|
@ -15,6 +15,9 @@
|
|
|
|
package controller
|
|
|
|
package controller
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
|
|
|
|
relation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
|
|
|
|
|
|
|
"gorm.io/gorm"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
@ -92,6 +95,7 @@ type CommonMsgDatabase interface {
|
|
|
|
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
|
|
|
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
|
|
|
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
|
|
|
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
|
|
|
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
|
|
|
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
|
|
|
|
|
|
|
SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (msgData []*sdkws.MsgData, err error)
|
|
|
|
|
|
|
|
|
|
|
|
// to mq
|
|
|
|
// to mq
|
|
|
|
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
|
|
|
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
|
|
@ -103,8 +107,9 @@ type CommonMsgDatabase interface {
|
|
|
|
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
|
|
|
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
|
|
|
|
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel, msgMyqModel relation.ChatLogModelInterface) CommonMsgDatabase {
|
|
|
|
return &commonMsgDatabase{
|
|
|
|
return &commonMsgDatabase{
|
|
|
|
|
|
|
|
msgMyq: msgMyqModel,
|
|
|
|
msgDocDatabase: msgDocModel,
|
|
|
|
msgDocDatabase: msgDocModel,
|
|
|
|
cache: cacheModel,
|
|
|
|
cache: cacheModel,
|
|
|
|
producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic),
|
|
|
|
producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic),
|
|
|
@ -113,16 +118,18 @@ func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheMo
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) CommonMsgDatabase {
|
|
|
|
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, dbGrom *gorm.DB) CommonMsgDatabase {
|
|
|
|
cacheModel := cache.NewMsgCacheModel(rdb)
|
|
|
|
cacheModel := cache.NewMsgCacheModel(rdb)
|
|
|
|
msgDocModel := unrelation.NewMsgMongoDriver(database)
|
|
|
|
msgDocModel := unrelation.NewMsgMongoDriver(database)
|
|
|
|
CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel)
|
|
|
|
msgMyqModel := relation2.NewChatLogGorm(dbGrom)
|
|
|
|
|
|
|
|
CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel, msgMyqModel)
|
|
|
|
return CommonMsgDatabase
|
|
|
|
return CommonMsgDatabase
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type commonMsgDatabase struct {
|
|
|
|
type commonMsgDatabase struct {
|
|
|
|
msgDocDatabase unRelationTb.MsgDocModelInterface
|
|
|
|
msgDocDatabase unRelationTb.MsgDocModelInterface
|
|
|
|
msg unRelationTb.MsgDocModel
|
|
|
|
msg unRelationTb.MsgDocModel
|
|
|
|
|
|
|
|
msgMyq relation.ChatLogModelInterface
|
|
|
|
cache cache.MsgModel
|
|
|
|
cache cache.MsgModel
|
|
|
|
producer *kafka.Producer
|
|
|
|
producer *kafka.Producer
|
|
|
|
producerToMongo *kafka.Producer
|
|
|
|
producerToMongo *kafka.Producer
|
|
|
@ -884,3 +891,15 @@ func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.
|
|
|
|
func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) {
|
|
|
|
func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) {
|
|
|
|
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
|
|
|
|
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.SearchMessageReq) (msgData []*sdkws.MsgData, err error) {
|
|
|
|
|
|
|
|
var totalMsgs []*sdkws.MsgData
|
|
|
|
|
|
|
|
msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, msg := range msgs {
|
|
|
|
|
|
|
|
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return totalMsgs, nil
|
|
|
|
|
|
|
|
}
|
|
|
|