From ed0a834e2e9d4db7439361e6e0779fcf78337ffb Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 2 Jan 2025 17:15:38 +0800 Subject: [PATCH] feat: support GetLastMessage (#3029) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch --- go.mod | 2 +- go.sum | 4 +- internal/msggateway/client.go | 2 + internal/msggateway/constant.go | 1 + internal/msggateway/message_handler.go | 13 ++ internal/rpc/msg/sync_msg.go | 8 + pkg/common/storage/controller/msg.go | 25 ++- pkg/common/storage/database/mgo/msg.go | 62 +++++++ pkg/common/storage/database/mgo/object.go | 12 ++ pkg/common/storage/database/msg.go | 1 + pkg/common/storage/database/object.go | 4 + tools/s3/README.md | 12 ++ tools/s3/internal/conversion.go | 202 ++++++++++++++++++++++ tools/s3/main.go | 23 +++ 14 files changed, 367 insertions(+), 4 deletions(-) create mode 100644 tools/s3/README.md create mode 100644 tools/s3/internal/conversion.go create mode 100644 tools/s3/main.go diff --git a/go.mod b/go.mod index efdac43f8..03a7a4d4d 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.69 + github.com/openimsdk/protocol v0.0.72-alpha.70 github.com/openimsdk/tools v0.0.50-alpha.63 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index f87e038d0..4c297134d 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc= -github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ= +github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY= github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 1040f2be2..bdb62aece 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -236,6 +236,8 @@ func (c *Client) handleMessage(message []byte) error { resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq) case WSGetConvMaxReadSeq: resp, messageErr = c.longConnServer.GetConversationsHasReadAndMaxSeq(ctx, binaryReq) + case WsPullConvLastMessage: + resp, messageErr = c.longConnServer.GetLastMessage(ctx, binaryReq) case WsLogoutMsg: resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq) case WsSetBackgroundStatus: diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index a825c0519..1e7ab3bb7 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -47,6 +47,7 @@ const ( WSSendSignalMsg = 1004 WSPullMsg = 1005 WSGetConvMaxReadSeq = 1006 + WsPullConvLastMessage = 1007 WSPushMsg = 2001 WSKickOnlineMsg = 2002 WsLogoutMsg = 2003 diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 9b59867d6..ca15e1ef6 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -108,6 +108,7 @@ type MessageHandler interface { GetSeqMessage(ctx context.Context, data *Req) ([]byte, error) UserLogout(ctx context.Context, data *Req) ([]byte, error) SetUserDeviceBackground(ctx context.Context, data *Req) ([]byte, bool, error) + GetLastMessage(ctx context.Context, data *Req) ([]byte, error) } var _ MessageHandler = (*GrpcHandler)(nil) @@ -266,3 +267,15 @@ func (g *GrpcHandler) SetUserDeviceBackground(ctx context.Context, data *Req) ([ } return nil, req.IsBackground, nil } + +func (g *GrpcHandler) GetLastMessage(ctx context.Context, data *Req) ([]byte, error) { + var req msg.GetLastMessageReq + if err := proto.Unmarshal(data.Data, &req); err != nil { + return nil, err + } + resp, err := g.msgClient.GetLastMessage(ctx, &req) + if err != nil { + return nil, err + } + return proto.Marshal(resp) +} diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 7d4ffa3e6..6cf1c21d3 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -245,3 +245,11 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq func (m *msgServer) GetServerTime(ctx context.Context, _ *msg.GetServerTimeReq) (*msg.GetServerTimeResp, error) { return &msg.GetServerTimeResp{ServerTime: timeutil.GetCurrentTimestampByMill()}, nil } + +func (m *msgServer) GetLastMessage(ctx context.Context, req *msg.GetLastMessageReq) (*msg.GetLastMessageResp, error) { + msgs, err := m.MsgDatabase.GetLastMessage(ctx, req.ConversationIDs, req.UserID) + if err != nil { + return nil, err + } + return &msg.GetLastMessageResp{Msgs: msgs}, nil +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index d5ad12584..a93d581eb 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -97,6 +97,8 @@ type CommonMsgDatabase interface { DeleteDoc(ctx context.Context, docID string) error GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) + + GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -811,8 +813,29 @@ func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationI if v, ok := seqMsgs[seq]; ok { res = append(res, convert.MsgDB2Pb(v.Msg)) } else { - res = append(res, &sdkws.MsgData{Seq: seq}) + res = append(res, &sdkws.MsgData{Seq: seq, Status: constant.MsgStatusHasDeleted}) + } + } + return res, nil +} + +func (db *commonMsgDatabase) GetLastMessage(ctx context.Context, conversationIDs []string, userID string) (map[string]*sdkws.MsgData, error) { + res := make(map[string]*sdkws.MsgData) + for _, conversationID := range conversationIDs { + if _, ok := res[conversationID]; ok { + continue + } + msg, err := db.msgDocDatabase.GetLastMessage(ctx, conversationID) + if err != nil { + if errs.Unwrap(err) == mongo.ErrNoDocuments { + continue + } + return nil, err } + tmp := []*model.MsgInfoModel{msg} + db.handlerDeleteAndRevoked(ctx, userID, tmp) + db.handlerQuote(ctx, userID, conversationID, tmp) + res[conversationID] = convert.MsgDB2Pb(msg.Msg) } return res, nil } diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index 03ebff611..c440d4442 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -997,6 +997,68 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str return seq, nil } +func (m *MsgMgo) GetLastMessage(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) { + pipeline := []bson.M{ + { + "$match": bson.M{ + "doc_id": bson.M{ + "$regex": fmt.Sprintf("^%s", conversationID), + }, + }, + }, + { + "$match": bson.M{ + "msgs.msg.status": bson.M{ + "$lt": constant.MsgStatusHasDeleted, + }, + }, + }, + { + "$sort": bson.M{ + "_id": -1, + }, + }, + { + "$limit": 1, + }, + { + "$project": bson.M{ + "_id": 0, + "doc_id": 0, + }, + }, + { + "$unwind": "$msgs", + }, + { + "$match": bson.M{ + "msgs.msg.status": bson.M{ + "$lt": constant.MsgStatusHasDeleted, + }, + }, + }, + { + "$sort": bson.M{ + "msgs.msg.seq": -1, + }, + }, + { + "$limit": 1, + }, + } + type Result struct { + Msgs *model.MsgInfoModel `bson:"msgs"` + } + res, err := mongoutil.Aggregate[*Result](ctx, m.coll, pipeline) + if err != nil { + return nil, err + } + if len(res) == 0 { + return nil, errs.Wrap(mongo.ErrNoDocuments) + } + return res[0].Msgs, nil +} + func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) { if len(indexes) == 0 { return nil, nil diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 624eb84a0..ee48c6693 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -112,3 +112,15 @@ func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expir func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) { return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key}) } + +func (o *S3Mongo) GetEngineCount(ctx context.Context, engine string) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine}) +} + +func (o *S3Mongo) GetEngineInfo(ctx context.Context, engine string, limit int, skip int) ([]*model.Object, error) { + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{"engine": engine}, options.Find().SetLimit(int64(limit)).SetSkip(int64(skip))) +} + +func (o *S3Mongo) UpdateEngine(ctx context.Context, oldEngine, oldName string, newEngine string) error { + return mongoutil.UpdateOne(ctx, o.coll, bson.M{"engine": oldEngine, "name": oldName}, bson.M{"$set": bson.M{"engine": newEngine}}, false) +} diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index b44e70296..e3c4e8ece 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -39,5 +39,6 @@ type Msg interface { DeleteDoc(ctx context.Context, docID string) error GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) + GetLastMessage(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) } diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 5541a159b..a0e4ebe2b 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -27,4 +27,8 @@ type ObjectInfo interface { Delete(ctx context.Context, engine string, name []string) error FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) + + GetEngineCount(ctx context.Context, engine string) (int64, error) + GetEngineInfo(ctx context.Context, engine string, limit int, skip int) ([]*model.Object, error) + UpdateEngine(ctx context.Context, oldEngine, oldName string, newEngine string) error } diff --git a/tools/s3/README.md b/tools/s3/README.md new file mode 100644 index 000000000..ac30347d8 --- /dev/null +++ b/tools/s3/README.md @@ -0,0 +1,12 @@ +# After s3 switches the storage engine, convert the data + +- build +```shell +go build -o s3convert main.go +``` + +- start +```shell +./s3convert -config -name +# ./s3convert -config ./../../config -name minio +``` diff --git a/tools/s3/internal/conversion.go b/tools/s3/internal/conversion.go new file mode 100644 index 000000000..ba2174535 --- /dev/null +++ b/tools/s3/internal/conversion.go @@ -0,0 +1,202 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "github.com/mitchellh/mapstructure" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/s3" + "github.com/openimsdk/tools/s3/aws" + "github.com/openimsdk/tools/s3/cos" + "github.com/openimsdk/tools/s3/kodo" + "github.com/openimsdk/tools/s3/minio" + "github.com/openimsdk/tools/s3/oss" + "github.com/spf13/viper" + "go.mongodb.org/mongo-driver/mongo" + "log" + "net/http" + "path/filepath" + "time" +) + +const defaultTimeout = time.Second * 10 + +func readConf(path string, val any) error { + v := viper.New() + v.SetConfigFile(path) + if err := v.ReadInConfig(); err != nil { + return err + } + fn := func(config *mapstructure.DecoderConfig) { + config.TagName = "mapstructure" + } + return v.Unmarshal(val, fn) +} + +func getS3(path string, name string, thirdConf *config.Third) (s3.Interface, error) { + switch name { + case "minio": + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + var minioConf config.Minio + if err := readConf(filepath.Join(path, minioConf.GetConfigFileName()), &minioConf); err != nil { + return nil, err + } + var redisConf config.Redis + if err := readConf(filepath.Join(path, redisConf.GetConfigFileName()), &redisConf); err != nil { + return nil, err + } + rdb, err := redisutil.NewRedisClient(ctx, redisConf.Build()) + if err != nil { + return nil, err + } + return minio.NewMinio(ctx, redis.NewMinioCache(rdb), *minioConf.Build()) + case "cos": + return cos.NewCos(*thirdConf.Object.Cos.Build()) + case "oss": + return oss.NewOSS(*thirdConf.Object.Oss.Build()) + case "kodo": + return kodo.NewKodo(*thirdConf.Object.Kodo.Build()) + case "aws": + return aws.NewAws(*thirdConf.Object.Aws.Build()) + default: + return nil, fmt.Errorf("invalid object enable: %s", name) + } +} + +func getMongo(path string) (database.ObjectInfo, error) { + var mongoConf config.Mongo + if err := readConf(filepath.Join(path, mongoConf.GetConfigFileName()), &mongoConf); err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + mgocli, err := mongoutil.NewMongoDB(ctx, mongoConf.Build()) + if err != nil { + return nil, err + } + return mgo.NewS3Mongo(mgocli.GetDB()) +} + +func Main(path string, engine string) error { + var thirdConf config.Third + if err := readConf(filepath.Join(path, thirdConf.GetConfigFileName()), &thirdConf); err != nil { + return err + } + if thirdConf.Object.Enable == engine { + return errors.New("same s3 storage") + } + s3db, err := getMongo(path) + if err != nil { + return err + } + oldS3, err := getS3(path, engine, &thirdConf) + if err != nil { + return err + } + newS3, err := getS3(path, thirdConf.Object.Enable, &thirdConf) + if err != nil { + return err + } + count, err := getEngineCount(s3db, oldS3.Engine()) + if err != nil { + return err + } + log.Printf("engine %s count: %d", oldS3.Engine(), count) + var skip int + for i := 1; i <= count+1; i++ { + log.Printf("start %d/%d", i, count) + start := time.Now() + res, err := doObject(s3db, newS3, oldS3, skip) + if err != nil { + log.Printf("end [%s] %d/%d error %s", time.Since(start), i, count, err) + return err + } + log.Printf("end [%s] %d/%d result %+v", time.Since(start), i, count, *res) + if res.Skip { + skip++ + } + if res.End { + break + } + } + return nil +} + +func getEngineCount(db database.ObjectInfo, name string) (int, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + count, err := db.GetEngineCount(ctx, name) + if err != nil { + return 0, err + } + return int(count), nil +} + +func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Result, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + infos, err := db.GetEngineInfo(ctx, oldS3.Engine(), 1, skip) + if err != nil { + return nil, err + } + if len(infos) == 0 { + return &Result{End: true}, nil + } + obj := infos[0] + if _, err := db.Take(ctx, newS3.Engine(), obj.Name); err == nil { + return &Result{Skip: true}, nil + } else if !errors.Is(err, mongo.ErrNoDocuments) { + return nil, err + } + downloadURL, err := oldS3.AccessURL(ctx, obj.Key, time.Hour, &s3.AccessURLOption{}) + if err != nil { + return nil, err + } + putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour) + if err != nil { + return nil, err + } + downloadResp, err := http.Get(downloadURL) + if err != nil { + return nil, err + } + defer downloadResp.Body.Close() + switch downloadResp.StatusCode { + case http.StatusNotFound: + return &Result{Skip: true}, nil + case http.StatusOK: + default: + return nil, fmt.Errorf("download object failed %s", downloadResp.Status) + } + log.Printf("file size %d", obj.Size) + request, err := http.NewRequest(http.MethodPut, putURL, downloadResp.Body) + if err != nil { + return nil, err + } + putResp, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + defer putResp.Body.Close() + if putResp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("put object failed %s", putResp.Status) + } + ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + if err := db.UpdateEngine(ctx, obj.Engine, obj.Name, newS3.Engine()); err != nil { + return nil, err + } + return &Result{}, nil +} + +type Result struct { + Skip bool + End bool +} diff --git a/tools/s3/main.go b/tools/s3/main.go new file mode 100644 index 000000000..1e661c9a7 --- /dev/null +++ b/tools/s3/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "flag" + "fmt" + "github.com/openimsdk/open-im-server/v3/tools/s3/internal" + "os" +) + +func main() { + var ( + name string + config string + ) + flag.StringVar(&name, "name", "", "old previous storage name") + flag.StringVar(&config, "config", "", "config directory") + flag.Parse() + if err := internal.Main(config, name); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + fmt.Fprintln(os.Stdout, "success") +}