|
|
@ -134,7 +134,7 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, seqs []int64, err error) {
|
|
|
|
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, err error) {
|
|
|
|
beginIndex := m.msg.GetMsgIndex(beginSeq)
|
|
|
|
beginIndex := m.msg.GetMsgIndex(beginSeq)
|
|
|
|
num := endSeq - beginSeq + 1
|
|
|
|
num := endSeq - beginSeq + 1
|
|
|
|
pipeline := bson.A{
|
|
|
|
pipeline := bson.A{
|
|
|
@ -151,7 +151,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
|
|
|
|
}
|
|
|
|
}
|
|
|
|
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
|
|
|
|
cursor, err := m.MsgCollection.Aggregate(ctx, pipeline)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, errs.Wrap(err)
|
|
|
|
return nil, errs.Wrap(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer cursor.Close(ctx)
|
|
|
|
defer cursor.Close(ctx)
|
|
|
|
var doc table.MsgDocModel
|
|
|
|
var doc table.MsgDocModel
|
|
|
@ -159,28 +159,24 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
|
|
|
|
for cursor.Next(ctx) {
|
|
|
|
for cursor.Next(ctx) {
|
|
|
|
err := cursor.Decode(&doc)
|
|
|
|
err := cursor.Decode(&doc)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if i == 0 {
|
|
|
|
if i == 0 {
|
|
|
|
break
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(doc.Msg) < 1 {
|
|
|
|
|
|
|
|
return nil, nil, errs.ErrRecordNotFound.Wrap("mongo GetMsgBySeqIndex failed, len is 0")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg))
|
|
|
|
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg))
|
|
|
|
for _, v := range doc.Msg {
|
|
|
|
for _, v := range doc.Msg {
|
|
|
|
var msg sdkws.MsgData
|
|
|
|
var msg sdkws.MsgData
|
|
|
|
if err := proto.Unmarshal(v.Msg, &msg); err != nil {
|
|
|
|
if err := proto.Unmarshal(v.Msg, &msg); err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if msg.Seq >= beginSeq && msg.Seq <= endSeq {
|
|
|
|
if msg.Seq >= beginSeq && msg.Seq <= endSeq {
|
|
|
|
log.ZDebug(ctx, "find msg", "msg", &msg)
|
|
|
|
log.ZDebug(ctx, "find msg", "msg", &msg)
|
|
|
|
msgs = append(msgs, &msg)
|
|
|
|
msgs = append(msgs, &msg)
|
|
|
|
seqs = append(seqs, msg.Seq)
|
|
|
|
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg)
|
|
|
|
log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return msgs, seqs, nil
|
|
|
|
return msgs, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|