From 33e644e4fdfbe05c9c90fdb07ebbcdbe1390b6a6 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 29 Nov 2023 15:39:01 +0800 Subject: [PATCH] up35 --- go.work | 2 +- pkg/common/config/config.go | 24 +-- tools/component/component.go | 3 + tools/component/component_test.go | 47 +++++- tools/mysql2mongo/README.md | 37 ----- tools/{mysql2mongo => up35}/go.mod | 12 +- tools/{mysql2mongo => up35}/go.sum | 17 ++ tools/{mysql2mongo => up35}/main.go | 249 ++++++++++++++++++---------- 8 files changed, 247 insertions(+), 144 deletions(-) delete mode 100644 tools/mysql2mongo/README.md rename tools/{mysql2mongo => up35}/go.mod (77%) rename tools/{mysql2mongo => up35}/go.sum (86%) rename tools/{mysql2mongo => up35}/main.go (74%) diff --git a/go.work b/go.work index 5c6c91a53..63100b9f6 100644 --- a/go.work +++ b/go.work @@ -13,5 +13,5 @@ use ( ./tools/component ./tools/url2im ./tools/data-conversion - ./tools/mysql2mongo + ./tools/up35 ) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index fdb1cee00..a75a76ed7 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -45,6 +45,18 @@ type POfflinePush struct { Ext string `yaml:"ext"` } +type MYSQL struct { + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Database string `yaml:"database"` + MaxOpenConn int `yaml:"maxOpenConn"` + MaxIdleConn int `yaml:"maxIdleConn"` + MaxLifeTime int `yaml:"maxLifeTime"` + LogLevel int `yaml:"logLevel"` + SlowThreshold int `yaml:"slowThreshold"` +} + type configStruct struct { Envs struct { Discovery string `yaml:"discovery"` @@ -56,17 +68,7 @@ type configStruct struct { Password string `yaml:"password"` } `yaml:"zookeeper"` - Mysql struct { - Address []string `yaml:"address"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Database string `yaml:"database"` - MaxOpenConn int `yaml:"maxOpenConn"` - MaxIdleConn int `yaml:"maxIdleConn"` - MaxLifeTime int `yaml:"maxLifeTime"` - LogLevel int `yaml:"logLevel"` - SlowThreshold int `yaml:"slowThreshold"` - } `yaml:"mysql"` + Mysql *MYSQL `yaml:"mysql"` Mongo struct { Uri string `yaml:"uri"` diff --git a/tools/component/component.go b/tools/component/component.go index bb93a5b44..140313c30 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -133,6 +133,9 @@ func exactIP(urll string) string { } func checkMysql() error { + if config.Config.Mysql == nil { + return nil + } var sqlDB *sql.DB defer func() { if sqlDB != nil { diff --git a/tools/component/component_test.go b/tools/component/component_test.go index afa51ef2c..c4f7f7e16 100644 --- a/tools/component/component_test.go +++ b/tools/component/component_test.go @@ -15,11 +15,14 @@ package main import ( + "context" + "strconv" "testing" - - "github.com/stretchr/testify/assert" + "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" ) func TestCheckMysql(t *testing.T) { @@ -40,3 +43,43 @@ func mockInitCfg() error { config.Config.Mysql.Address = []string{"127.0.0.1:13306"} return nil } + +func TestRedis(t *testing.T) { + config.Config.Redis.Address = []string{ + "172.16.8.142:7000", + //"172.16.8.142:7000", "172.16.8.142:7001", "172.16.8.142:7002", "172.16.8.142:7003", "172.16.8.142:7004", "172.16.8.142:7005", + } + + var redisClient redis.UniversalClient + defer func() { + if redisClient != nil { + redisClient.Close() + } + }() + if len(config.Config.Redis.Address) > 1 { + redisClient = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: config.Config.Redis.Address, + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, + }) + } else { + redisClient = redis.NewClient(&redis.Options{ + Addr: config.Config.Redis.Address[0], + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, + }) + } + _, err := redisClient.Ping(context.Background()).Result() + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 1000000; i++ { + val, err := redisClient.Set(context.Background(), "b_"+strconv.Itoa(i), "test", time.Second*10).Result() + t.Log("index", i, "resp", val, "err", err) + if err != nil { + return + } + } + +} diff --git a/tools/mysql2mongo/README.md b/tools/mysql2mongo/README.md deleted file mode 100644 index 3d9448332..000000000 --- a/tools/mysql2mongo/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# OpenIM V3.4.0 至 V3.5.0 数据迁移指南 - ---- -从3.5.0起,我们从MySQL切换到了MongoDB,这意味着您需要将数据从MySQL迁移到MongoDB。我们提供了一个工具来帮助您完成这项工作。本次迁移完成后完全兼容v3之前的数据。 - -### 1. 数据备份 - -在开始数据迁移之前,强烈建议备份所有相关的数据以防止任何可能的数据丢失。 - -### 2. 迁移数据 - -+ 位置: `open-im-server/tools/mysql2mongo/main.go` - -```bash -// 数据库配置 -var ( - mysqlUsername = "root" // mysql用户名 - mysqlPassword = "openIM123" // mysql密码 - mysqlAddr = "127.0.0.1:13306" // mysql地址 - mysqlDatabase = "openIM_v3" // mysql数据库名字 -) - -var s3 = "minio" // 文件储存方式 minio, cos, oss - -var ( - mongoUsername = "root" // mongodb用户名 - mongoPassword = "openIM123" // mongodb密码 - mongoHosts = "127.0.0.1:13306" // mongodb地址 - mongoDatabase = "openIM_v3" // mongodb数据库名字 -) -``` - -**执行数据迁移命令:** - -```bash -go run main.go -``` \ No newline at end of file diff --git a/tools/mysql2mongo/go.mod b/tools/up35/go.mod similarity index 77% rename from tools/mysql2mongo/go.mod rename to tools/up35/go.mod index 80de0287f..5ae3907bf 100644 --- a/tools/mysql2mongo/go.mod +++ b/tools/up35/go.mod @@ -1,24 +1,29 @@ -module github.com/openimsdk/open-im-server/v3/tools/mysql2mongo +module github.com/openimsdk/open-im-server/v3/tools/up35 go 1.19 require ( + github.com/go-sql-driver/mysql v1.7.1 github.com/openimsdk/open-im-server/v3 v3.5.0 github.com/openimsdk/open-im-server/v3/tools/data-conversion v0.0.0-00010101000000-000000000000 go.mongodb.org/mongo-driver v1.12.1 + gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.5.1 gorm.io/gorm v1.25.4 ) require ( + github.com/OpenIMSDK/protocol v0.0.31 // indirect + github.com/OpenIMSDK/tools v0.0.18 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect - github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/jinzhu/copier v0.4.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.16.7 // indirect + github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect + github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -26,6 +31,9 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/image v0.13.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/tools/mysql2mongo/go.sum b/tools/up35/go.sum similarity index 86% rename from tools/mysql2mongo/go.sum rename to tools/up35/go.sum index b48f15e9f..1a81e5b33 100644 --- a/tools/mysql2mongo/go.sum +++ b/tools/up35/go.sum @@ -2,8 +2,10 @@ github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLt github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.18 h1:h3CvKB90DNd2aIJcOQ99cqgeW6C0na0PzR1TNsfxwL0= github.com/OpenIMSDK/tools v0.0.18/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= @@ -24,11 +26,16 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4= +github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA= github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= +github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= @@ -36,6 +43,9 @@ github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -49,8 +59,12 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t go.mongodb.org/mongo-driver v1.12.1 h1:nLkghSU8fQNaK7oUmDhQFsnrtcoNy7Z6LVFKsEecqgE= go.mongodb.org/mongo-driver v1.12.1/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -100,7 +114,10 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.5.1 h1:WUEH5VF9obL/lTtzjmML/5e6VfFR/788coz2uaVCAZw= gorm.io/driver/mysql v1.5.1/go.mod h1:Jo3Xu7mMhCyj8dlrb3WoCaRd1FhsVh+yMXb1jUInf5o= gorm.io/gorm v1.25.1/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= diff --git a/tools/mysql2mongo/main.go b/tools/up35/main.go similarity index 74% rename from tools/mysql2mongo/main.go rename to tools/up35/main.go index 35751fd4a..a5fcf3f5e 100644 --- a/tools/mysql2mongo/main.go +++ b/tools/up35/main.go @@ -3,58 +3,98 @@ package main import ( "context" "errors" + "flag" "fmt" + "github.com/go-sql-driver/mysql" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "gorm.io/driver/mysql" + "gopkg.in/yaml.v3" + gormMysql "gorm.io/driver/mysql" "gorm.io/gorm" "log" + "os" "reflect" + "strconv" +) + +const ( + versionTable = "dataver" + versionKey = "data_version" + versionValue = 35 ) func main() { - var ( - mysqlUsername = "root" // mysql用户名 - mysqlPassword = "openIM123" // mysql密码 - mysqlAddr = "127.0.0.1:13306" // mysql地址 - mysqlDatabase = "openIM_v3" // mysql数据库名字 - ) + var path string + flag.StringVar(&path, "c", "", "path config file") + flag.Parse() + if err := Main(path); err != nil { + log.Fatal(err) + return + } + os.Exit(0) +} - var s3 = "minio" // 文件储存方式 minio, cos, oss +func InitConfig(path string) error { + data, err := os.ReadFile(path) + if err != nil { + return err + } + return yaml.Unmarshal(data, &config.Config) +} - var ( - mongoUsername = "root" // mongodb用户名 - mongoPassword = "openIM123" // mongodb密码 - mongoHosts = "127.0.0.1:37017" // mongodb地址 - mongoDatabase = "openIM_v3" // mongodb数据库名字 - ) +func GetMysql() (*gorm.DB, error) { + conf := config.Config.Mysql + mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", conf.Username, conf.Password, conf.Address[0], conf.Database) + return gorm.Open(gormMysql.Open(mysqlDSN), &gorm.Config{ /* Logger: logger.Discard */ }) +} - mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", mysqlUsername, mysqlPassword, mysqlAddr, mysqlDatabase) - mysqlDB, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ /* Logger: logger.Discard */ }) +func GetMongo() (*mongo.Database, error) { + mgo, err := unrelation.NewMongo() if err != nil { - log.Println("open mysql db failed", err) - return + return nil, err } - log.Println("open mysql db success") - var mongoURI string - if mongoPassword != "" && mongoUsername != "" { - mongoURI = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", mongoUsername, mongoPassword, mongoHosts, mongoDatabase, 100) - } else { - mongoURI = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", mongoHosts, mongoDatabase, 100) + return mgo.GetDatabase(), nil +} + +func Main(path string) error { + if err := InitConfig(path); err != nil { + return err + } + if config.Config.Mysql == nil { + return nil } - mongoClient, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI)) + mongoDB, err := GetMongo() if err != nil { - log.Println("open mongo db failed", err) - return + return err + } + var version struct { + Key string `bson:"key"` + Value string `bson:"value"` + } + switch mongoDB.Collection(versionTable).FindOne(context.Background(), bson.M{"key": versionKey}).Decode(&version) { + case nil: + if ver, _ := strconv.Atoi(version.Value); ver >= versionValue { + return nil + } + case mongo.ErrNoDocuments: + default: + return err + } + mysqlDB, err := GetMysql() + if err != nil { + if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1049 { + return nil // database not exist + } + return err } - log.Println("open mongo db success") - mongoDB := mongoClient.Database(mongoDatabase) - - c := convert{} + var c convert var tasks []func() error tasks = append(tasks, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewUserMongo, c.User) }, @@ -65,16 +105,100 @@ func main() { func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(s3)) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) }, ) for _, task := range tasks { if err := task(); err != nil { - log.Println(err) - return + return err + } + } + + filter := bson.M{"key": versionKey, "value": version.Value} + update := bson.M{"$set": bson.M{"key": versionKey, "value": strconv.Itoa(versionValue)}} + if _, err := mongoDB.Collection(versionTable).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true)); err != nil { + return err + } + return nil +} + +// NewTask A mysql table B mongodb model C mongodb table +func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error { + obj, err := mongoDBInit(mongoDB) + if err != nil { + return err + } + var zero A + tableName := zero.TableName() + coll, err := getColl(obj) + if err != nil { + return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err) + } + var count int + defer func() { + log.Printf("completed convert %s total %d\n", tableName, count) + }() + const batch = 100 + for page := 0; ; page++ { + res := make([]A, 0, batch) + if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil { + if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 { + return nil // table not exist + } + return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err) + } + if len(res) == 0 { + return nil + } + temp := make([]any, len(res)) + for i := range res { + temp[i] = convert(res[i]) + } + if err := insertMany(coll, temp); err != nil { + return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err) + } + count += len(res) + if len(res) < batch { + return nil + } + log.Printf("current convert %s completed %d\n", tableName, count) + } +} + +func insertMany(coll *mongo.Collection, objs []any) error { + if _, err := coll.InsertMany(context.Background(), objs); err != nil { + if !mongo.IsDuplicateKeyError(err) { + return err } } + for i := range objs { + _, err := coll.InsertOne(context.Background(), objs[i]) + switch { + case err == nil: + case mongo.IsDuplicateKeyError(err): + default: + return err + } + } + return nil +} + +func getColl(obj any) (_ *mongo.Collection, err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("not found %+v", e) + } + }() + stu := reflect.ValueOf(obj).Elem() + typ := reflect.TypeOf(&mongo.Collection{}).String() + for i := 0; i < stu.NumField(); i++ { + field := stu.Field(i) + if field.Type().String() == typ { + return (*mongo.Collection)(field.UnsafePointer()), nil + } + } + return nil, errors.New("not found") } type convert struct{} @@ -232,60 +356,3 @@ func (convert) Log(v mysqlModel.Log) mongoModel.LogModel { Ex: v.Ex, } } - -// NewTask A mysql table B mongodb model C mongodb table -func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error { - obj, err := mongoDBInit(mongoDB) - if err != nil { - return err - } - var zero A - tableName := zero.TableName() - coll, err := getColl(obj) - if err != nil { - return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err) - } - var count int - defer func() { - log.Printf("completed convert %s total %d\n", tableName, count) - }() - const batch = 100 - for page := 0; ; page++ { - res := make([]A, 0, batch) - if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil { - return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err) - } - if len(res) == 0 { - return nil - } - temp := make([]any, len(res)) - for i := range res { - temp[i] = convert(res[i]) - } - if _, err := coll.InsertMany(context.Background(), temp); err != nil { - return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err) - } - count += len(res) - if len(res) < batch { - return nil - } - log.Printf("current convert %s completed %d\n", tableName, count) - } -} - -func getColl(obj any) (_ *mongo.Collection, err error) { - defer func() { - if e := recover(); e != nil { - err = fmt.Errorf("not found %+v", e) - } - }() - stu := reflect.ValueOf(obj).Elem() - typ := reflect.TypeOf(&mongo.Collection{}).String() - for i := 0; i < stu.NumField(); i++ { - field := stu.Field(i) - if field.Type().String() == typ { - return (*mongo.Collection)(field.UnsafePointer()), nil - } - } - return nil, errors.New("not found") -}