From b729cac998d9b7b8cddcd57f14921bd64359aa6b Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 10 Nov 2021 18:13:04 +0800 Subject: [PATCH] add timer update redis minSeq --- cmd/open_im_timed_task/main.go | 78 ++++++++++++++++++++++------------ pkg/common/db/mongoModel.go | 34 ++++++++++++++- pkg/common/db/redisModel.go | 4 +- 3 files changed, 85 insertions(+), 31 deletions(-) diff --git a/cmd/open_im_timed_task/main.go b/cmd/open_im_timed_task/main.go index 1119ce577..a51c771e8 100644 --- a/cmd/open_im_timed_task/main.go +++ b/cmd/open_im_timed_task/main.go @@ -1,42 +1,64 @@ package main import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/db" - "fmt" + commonDB "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + "Open_IM/pkg/common/log" "time" ) func main() { + //for { + // fmt.Println("start delete mongodb expired record") + // timeUnixBegin := time.Now().Unix() + // count, _ := db.DB.MgoUserCount() + // fmt.Println("mongodb record count: ", count) + // for i := 0; i < count; i++ { + // time.Sleep(1 * time.Millisecond) + // uid, _ := db.DB.MgoSkipUID(i) + // fmt.Println("operate uid: ", uid) + // err := db.DB.DelUserChat(uid) + // if err != nil { + // fmt.Println("operate uid failed: ", uid, err.Error()) + // } + // } + // + // timeUnixEnd := time.Now().Unix() + // costTime := timeUnixEnd - timeUnixBegin + // if costTime > int64(config.Config.Mongo.DBRetainChatRecords*24*3600) { + // continue + // } else { + // sleepTime := 0 + // if int64(config.Config.Mongo.DBRetainChatRecords*24*3600)-costTime > 24*3600 { + // sleepTime = 24 * 3600 + // } else { + // sleepTime = config.Config.Mongo.DBRetainChatRecords*24*3600 - int(costTime) + // } + // fmt.Println("sleep: ", sleepTime) + // time.Sleep(time.Duration(sleepTime) * time.Second) + // } + //} for { - fmt.Println("start delete mongodb expired record") - timeUnixBegin := time.Now().Unix() - count, _ := db.DB.MgoUserCount() - fmt.Println("mongodb record count: ", count) - for i := 0; i < count; i++ { - time.Sleep(1 * time.Millisecond) - uid, _ := db.DB.MgoSkipUID(i) - fmt.Println("operate uid: ", uid) - err := db.DB.DelUserChat(uid) - if err != nil { - fmt.Println("operate uid failed: ", uid, err.Error()) - } - } - - timeUnixEnd := time.Now().Unix() - costTime := timeUnixEnd - timeUnixBegin - if costTime > int64(config.Config.Mongo.DBRetainChatRecords*24*3600) { - continue + uidList, err := im_mysql_model.SelectAllUID() + if err != nil { + log.NewError("999999", err.Error()) } else { - sleepTime := 0 - if int64(config.Config.Mongo.DBRetainChatRecords*24*3600)-costTime > 24*3600 { - sleepTime = 24 * 3600 - } else { - sleepTime = config.Config.Mongo.DBRetainChatRecords*24*3600 - int(costTime) + for _, v := range uidList { + minSeq, err := commonDB.DB.GetMinSeqFromMongo(v) + if err != nil { + log.NewError("999999", "get user minSeq err", err.Error(), v) + continue + } else { + err := commonDB.DB.SetUserMinSeq(v, minSeq) + if err != nil { + log.NewError("999999", "set user minSeq err", err.Error(), v) + } + } + time.Sleep(time.Duration(100) * time.Millisecond) } - fmt.Println("sleep: ", sleepTime) - time.Sleep(time.Duration(sleepTime) * time.Second) + } + } } diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index d9c6f5d2f..93f64ff5f 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/chat" "errors" + "github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "gopkg.in/mgo.v2/bson" "strconv" @@ -87,6 +88,34 @@ func (d *DataBases) GetMsgBySeqRange(uid string, seqBegin, seqEnd int64) (Single return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil } +func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq int64, err error) { + var i int64 + var seqUid string + session := d.mgoSession.Clone() + if session == nil { + return MinSeq, errors.New("session == nil") + } + defer session.Close() + c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) + MaxSeq, err := d.GetUserMaxSeq(uid) + if err != nil && err != redis.ErrNil { + return MinSeq, err + } + NB := MaxSeq / singleGocMsgNum + for i = 0; i <= NB; i++ { + seqUid = indexGen(uid, i) + n, err := c.Find(bson.M{"uid": seqUid}).Count() + if err == nil && n != 0 { + if i == 0 { + MinSeq = 1 + } else { + MinSeq = i * singleGocMsgNum + } + break + } + } + return MinSeq, nil +} func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { allCount := 0 singleCount := 0 @@ -316,7 +345,7 @@ func getCurrentTimestampByMill() int64 { } func getSeqUid(uid string, seq int64) string { seqSuffix := seq / singleGocMsgNum - return uid + ":" + strconv.FormatInt(seqSuffix, 10) + return indexGen(uid, seqSuffix) } func isContainInt64(target int64, List []int64) bool { @@ -329,3 +358,6 @@ func isContainInt64(target int64, List []int64) bool { return false } +func indexGen(uid string, seqSuffix int64) string { + return uid + ":" + strconv.FormatInt(seqSuffix, 10) +} diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index 781cc86fe..b31fde8c1 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -45,9 +45,9 @@ func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) { } //Set the user's minimum seq -func (d *DataBases) SetUserMinSeq(uid string) (err error) { +func (d *DataBases) SetUserMinSeq(uid string, minSeq int64) (err error) { key := userMinSeq + uid - _, err = d.Exec("SET", key) + _, err = d.Exec("SET", key, minSeq) return err }