fix: solve uncorrect outdated msg get. (#2513)

* refactor: refactor workflows contents.

* add tool workflows.

* update field.

* fix: remove chat error.

* Fix err.

* fix error.

* remove cn comment.

* update workflows files.

* update infra config.

* move workflows.

* feat: update bot.

* fix: solve uncorrect outdated msg get.

* update get docIDs logic.

* update

* update skip logic.

* fix

* update.
pull/2525/head
Monet Lee 4 months ago committed by GitHub
parent f3dfeb3bc4
commit 27024dfb03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -29,7 +29,7 @@ jobs:
uses: peter-evans/create-or-update-comment@v4 uses: peter-evans/create-or-update-comment@v4
with: with:
issue-number: ${{ github.event.issue.number }} issue-number: ${{ github.event.issue.number }}
token: ${{ secrets.BOT_GITHUB_TOKEN }} token: ${{ secrets.BOT_TOKEN }}
body: | body: |
This issue is available for anyone to work on. **Make sure to reference this issue in your pull request.** :sparkles: Thank you for your contribution! :sparkles: This issue is available for anyone to work on. **Make sure to reference this issue in your pull request.** :sparkles: Thank you for your contribution! :sparkles:
[Join slack 🤖](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q) to connect and communicate with our developers. [Join slack 🤖](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q) to connect and communicate with our developers.

@ -634,11 +634,11 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil { if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) // log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue continue
} }
log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) // log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 { if len(conversationIDs) == 0 {
continue continue
} }

@ -30,8 +30,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
msgNum int msgNum int
start = time.Now() start = time.Now()
) )
clearMsg := func(ctx context.Context) (bool, error) { clearMsg := func(ctx context.Context) (bool, error) {
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100) docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil {
return false, err
}
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -55,19 +61,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
return true, nil return true, nil
} }
for { _, err = clearMsg(ctx)
keep, err := clearMsg(ctx) if err != nil {
if err != nil { log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) return nil, err
return nil, err
}
if !keep {
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
break
}
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
} }
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return &msg.ClearMsgResp{}, nil return &msg.ClearMsgResp{}, nil
} }

@ -81,6 +81,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return return

@ -18,11 +18,12 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"strings" "strings"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -97,8 +98,10 @@ type CommonMsgDatabase interface {
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
// clear msg // clear msg
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
GetDocIDs(ctx context.Context) ([]string, error)
} }
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
@ -912,8 +915,25 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
} }
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
return db.msgDocDatabase.GetBeforeMsg(ctx, ts, limit) var msgs []*model.MsgDocModel
for i := 0; i < len(docIDs); i += 1000 {
end := i + 1000
if end > len(docIDs) {
end = len(docIDs)
}
res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit)
if err != nil {
return nil, err
}
msgs = append(msgs, res...)
if len(msgs) >= limit {
return msgs[:limit], nil
}
}
return msgs, nil
} }
func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
@ -936,8 +956,10 @@ func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, d
return index, err return index, err
} }
if len(index) == notNull { if len(index) == notNull {
log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
} else { } else {
log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
} }
} }
@ -955,3 +977,7 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
} }
return db.seqConversation.SetMinSeq(ctx, conversationID, seq) return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
} }
func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) {
return db.msgDocDatabase.GetDocIDs(ctx)
}

@ -8,6 +8,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"golang.org/x/exp/rand"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
@ -1226,10 +1227,53 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
} }
} }
func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) {
limit := 5000
var skip int
var docIDs []string
var offset int
count, err := m.coll.CountDocuments(ctx, bson.M{})
if err != nil {
return nil, err
}
if count < int64(limit) {
skip = 0
} else {
rand.Seed(uint64(time.Now().UnixMilli()))
skip = rand.Intn(int(count / int64(limit)))
offset = skip * limit
}
log.ZDebug(ctx, "offset", "skip", skip, "offset", offset)
res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
{
"$project": bson.M{
"doc_id": 1,
},
},
{
"$skip": offset,
},
{
"$limit": limit,
},
})
for _, doc := range res {
docIDs = append(docIDs, doc.DocID)
}
return docIDs, errs.Wrap(err)
}
func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
{ {
"$match": bson.M{ "$match": bson.M{
"doc_id": bson.M{
"$in": docIDs,
},
"msgs.msg.send_time": bson.M{ "msgs.msg.send_time": bson.M{
"$lt": ts, "$lt": ts,
}, },

@ -16,10 +16,11 @@ package database
import ( import (
"context" "context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"time"
) )
type Msg interface { type Msg interface {
@ -44,5 +45,7 @@ type Msg interface {
DeleteDoc(ctx context.Context, docID string) error DeleteDoc(ctx context.Context, docID string) error
DeleteMsgByIndex(ctx context.Context, docID string, index []int) error DeleteMsgByIndex(ctx context.Context, docID string, index []int) error
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error)
GetDocIDs(ctx context.Context) ([]string, error)
} }

Loading…
Cancel
Save