|
|
|
@ -95,7 +95,6 @@ type CommonMsgDatabase interface {
|
|
|
|
|
|
|
|
|
|
// to mq
|
|
|
|
|
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
|
|
|
|
MsgToModifyMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData) error
|
|
|
|
|
MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
|
|
|
|
MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error
|
|
|
|
|
|
|
|
|
@ -143,14 +142,13 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
type commonMsgDatabase struct {
|
|
|
|
|
msgDocDatabase relation.MsgDocModelInterface
|
|
|
|
|
msgTable relation.MsgDocModel
|
|
|
|
|
msg cache.MsgCache
|
|
|
|
|
seq cache.SeqCache
|
|
|
|
|
producer *kafka.Producer
|
|
|
|
|
producerToMongo *kafka.Producer
|
|
|
|
|
producerToModify *kafka.Producer
|
|
|
|
|
producerToPush *kafka.Producer
|
|
|
|
|
msgDocDatabase relation.MsgDocModelInterface
|
|
|
|
|
msgTable relation.MsgDocModel
|
|
|
|
|
msg cache.MsgCache
|
|
|
|
|
seq cache.SeqCache
|
|
|
|
|
producer *kafka.Producer
|
|
|
|
|
producerToMongo *kafka.Producer
|
|
|
|
|
producerToPush *kafka.Producer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
|
|
|
|
@ -158,14 +156,6 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error {
|
|
|
|
|
if len(messages) > 0 {
|
|
|
|
|
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbmsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
|
|
|
|
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
|
|
|
|
if err != nil {
|
|
|
|
|