|
|
|
@ -3,58 +3,98 @@ package main
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"flag"
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/go-sql-driver/mysql"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
|
|
|
|
|
mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
|
|
|
|
mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3"
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
|
|
"gorm.io/driver/mysql"
|
|
|
|
|
"gopkg.in/yaml.v3"
|
|
|
|
|
gormMysql "gorm.io/driver/mysql"
|
|
|
|
|
"gorm.io/gorm"
|
|
|
|
|
"log"
|
|
|
|
|
"os"
|
|
|
|
|
"reflect"
|
|
|
|
|
"strconv"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
versionTable = "dataver"
|
|
|
|
|
versionKey = "data_version"
|
|
|
|
|
versionValue = 35
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
var (
|
|
|
|
|
mysqlUsername = "root" // mysql用户名
|
|
|
|
|
mysqlPassword = "openIM123" // mysql密码
|
|
|
|
|
mysqlAddr = "127.0.0.1:13306" // mysql地址
|
|
|
|
|
mysqlDatabase = "openIM_v3" // mysql数据库名字
|
|
|
|
|
)
|
|
|
|
|
var path string
|
|
|
|
|
flag.StringVar(&path, "c", "", "path config file")
|
|
|
|
|
flag.Parse()
|
|
|
|
|
if err := Main(path); err != nil {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
os.Exit(0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var s3 = "minio" // 文件储存方式 minio, cos, oss
|
|
|
|
|
func InitConfig(path string) error {
|
|
|
|
|
data, err := os.ReadFile(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return yaml.Unmarshal(data, &config.Config)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
mongoUsername = "root" // mongodb用户名
|
|
|
|
|
mongoPassword = "openIM123" // mongodb密码
|
|
|
|
|
mongoHosts = "127.0.0.1:37017" // mongodb地址
|
|
|
|
|
mongoDatabase = "openIM_v3" // mongodb数据库名字
|
|
|
|
|
)
|
|
|
|
|
func GetMysql() (*gorm.DB, error) {
|
|
|
|
|
conf := config.Config.Mysql
|
|
|
|
|
mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", conf.Username, conf.Password, conf.Address[0], conf.Database)
|
|
|
|
|
return gorm.Open(gormMysql.Open(mysqlDSN), &gorm.Config{ /* Logger: logger.Discard */ })
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", mysqlUsername, mysqlPassword, mysqlAddr, mysqlDatabase)
|
|
|
|
|
mysqlDB, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ /* Logger: logger.Discard */ })
|
|
|
|
|
func GetMongo() (*mongo.Database, error) {
|
|
|
|
|
mgo, err := unrelation.NewMongo()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Println("open mysql db failed", err)
|
|
|
|
|
return
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
log.Println("open mysql db success")
|
|
|
|
|
var mongoURI string
|
|
|
|
|
if mongoPassword != "" && mongoUsername != "" {
|
|
|
|
|
mongoURI = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", mongoUsername, mongoPassword, mongoHosts, mongoDatabase, 100)
|
|
|
|
|
} else {
|
|
|
|
|
mongoURI = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", mongoHosts, mongoDatabase, 100)
|
|
|
|
|
return mgo.GetDatabase(), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Main(path string) error {
|
|
|
|
|
if err := InitConfig(path); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if config.Config.Mysql == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
mongoClient, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
|
|
|
|
|
mongoDB, err := GetMongo()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Println("open mongo db failed", err)
|
|
|
|
|
return
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var version struct {
|
|
|
|
|
Key string `bson:"key"`
|
|
|
|
|
Value string `bson:"value"`
|
|
|
|
|
}
|
|
|
|
|
switch mongoDB.Collection(versionTable).FindOne(context.Background(), bson.M{"key": versionKey}).Decode(&version) {
|
|
|
|
|
case nil:
|
|
|
|
|
if ver, _ := strconv.Atoi(version.Value); ver >= versionValue {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
case mongo.ErrNoDocuments:
|
|
|
|
|
default:
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
mysqlDB, err := GetMysql()
|
|
|
|
|
if err != nil {
|
|
|
|
|
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1049 {
|
|
|
|
|
return nil // database not exist
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
log.Println("open mongo db success")
|
|
|
|
|
mongoDB := mongoClient.Database(mongoDatabase)
|
|
|
|
|
|
|
|
|
|
c := convert{}
|
|
|
|
|
|
|
|
|
|
var c convert
|
|
|
|
|
var tasks []func() error
|
|
|
|
|
tasks = append(tasks,
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewUserMongo, c.User) },
|
|
|
|
@ -65,16 +105,100 @@ func main() {
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) },
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) },
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) },
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(s3)) },
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) },
|
|
|
|
|
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) },
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for _, task := range tasks {
|
|
|
|
|
if err := task(); err != nil {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
return
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
filter := bson.M{"key": versionKey, "value": version.Value}
|
|
|
|
|
update := bson.M{"$set": bson.M{"key": versionKey, "value": strconv.Itoa(versionValue)}}
|
|
|
|
|
if _, err := mongoDB.Collection(versionTable).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true)); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewTask A mysql table B mongodb model C mongodb table
|
|
|
|
|
func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error {
|
|
|
|
|
obj, err := mongoDBInit(mongoDB)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var zero A
|
|
|
|
|
tableName := zero.TableName()
|
|
|
|
|
coll, err := getColl(obj)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
var count int
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Printf("completed convert %s total %d\n", tableName, count)
|
|
|
|
|
}()
|
|
|
|
|
const batch = 100
|
|
|
|
|
for page := 0; ; page++ {
|
|
|
|
|
res := make([]A, 0, batch)
|
|
|
|
|
if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil {
|
|
|
|
|
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 {
|
|
|
|
|
return nil // table not exist
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
if len(res) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
temp := make([]any, len(res))
|
|
|
|
|
for i := range res {
|
|
|
|
|
temp[i] = convert(res[i])
|
|
|
|
|
}
|
|
|
|
|
if err := insertMany(coll, temp); err != nil {
|
|
|
|
|
return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
count += len(res)
|
|
|
|
|
if len(res) < batch {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.Printf("current convert %s completed %d\n", tableName, count)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func insertMany(coll *mongo.Collection, objs []any) error {
|
|
|
|
|
if _, err := coll.InsertMany(context.Background(), objs); err != nil {
|
|
|
|
|
if !mongo.IsDuplicateKeyError(err) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for i := range objs {
|
|
|
|
|
_, err := coll.InsertOne(context.Background(), objs[i])
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil:
|
|
|
|
|
case mongo.IsDuplicateKeyError(err):
|
|
|
|
|
default:
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getColl(obj any) (_ *mongo.Collection, err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if e := recover(); e != nil {
|
|
|
|
|
err = fmt.Errorf("not found %+v", e)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
stu := reflect.ValueOf(obj).Elem()
|
|
|
|
|
typ := reflect.TypeOf(&mongo.Collection{}).String()
|
|
|
|
|
for i := 0; i < stu.NumField(); i++ {
|
|
|
|
|
field := stu.Field(i)
|
|
|
|
|
if field.Type().String() == typ {
|
|
|
|
|
return (*mongo.Collection)(field.UnsafePointer()), nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil, errors.New("not found")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type convert struct{}
|
|
|
|
@ -232,60 +356,3 @@ func (convert) Log(v mysqlModel.Log) mongoModel.LogModel {
|
|
|
|
|
Ex: v.Ex,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewTask A mysql table B mongodb model C mongodb table
|
|
|
|
|
func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error {
|
|
|
|
|
obj, err := mongoDBInit(mongoDB)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var zero A
|
|
|
|
|
tableName := zero.TableName()
|
|
|
|
|
coll, err := getColl(obj)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
var count int
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Printf("completed convert %s total %d\n", tableName, count)
|
|
|
|
|
}()
|
|
|
|
|
const batch = 100
|
|
|
|
|
for page := 0; ; page++ {
|
|
|
|
|
res := make([]A, 0, batch)
|
|
|
|
|
if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil {
|
|
|
|
|
return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
if len(res) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
temp := make([]any, len(res))
|
|
|
|
|
for i := range res {
|
|
|
|
|
temp[i] = convert(res[i])
|
|
|
|
|
}
|
|
|
|
|
if _, err := coll.InsertMany(context.Background(), temp); err != nil {
|
|
|
|
|
return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err)
|
|
|
|
|
}
|
|
|
|
|
count += len(res)
|
|
|
|
|
if len(res) < batch {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.Printf("current convert %s completed %d\n", tableName, count)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getColl(obj any) (_ *mongo.Collection, err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if e := recover(); e != nil {
|
|
|
|
|
err = fmt.Errorf("not found %+v", e)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
stu := reflect.ValueOf(obj).Elem()
|
|
|
|
|
typ := reflect.TypeOf(&mongo.Collection{}).String()
|
|
|
|
|
for i := 0; i < stu.NumField(); i++ {
|
|
|
|
|
field := stu.Field(i)
|
|
|
|
|
if field.Type().String() == typ {
|
|
|
|
|
return (*mongo.Collection)(field.UnsafePointer()), nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil, errors.New("not found")
|
|
|
|
|
}
|