From 46a8d171049faefba46f0c3cc19a9c4099b35cc7 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 20 Dec 2024 15:26:32 +0800 Subject: [PATCH] refactoring scheduled tasks --- go.mod | 4 +-- go.sum | 8 ++--- internal/rpc/conversation/conversation.go | 39 ++++++++++++++++----- internal/rpc/msg/clear.go | 5 ++- internal/rpc/third/s3.go | 17 ++++----- pkg/common/storage/database/mgo/msg.go | 10 +++--- pkg/common/storage/database/mgo/msg_test.go | 6 ++-- 7 files changed, 56 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 4eaa18ccc..6c1d421c8 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.66 - github.com/openimsdk/tools v0.0.50-alpha.57 + github.com/openimsdk/protocol v0.0.72-alpha.67 + github.com/openimsdk/tools v0.0.50-alpha.58 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6e283eb66..86ed9ed6e 100644 --- a/go.sum +++ b/go.sum @@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.66 h1:5KoDY6M4T+pXg449ScF6hqeQ+WenBwNyUJn/t8W0oBQ= -github.com/openimsdk/protocol v0.0.72-alpha.66/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.57 h1:oIKV6vYhqp7TRmZ6Pe+r9RNl1D5s7aB/kE9yQVEWcSY= -github.com/openimsdk/tools v0.0.50-alpha.57/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= +github.com/openimsdk/protocol v0.0.72-alpha.67 h1:zlLbVkoT0OYsjO2RCutQuDFllcfNvZfdYchvlR6UIe0= +github.com/openimsdk/protocol v0.0.72-alpha.67/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/tools v0.0.50-alpha.58 h1:hkFL02Bzzp/l5x+tb7kJ9zes7hilh65EQ4qEIthsQX4= +github.com/openimsdk/tools v0.0.50-alpha.58/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 9fcd5abb1..696ada152 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -769,7 +769,8 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * if err != nil { return nil, err } - for _, conversation := range conversations { + latestMsgDestructTime := time.UnixMilli(req.Timestamp) + for i, conversation := range conversations { if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { continue } @@ -778,17 +779,37 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * if err != nil { return nil, err } - if resp.Seq == 0 { + if resp.Seq <= 0 { + log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq) + if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil { + return nil, err + } continue } - _, err = c.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{ - ConversationID: conversation.ConversationID, - OwnerUserID: []string{conversation.OwnerUserID}, - MinSeq: resp.Seq + 1, - }) - if err != nil { + resp.Seq++ + if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil { return nil, err } + log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime) + } + return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil +} + +func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx context.Context, conversationID string, ownerUserID string, minSeq int64, latestMsgDestructTime time.Time) error { + update := map[string]any{ + "latest_msg_destruct_time": latestMsgDestructTime, } - return &pbconversation.ClearUserConversationMsgResp{}, nil + if minSeq >= 0 { + req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq} + if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil { + return err + } + update["min_seq"] = minSeq + } + + if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{ownerUserID}, conversationID, update); err != nil { + return err + } + c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) + return nil } diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index eb7a4b7a6..7a2d36300 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -27,10 +27,11 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) if err != nil { return nil, err } - for _, doc := range docs { + for i, doc := range docs { if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil { return nil, err } + log.ZDebug(ctx, "DestructMsgs delete doc", "index", i, "docID", doc.DocID) index := strings.LastIndex(doc.DocID, ":") if index < 0 { continue @@ -51,9 +52,11 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) if conversationID == "" { continue } + minSeq++ if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil { return nil, err } + log.ZDebug(ctx, "DestructMsgs delete doc set min seq", "index", i, "docID", doc.DocID, "conversationID", conversationID, "setMinSeq", minSeq) } return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil } diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index e1b42ca33..8796fe824 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -295,22 +295,19 @@ func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteO if err != nil { return nil, err } - keyCount := make(map[string]int) - for _, obj := range models { - keyCount[obj.Key]++ - } - for _, obj := range models { - count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key) - if err != nil { - return nil, err - } + for i, obj := range models { if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil { return nil, errs.Wrap(err) } if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil { return nil, err } - if int(count) <= keyCount[obj.Key] { + count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key) + if err != nil { + return nil, err + } + log.ZDebug(ctx, "delete s3 object record", "index", i, "s3", obj, "count", count) + if count == 0 { if err := t.s3.DeleteObject(ctx, obj.Key); err != nil { return nil, err } diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index e01465466..f37176695 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -1318,16 +1318,14 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str { "$match": bson.M{ "doc_id": bson.M{ - "$regex": fmt.Sprintf("^%s:", conversationID), + "$regex": fmt.Sprintf("^%s", conversationID), }, }, }, { "$match": bson.M{ "msgs.msg.send_time": bson.M{ - "msgs.msg.send_time": bson.M{ - "$lte": time, - }, + "$lte": time, }, }, }, @@ -1342,6 +1340,7 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str { "$project": bson.M{ "_id": 0, + "doc_id": 1, "msgs.msg.send_time": 1, "msgs.msg.seq": 1, }, @@ -1356,6 +1355,9 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str } var seq int64 for _, v := range res[0].Msg { + if v.Msg == nil { + continue + } if v.Msg.SendTime <= time { seq = v.Msg.Seq } diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go index 7218ef1ad..992090552 100644 --- a/pkg/common/storage/database/mgo/msg_test.go +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -95,13 +95,13 @@ func TestName4(t *testing.T) { defer cancel() cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - msg, err := NewConversationMongo(cli.Database("openim_v3")) + msg, err := NewMsgMongo(cli.Database("openim_v3")) if err != nil { panic(err) } - ts := time.Now().UnixMilli() + ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli() t.Log(ts) - res, err := msg.FindRandConversation(ctx, ts, 10) + res, err := msg.GetLastMessageSeqByTime(ctx, "sg_1523453548", ts) if err != nil { panic(err) }