|
|
|
@ -13,6 +13,7 @@ import (
|
|
|
|
|
"github.com/gogo/protobuf/sortkeys"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
|
|
"math/rand"
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
//"github.com/garyburd/redigo/redis"
|
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
@ -77,6 +78,51 @@ func (d *DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) {
|
|
|
|
|
return 1, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// deleteMsgByLogic
|
|
|
|
|
func (d *DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (err error) {
|
|
|
|
|
sortkeys.Uint32s(seqList)
|
|
|
|
|
suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 {
|
|
|
|
|
t := make(map[string][]uint32)
|
|
|
|
|
for i := 0; i < len(seqList); i++ {
|
|
|
|
|
seqUid := getSeqUid(uid, seqList[i])
|
|
|
|
|
if value, ok := t[seqUid]; !ok {
|
|
|
|
|
var temp []uint32
|
|
|
|
|
t[seqUid] = append(temp, seqList[i])
|
|
|
|
|
} else {
|
|
|
|
|
t[seqUid] = append(value, seqList[i])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return t
|
|
|
|
|
}(userID, seqList)
|
|
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
wg.Add(len(suffixUserID2SubSeqList))
|
|
|
|
|
for k, v := range suffixUserID2SubSeqList {
|
|
|
|
|
go func(suffixUserID string, subSeqList []uint32, operationID string) {
|
|
|
|
|
if e := d.DelMsgBySeqListInOneDoc(suffixUserID, subSeqList, operationID); e != nil {
|
|
|
|
|
log.Error(operationID, "DelMsgBySeqListInOneDoc failed ", e.Error(), suffixUserID, subSeqList)
|
|
|
|
|
err = e
|
|
|
|
|
}
|
|
|
|
|
wg.Done()
|
|
|
|
|
}(k, v, operationID)
|
|
|
|
|
}
|
|
|
|
|
wg.Wait()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint32, operationID string) error {
|
|
|
|
|
seqMsgList, indexList, err := d.GetMsgAndIndexBySeqListInOneMongo2(suffixUserID, seqList, operationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
for i, v := range seqMsgList {
|
|
|
|
|
if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil {
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// deleteMsgByLogic
|
|
|
|
|
func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string) error {
|
|
|
|
|
sortkeys.Uint32s(seqList)
|
|
|
|
@ -94,6 +140,27 @@ func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error {
|
|
|
|
|
log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg)
|
|
|
|
|
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
|
|
|
|
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
|
|
|
|
|
s := fmt.Sprintf("msg.%d.msg", seqIndex)
|
|
|
|
|
log.NewDebug(operationID, utils.GetSelfFuncName(), seqIndex, s)
|
|
|
|
|
msg.Status = constant.MsgDeleted
|
|
|
|
|
bytes, err := proto.Marshal(msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal failed ", err.Error(), msg.String())
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
updateResult, err := c.UpdateOne(ctx, bson.M{"uid": suffixUserID}, bson.M{"$set": bson.M{s: bytes}})
|
|
|
|
|
log.NewInfo(operationID, utils.GetSelfFuncName(), updateResult)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(operationID, utils.GetSelfFuncName(), "UpdateOne", err.Error())
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operationID string) error {
|
|
|
|
|
log.NewInfo(operationID, utils.GetSelfFuncName(), uid, *msg)
|
|
|
|
|
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
|
|
|
@ -107,6 +174,7 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat
|
|
|
|
|
log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal", err.Error())
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updateResult, err := c.UpdateOne(
|
|
|
|
|
ctx, bson.M{"uid": uid},
|
|
|
|
|
bson.M{"$set": bson.M{s: bytes}})
|
|
|
|
@ -226,6 +294,36 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio
|
|
|
|
|
return seqMsg, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, err error) {
|
|
|
|
|
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
|
|
|
|
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
|
|
|
|
|
sChat := UserChat{}
|
|
|
|
|
if err = c.FindOne(ctx, bson.M{"uid": suffixUserID}).Decode(&sChat); err != nil {
|
|
|
|
|
log.NewError(operationID, "not find seqUid", suffixUserID, err.Error())
|
|
|
|
|
return nil, nil, utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
singleCount := 0
|
|
|
|
|
var hasSeqList []uint32
|
|
|
|
|
for i := 0; i < len(sChat.Msg); i++ {
|
|
|
|
|
msg := new(open_im_sdk.MsgData)
|
|
|
|
|
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
|
|
|
|
|
log.NewError(operationID, "Unmarshal err", msg.String(), err.Error())
|
|
|
|
|
return nil, nil, err
|
|
|
|
|
}
|
|
|
|
|
if isContainInt32(msg.Seq, seqList) {
|
|
|
|
|
indexList = append(indexList, i)
|
|
|
|
|
seqMsg = append(seqMsg, msg)
|
|
|
|
|
hasSeqList = append(hasSeqList, msg.Seq)
|
|
|
|
|
singleCount++
|
|
|
|
|
if singleCount == len(seqList) {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return seqMsg, indexList, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
|
|
|
|
|
for _, v := range seqList {
|
|
|
|
|
msg := new(open_im_sdk.MsgData)
|
|
|
|
@ -658,7 +756,7 @@ func getMsgIndex(seq uint32) int {
|
|
|
|
|
seqSuffix := seq / singleGocMsgNum
|
|
|
|
|
var index uint32
|
|
|
|
|
if seqSuffix == 0 {
|
|
|
|
|
index = (seq - seqSuffix*5000) - 1
|
|
|
|
|
index = (seq - seqSuffix*singleGocMsgNum) - 1
|
|
|
|
|
} else {
|
|
|
|
|
index = seq - seqSuffix*singleGocMsgNum
|
|
|
|
|
}
|
|
|
|
|