diff --git a/go.mod b/go.mod index 6cf2089de..11d374d08 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible + github.com/OpenIMSDK/protocol v0.0.31 + github.com/OpenIMSDK/tools v0.0.20 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 github.com/gin-gonic/gin v1.9.1 @@ -33,8 +35,6 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.3 - github.com/OpenIMSDK/protocol v0.0.31 - github.com/OpenIMSDK/tools v0.0.18 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/redis/go-redis/v9 v9.2.1 diff --git a/go.sum b/go.sum index 2f9198c9f..30e4b3cb4 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE= github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.18 h1:h3CvKB90DNd2aIJcOQ99cqgeW6C0na0PzR1TNsfxwL0= -github.com/OpenIMSDK/tools v0.0.18/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k= +github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 7f22c9d0d..224efe3ed 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -16,7 +16,6 @@ package friend import ( "context" - "github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/protocol/sdkws" diff --git a/tools/up35/pkg/convert.go b/tools/up35/pkg/convert.go new file mode 100644 index 000000000..91fdb474e --- /dev/null +++ b/tools/up35/pkg/convert.go @@ -0,0 +1,227 @@ +package pkg + +import ( + mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3" + mongoModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table" + mysqlModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mysql" + "time" +) + +type convert struct{} + +func (convert) User(v mysqlModel.UserModel) mongoModel.UserModel { + return mongoModel.UserModel{ + UserID: v.UserID, + Nickname: v.Nickname, + FaceURL: v.FaceURL, + Ex: v.Ex, + AppMangerLevel: v.AppMangerLevel, + GlobalRecvMsgOpt: v.GlobalRecvMsgOpt, + CreateTime: v.CreateTime, + } +} + +func (convert) Friend(v mysqlModel.FriendModel) mongoModel.FriendModel { + return mongoModel.FriendModel{ + OwnerUserID: v.OwnerUserID, + FriendUserID: v.FriendUserID, + Remark: v.Remark, + CreateTime: v.CreateTime, + AddSource: v.AddSource, + OperatorUserID: v.OperatorUserID, + Ex: v.Ex, + } +} + +func (convert) FriendRequest(v mysqlModel.FriendRequestModel) mongoModel.FriendRequestModel { + return mongoModel.FriendRequestModel{ + FromUserID: v.FromUserID, + ToUserID: v.ToUserID, + HandleResult: v.HandleResult, + ReqMsg: v.ReqMsg, + CreateTime: v.CreateTime, + HandlerUserID: v.HandlerUserID, + HandleMsg: v.HandleMsg, + HandleTime: v.HandleTime, + Ex: v.Ex, + } +} + +func (convert) Black(v mysqlModel.BlackModel) mongoModel.BlackModel { + return mongoModel.BlackModel{ + OwnerUserID: v.OwnerUserID, + BlockUserID: v.BlockUserID, + CreateTime: v.CreateTime, + AddSource: v.AddSource, + OperatorUserID: v.OperatorUserID, + Ex: v.Ex, + } +} + +func (convert) Group(v mysqlModel.GroupModel) mongoModel.GroupModel { + return mongoModel.GroupModel{ + GroupID: v.GroupID, + GroupName: v.GroupName, + Notification: v.Notification, + Introduction: v.Introduction, + FaceURL: v.FaceURL, + CreateTime: v.CreateTime, + Ex: v.Ex, + Status: v.Status, + CreatorUserID: v.CreatorUserID, + GroupType: v.GroupType, + NeedVerification: v.NeedVerification, + LookMemberInfo: v.LookMemberInfo, + ApplyMemberFriend: v.ApplyMemberFriend, + NotificationUpdateTime: v.NotificationUpdateTime, + NotificationUserID: v.NotificationUserID, + } +} + +func (convert) GroupMember(v mysqlModel.GroupMemberModel) mongoModel.GroupMemberModel { + return mongoModel.GroupMemberModel{ + GroupID: v.GroupID, + UserID: v.UserID, + Nickname: v.Nickname, + FaceURL: v.FaceURL, + RoleLevel: v.RoleLevel, + JoinTime: v.JoinTime, + JoinSource: v.JoinSource, + InviterUserID: v.InviterUserID, + OperatorUserID: v.OperatorUserID, + MuteEndTime: v.MuteEndTime, + Ex: v.Ex, + } +} + +func (convert) GroupRequest(v mysqlModel.GroupRequestModel) mongoModel.GroupRequestModel { + return mongoModel.GroupRequestModel{ + UserID: v.UserID, + GroupID: v.GroupID, + HandleResult: v.HandleResult, + ReqMsg: v.ReqMsg, + HandledMsg: v.HandledMsg, + ReqTime: v.ReqTime, + HandleUserID: v.HandleUserID, + HandledTime: v.HandledTime, + JoinSource: v.JoinSource, + InviterUserID: v.InviterUserID, + Ex: v.Ex, + } +} + +func (convert) Conversation(v mysqlModel.ConversationModel) mongoModel.ConversationModel { + return mongoModel.ConversationModel{ + OwnerUserID: v.OwnerUserID, + ConversationID: v.ConversationID, + ConversationType: v.ConversationType, + UserID: v.UserID, + GroupID: v.GroupID, + RecvMsgOpt: v.RecvMsgOpt, + IsPinned: v.IsPinned, + IsPrivateChat: v.IsPrivateChat, + BurnDuration: v.BurnDuration, + GroupAtType: v.GroupAtType, + AttachedInfo: v.AttachedInfo, + Ex: v.Ex, + MaxSeq: v.MaxSeq, + MinSeq: v.MinSeq, + CreateTime: v.CreateTime, + IsMsgDestruct: v.IsMsgDestruct, + MsgDestructTime: v.MsgDestructTime, + LatestMsgDestructTime: v.LatestMsgDestructTime, + } +} + +func (convert) Object(engine string) func(v mysqlModel.ObjectModel) mongoModel.ObjectModel { + return func(v mysqlModel.ObjectModel) mongoModel.ObjectModel { + return mongoModel.ObjectModel{ + Name: v.Name, + UserID: v.UserID, + Hash: v.Hash, + Engine: engine, + Key: v.Key, + Size: v.Size, + ContentType: v.ContentType, + Group: v.Cause, + CreateTime: v.CreateTime, + } + } +} + +func (convert) Log(v mysqlModel.Log) mongoModel.LogModel { + return mongoModel.LogModel{ + LogID: v.LogID, + Platform: v.Platform, + UserID: v.UserID, + CreateTime: v.CreateTime, + Url: v.Url, + FileName: v.FileName, + SystemType: v.SystemType, + Version: v.Version, + Ex: v.Ex, + } +} + +func (convert) SignalModel(v mysqlModelRtc.SignalModel) mongoModelRtc.SignalModel { + return mongoModelRtc.SignalModel{ + SID: v.SID, + InviterUserID: v.InviterUserID, + CustomData: v.CustomData, + GroupID: v.GroupID, + RoomID: v.RoomID, + Timeout: v.Timeout, + MediaType: v.MediaType, + PlatformID: v.PlatformID, + SessionType: v.SessionType, + InitiateTime: v.InitiateTime, + EndTime: v.EndTime, + FileURL: v.FileURL, + Title: v.Title, + Desc: v.Desc, + Ex: v.Ex, + IOSPushSound: v.IOSPushSound, + IOSBadgeCount: v.IOSBadgeCount, + SignalInfo: v.SignalInfo, + } +} + +func (convert) SignalInvitationModel(v mysqlModelRtc.SignalInvitationModel) mongoModelRtc.SignalInvitationModel { + return mongoModelRtc.SignalInvitationModel{ + SID: v.SID, + UserID: v.UserID, + Status: v.Status, + InitiateTime: v.InitiateTime, + HandleTime: v.HandleTime, + } +} + +func (convert) Meeting(v mysqlModelRtc.MeetingInfo) mongoModelRtc.MeetingInfo { + return mongoModelRtc.MeetingInfo{ + RoomID: v.RoomID, + MeetingName: v.MeetingName, + HostUserID: v.HostUserID, + Status: v.Status, + StartTime: time.Unix(v.StartTime, 0), + EndTime: time.Unix(v.EndTime, 0), + CreateTime: v.CreateTime, + Ex: v.Ex, + } +} + +func (convert) MeetingInvitationInfo(v mysqlModelRtc.MeetingInvitationInfo) mongoModelRtc.MeetingInvitationInfo { + return mongoModelRtc.MeetingInvitationInfo{ + RoomID: v.RoomID, + UserID: v.UserID, + CreateTime: v.CreateTime, + } +} + +func (convert) MeetingVideoRecord(v mysqlModelRtc.MeetingVideoRecord) mongoModelRtc.MeetingVideoRecord { + return mongoModelRtc.MeetingVideoRecord{ + RoomID: v.RoomID, + FileURL: v.FileURL, + CreateTime: v.CreateTime, + } +} diff --git a/tools/up35/pkg/internal/rtc/mongo/mgo/meeting.go b/tools/up35/pkg/internal/rtc/mongo/mgo/meeting.go new file mode 100644 index 000000000..e3bab7af9 --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/mgo/meeting.go @@ -0,0 +1,86 @@ +package mgo + +import ( + "context" + "github.com/OpenIMSDK/tools/mgoutil" + "github.com/OpenIMSDK/tools/pagination" + "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +func NewMeeting(db *mongo.Database) (table.MeetingInterface, error) { + coll := db.Collection("meeting") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "room_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "host_user_id", Value: 1}, + }, + }, + { + Keys: bson.D{ + {Key: "create_time", Value: -1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &meeting{coll: coll}, nil +} + +type meeting struct { + coll *mongo.Collection +} + +func (x *meeting) Find(ctx context.Context, roomIDs []string) ([]*table.MeetingInfo, error) { + return mgoutil.Find[*table.MeetingInfo](ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}}) +} + +func (x *meeting) CreateMeetingInfo(ctx context.Context, meetingInfo *table.MeetingInfo) error { + return mgoutil.InsertMany(ctx, x.coll, []*table.MeetingInfo{meetingInfo}) +} + +func (x *meeting) UpdateMeetingInfo(ctx context.Context, roomID string, update map[string]any) error { + if len(update) == 0 { + return nil + } + return mgoutil.UpdateOne(ctx, x.coll, bson.M{"room_id": roomID}, bson.M{"$set": update}, false) +} + +func (x *meeting) GetUnCompleteMeetingIDList(ctx context.Context, roomIDs []string) ([]string, error) { + if len(roomIDs) == 0 { + return nil, nil + } + return mgoutil.Find[string](ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}, "status": 0}, options.Find().SetProjection(bson.M{"_id": 0, "room_id": 1})) +} + +func (x *meeting) Delete(ctx context.Context, roomIDs []string) error { + return mgoutil.DeleteMany(ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}}) +} + +func (x *meeting) GetMeetingRecords(ctx context.Context, hostUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*table.MeetingInfo, error) { + var and []bson.M + if hostUserID != "" { + and = append(and, bson.M{"host_user_id": hostUserID}) + } + if !startTime.IsZero() { + and = append(and, bson.M{"create_time": bson.M{"$gte": startTime}}) + } + if !endTime.IsZero() { + and = append(and, bson.M{"create_time": bson.M{"$lte": endTime}}) + } + filter := bson.M{} + if len(and) > 0 { + filter["$and"] = and + } + return mgoutil.FindPage[*table.MeetingInfo](ctx, x.coll, filter, pagination, options.Find().SetSort(bson.M{"create_time": -1})) +} diff --git a/tools/up35/pkg/internal/rtc/mongo/mgo/meeting_invitation.go b/tools/up35/pkg/internal/rtc/mongo/mgo/meeting_invitation.go new file mode 100644 index 000000000..5286ffa95 --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/mgo/meeting_invitation.go @@ -0,0 +1,76 @@ +package mgo + +import ( + "context" + "github.com/OpenIMSDK/tools/mgoutil" + "github.com/OpenIMSDK/tools/pagination" + "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +func NewMeetingInvitation(db *mongo.Database) (table.MeetingInvitationInterface, error) { + coll := db.Collection("meeting_invitation") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "room_id", Value: 1}, + {Key: "user_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "create_time", Value: -1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &meetingInvitation{coll: coll}, nil +} + +type meetingInvitation struct { + coll *mongo.Collection +} + +func (x *meetingInvitation) FindUserIDs(ctx context.Context, roomID string) ([]string, error) { + return mgoutil.Find[string](ctx, x.coll, bson.M{"room_id": roomID}, options.Find().SetProjection(bson.M{"_id": 0, "user_id": 1})) +} + +func (x *meetingInvitation) CreateMeetingInvitationInfo(ctx context.Context, roomID string, inviteeUserIDs []string) error { + now := time.Now() + return mgoutil.InsertMany(ctx, x.coll, utils.Slice(inviteeUserIDs, func(userID string) *table.MeetingInvitationInfo { + return &table.MeetingInvitationInfo{ + RoomID: roomID, + UserID: userID, + CreateTime: now, + } + })) +} + +func (x *meetingInvitation) GetUserInvitedMeetingIDs(ctx context.Context, userID string) (meetingIDs []string, err error) { + fiveDaysAgo := time.Now().AddDate(0, 0, -5) + return mgoutil.Find[string](ctx, x.coll, bson.M{"user_id": userID, "create_time": bson.M{"$gte": fiveDaysAgo}}, options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1})) +} + +func (x *meetingInvitation) Delete(ctx context.Context, roomIDs []string) error { + return mgoutil.DeleteMany(ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}}) +} + +func (x *meetingInvitation) GetMeetingRecords(ctx context.Context, joinedUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error) { + var and []bson.M + and = append(and, bson.M{"user_id": joinedUserID}) + if !startTime.IsZero() { + and = append(and, bson.M{"create_time": bson.M{"$gte": startTime}}) + } + if !endTime.IsZero() { + and = append(and, bson.M{"create_time": bson.M{"$lte": endTime}}) + } + opt := options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1}) + return mgoutil.FindPage[string](ctx, x.coll, bson.M{"$and": and}, pagination, opt) +} diff --git a/tools/up35/pkg/internal/rtc/mongo/mgo/meeting_record.go b/tools/up35/pkg/internal/rtc/mongo/mgo/meeting_record.go new file mode 100644 index 000000000..90fc9cdca --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/mgo/meeting_record.go @@ -0,0 +1,32 @@ +package mgo + +import ( + "context" + "github.com/OpenIMSDK/tools/mgoutil" + "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +func NewMeetingRecord(db *mongo.Database) (table.MeetingRecordInterface, error) { + coll := db.Collection("meeting_record") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "room_id", Value: 1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &meetingRecord{coll: coll}, nil +} + +type meetingRecord struct { + coll *mongo.Collection +} + +func (x *meetingRecord) CreateMeetingVideoRecord(ctx context.Context, meetingVideoRecord *table.MeetingVideoRecord) error { + return mgoutil.InsertMany(ctx, x.coll, []*table.MeetingVideoRecord{meetingVideoRecord}) +} diff --git a/tools/up35/pkg/internal/rtc/mongo/mgo/signal.go b/tools/up35/pkg/internal/rtc/mongo/mgo/signal.go new file mode 100644 index 000000000..0c1879007 --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/mgo/signal.go @@ -0,0 +1,89 @@ +package mgo + +import ( + "context" + "github.com/OpenIMSDK/tools/mgoutil" + "github.com/OpenIMSDK/tools/pagination" + "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +func NewSignal(db *mongo.Database) (table.SignalInterface, error) { + coll := db.Collection("signal") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "sid", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "inviter_user_id", Value: 1}, + }, + }, + { + Keys: bson.D{ + {Key: "initiate_time", Value: -1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &signal{coll: coll}, nil +} + +type signal struct { + coll *mongo.Collection +} + +func (x *signal) Find(ctx context.Context, sids []string) ([]*table.SignalModel, error) { + return mgoutil.Find[*table.SignalModel](ctx, x.coll, bson.M{"sid": bson.M{"$in": sids}}) +} + +func (x *signal) CreateSignal(ctx context.Context, signalModel *table.SignalModel) error { + return mgoutil.InsertMany(ctx, x.coll, []*table.SignalModel{signalModel}) +} + +func (x *signal) Update(ctx context.Context, sid string, update map[string]any) error { + if len(update) == 0 { + return nil + } + return mgoutil.UpdateOne(ctx, x.coll, bson.M{"sid": sid}, bson.M{"$set": update}, false) +} + +func (x *signal) UpdateSignalFileURL(ctx context.Context, sID, fileURL string) error { + return x.Update(ctx, sID, map[string]any{"file_url": fileURL}) +} + +func (x *signal) UpdateSignalEndTime(ctx context.Context, sID string, endTime time.Time) error { + return x.Update(ctx, sID, map[string]any{"end_time": endTime}) +} + +func (x *signal) Delete(ctx context.Context, sids []string) error { + if len(sids) == 0 { + return nil + } + return mgoutil.DeleteMany(ctx, x.coll, bson.M{"sid": bson.M{"$in": sids}}) +} + +func (x *signal) PageSignal(ctx context.Context, sesstionType int32, sendID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*table.SignalModel, error) { + var and []bson.M + if !startTime.IsZero() { + and = append(and, bson.M{"initiate_time": bson.M{"$gte": startTime}}) + } + if !endTime.IsZero() { + and = append(and, bson.M{"initiate_time": bson.M{"$lte": endTime}}) + } + if sesstionType != 0 { + and = append(and, bson.M{"sesstion_type": sesstionType}) + } + if sendID != "" { + and = append(and, bson.M{"inviter_user_id": sendID}) + } + return mgoutil.FindPage[*table.SignalModel](ctx, x.coll, bson.M{"$and": and}, pagination, options.Find().SetSort(bson.M{"initiate_time": -1})) +} diff --git a/tools/up35/pkg/internal/rtc/mongo/mgo/signal_invitation.go b/tools/up35/pkg/internal/rtc/mongo/mgo/signal_invitation.go new file mode 100644 index 000000000..274f2f11f --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/mgo/signal_invitation.go @@ -0,0 +1,78 @@ +package mgo + +import ( + "context" + "github.com/OpenIMSDK/tools/mgoutil" + "github.com/OpenIMSDK/tools/pagination" + "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +func NewSignalInvitation(db *mongo.Database) (table.SignalInvitationInterface, error) { + coll := db.Collection("signal_invitation") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "sid", Value: 1}, + {Key: "user_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "initiate_time", Value: -1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &signalInvitation{coll: coll}, nil +} + +type signalInvitation struct { + coll *mongo.Collection +} + +func (x *signalInvitation) Find(ctx context.Context, sid string) ([]*table.SignalInvitationModel, error) { + return mgoutil.Find[*table.SignalInvitationModel](ctx, x.coll, bson.M{"sid": sid}) +} + +func (x *signalInvitation) CreateSignalInvitation(ctx context.Context, sid string, inviteeUserIDs []string) error { + now := time.Now() + return mgoutil.InsertMany(ctx, x.coll, utils.Slice(inviteeUserIDs, func(userID string) *table.SignalInvitationModel { + return &table.SignalInvitationModel{ + UserID: userID, + SID: sid, + InitiateTime: now, + HandleTime: time.Unix(0, 0), + } + })) +} + +func (x *signalInvitation) HandleSignalInvitation(ctx context.Context, sID, InviteeUserID string, status int32) error { + return mgoutil.UpdateOne(ctx, x.coll, bson.M{"sid": sID, "user_id": InviteeUserID}, bson.M{"$set": bson.M{"status": status, "handle_time": time.Now()}}, true) +} + +func (x *signalInvitation) PageSID(ctx context.Context, recvID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error) { + var and []bson.M + and = append(and, bson.M{"user_id": recvID}) + if !startTime.IsZero() { + and = append(and, bson.M{"initiate_time": bson.M{"$gte": startTime}}) + } + if !endTime.IsZero() { + and = append(and, bson.M{"initiate_time": bson.M{"$lte": endTime}}) + } + return mgoutil.FindPage[string](ctx, x.coll, bson.M{"$and": and}, pagination, options.Find().SetProjection(bson.M{"_id": 0, "sid": 1}).SetSort(bson.M{"initiate_time": -1})) +} + +func (x *signalInvitation) Delete(ctx context.Context, sids []string) error { + if len(sids) == 0 { + return nil + } + return mgoutil.DeleteMany(ctx, x.coll, bson.M{"sid": bson.M{"$in": sids}}) +} diff --git a/tools/up35/pkg/internal/rtc/mongo/table/meeting.go b/tools/up35/pkg/internal/rtc/mongo/table/meeting.go new file mode 100644 index 000000000..3b341cfb1 --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/table/meeting.go @@ -0,0 +1,51 @@ +package table + +import ( + "context" + "github.com/OpenIMSDK/tools/pagination" + "time" +) + +type MeetingInfo struct { + RoomID string `bson:"room_id"` + MeetingName string `bson:"meeting_name"` + HostUserID string `bson:"host_user_id"` + Status int64 `bson:"status"` + StartTime time.Time `bson:"start_time"` + EndTime time.Time `bson:"end_time"` + CreateTime time.Time `bson:"create_time"` + Ex string `bson:"ex"` +} + +type MeetingInterface interface { + Find(ctx context.Context, roomIDs []string) ([]*MeetingInfo, error) + CreateMeetingInfo(ctx context.Context, meetingInfo *MeetingInfo) error + UpdateMeetingInfo(ctx context.Context, roomID string, update map[string]any) error + GetUnCompleteMeetingIDList(ctx context.Context, roomIDs []string) ([]string, error) + Delete(ctx context.Context, roomIDs []string) error + GetMeetingRecords(ctx context.Context, hostUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*MeetingInfo, error) +} + +type MeetingInvitationInfo struct { + RoomID string `bson:"room_id"` + UserID string `bson:"user_id"` + CreateTime time.Time `bson:"create_time"` +} + +type MeetingInvitationInterface interface { + FindUserIDs(ctx context.Context, roomID string) ([]string, error) + CreateMeetingInvitationInfo(ctx context.Context, roomID string, inviteeUserIDs []string) error + GetUserInvitedMeetingIDs(ctx context.Context, userID string) (meetingIDs []string, err error) + Delete(ctx context.Context, roomIDs []string) error + GetMeetingRecords(ctx context.Context, joinedUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error) +} + +type MeetingVideoRecord struct { + RoomID string `bson:"room_id"` + FileURL string `bson:"file_url"` + CreateTime time.Time `bson:"create_time"` +} + +type MeetingRecordInterface interface { + CreateMeetingVideoRecord(ctx context.Context, meetingVideoRecord *MeetingVideoRecord) error +} diff --git a/tools/up35/pkg/internal/rtc/mongo/table/signal.go b/tools/up35/pkg/internal/rtc/mongo/table/signal.go new file mode 100644 index 000000000..0cec050ff --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mongo/table/signal.go @@ -0,0 +1,73 @@ +package table + +import ( + "context" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/pagination" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/mongo" + "time" +) + +type SignalModel struct { + SID string `bson:"sid"` + InviterUserID string `bson:"inviter_user_id"` + CustomData string `bson:"custom_data"` + GroupID string `bson:"group_id"` + RoomID string `bson:"room_id"` + Timeout int32 `bson:"timeout"` + MediaType string `bson:"media_type"` + PlatformID int32 `bson:"platform_id"` + SessionType int32 `bson:"session_type"` + InitiateTime time.Time `bson:"initiate_time"` + EndTime time.Time `bson:"end_time"` + FileURL string `bson:"file_url"` + + Title string `bson:"title"` + Desc string `bson:"desc"` + Ex string `bson:"ex"` + IOSPushSound string `bson:"ios_push_sound"` + IOSBadgeCount bool `bson:"ios_badge_count"` + SignalInfo string `bson:"signal_info"` +} + +type SignalInterface interface { + Find(ctx context.Context, sids []string) ([]*SignalModel, error) + CreateSignal(ctx context.Context, signalModel *SignalModel) error + Update(ctx context.Context, sid string, update map[string]any) error + UpdateSignalFileURL(ctx context.Context, sID, fileURL string) error + UpdateSignalEndTime(ctx context.Context, sID string, endTime time.Time) error + Delete(ctx context.Context, sids []string) error + PageSignal(ctx context.Context, sesstionType int32, sendID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*SignalModel, error) +} + +type SignalInvitationModel struct { + SID string `bson:"sid"` + UserID string `bson:"user_id"` + Status int32 `bson:"status"` + InitiateTime time.Time `bson:"initiate_time"` + HandleTime time.Time `bson:"handle_time"` +} + +type SignalInvitationInterface interface { + Find(ctx context.Context, sid string) ([]*SignalInvitationModel, error) + CreateSignalInvitation(ctx context.Context, sid string, inviteeUserIDs []string) error + HandleSignalInvitation(ctx context.Context, sID, InviteeUserID string, status int32) error + PageSID(ctx context.Context, recvID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error) + Delete(ctx context.Context, sids []string) error +} + +func IsNotFound(err error) bool { + if err == nil { + return false + } + err = errs.Unwrap(err) + return err == mongo.ErrNoDocuments || err == redis.Nil +} + +func IsDuplicate(err error) bool { + if err == nil { + return false + } + return mongo.IsDuplicateKeyError(errs.Unwrap(err)) +} diff --git a/tools/up35/pkg/internal/rtc/mysql/meeting.go b/tools/up35/pkg/internal/rtc/mysql/meeting.go new file mode 100644 index 000000000..2c5bbed32 --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mysql/meeting.go @@ -0,0 +1,40 @@ +package relation + +import ( + "time" +) + +type MeetingInfo struct { + RoomID string `gorm:"column:room_id;primary_key;size:128;index:room_id;index:status,priority:1"` + MeetingName string `gorm:"column:meeting_name;size:64"` + HostUserID string `gorm:"column:host_user_id;size:64;index:host_user_id"` + Status int64 `gorm:"column:status;index:status,priority:2"` + StartTime int64 `gorm:"column:start_time"` + EndTime int64 `gorm:"column:end_time"` + CreateTime time.Time `gorm:"column:create_time"` + Ex string `gorm:"column:ex;size:1024"` +} + +func (MeetingInfo) TableName() string { + return "meeting" +} + +type MeetingInvitationInfo struct { + RoomID string `gorm:"column:room_id;primary_key;size:128"` + UserID string `gorm:"column:user_id;primary_key;size:64;index:user_id"` + CreateTime time.Time `gorm:"column:create_time"` +} + +func (MeetingInvitationInfo) TableName() string { + return "meeting_invitation" +} + +type MeetingVideoRecord struct { + RoomID string `gorm:"column:room_id;size:128"` + FileURL string `gorm:"column:file_url"` + CreateTime time.Time `gorm:"column:create_time"` +} + +func (MeetingVideoRecord) TableName() string { + return "meeting_video_record" +} diff --git a/tools/up35/pkg/internal/rtc/mysql/signal.go b/tools/up35/pkg/internal/rtc/mysql/signal.go new file mode 100644 index 000000000..3a4d607d4 --- /dev/null +++ b/tools/up35/pkg/internal/rtc/mysql/signal.go @@ -0,0 +1,43 @@ +package relation + +import ( + "time" +) + +type SignalModel struct { + SID string `gorm:"column:sid;type:char(128);primary_key"` + InviterUserID string `gorm:"column:inviter_user_id;type:char(64);index:inviter_user_id_index"` + CustomData string `gorm:"column:custom_data;type:text"` + GroupID string `gorm:"column:group_id;type:char(64)"` + RoomID string `gorm:"column:room_id;primary_key;type:char(128)"` + Timeout int32 `gorm:"column:timeout"` + MediaType string `gorm:"column:media_type;type:char(64)"` + PlatformID int32 `gorm:"column:platform_id"` + SessionType int32 `gorm:"column:sesstion_type"` + InitiateTime time.Time `gorm:"column:initiate_time"` + EndTime time.Time `gorm:"column:end_time"` + FileURL string `gorm:"column:file_url" json:"-"` + + Title string `gorm:"column:title;size:128"` + Desc string `gorm:"column:desc;size:1024"` + Ex string `gorm:"column:ex;size:1024"` + IOSPushSound string `gorm:"column:ios_push_sound"` + IOSBadgeCount bool `gorm:"column:ios_badge_count"` + SignalInfo string `gorm:"column:signal_info;size:1024"` +} + +func (SignalModel) TableName() string { + return "signal" +} + +type SignalInvitationModel struct { + UserID string `gorm:"column:user_id;primary_key"` + SID string `gorm:"column:sid;type:char(128);primary_key"` + Status int32 `gorm:"column:status"` + InitiateTime time.Time `gorm:"column:initiate_time;primary_key"` + HandleTime time.Time `gorm:"column:handle_time"` +} + +func (SignalInvitationModel) TableName() string { + return "signal_invitation" +} diff --git a/tools/up35/pkg/pkg.go b/tools/up35/pkg/pkg.go new file mode 100644 index 000000000..d834b8492 --- /dev/null +++ b/tools/up35/pkg/pkg.go @@ -0,0 +1,206 @@ +package pkg + +import ( + "context" + "errors" + "fmt" + "gopkg.in/yaml.v3" + "log" + "os" + "reflect" + "strconv" + + "github.com/go-sql-driver/mysql" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + gormMysql "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" + rtcMgo "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/mgo" +) + +const ( + versionTable = "dataver" + versionKey = "data_version" + versionValue = 35 +) + +func InitConfig(path string) error { + data, err := os.ReadFile(path) + if err != nil { + return err + } + return yaml.Unmarshal(data, &config.Config) +} + +func GetMysql() (*gorm.DB, error) { + conf := config.Config.Mysql + mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", conf.Username, conf.Password, conf.Address[0], conf.Database) + return gorm.Open(gormMysql.Open(mysqlDSN), &gorm.Config{Logger: logger.Discard}) +} + +func GetMongo() (*mongo.Database, error) { + mgo, err := unrelation.NewMongo() + if err != nil { + return nil, err + } + return mgo.GetDatabase(), nil +} + +func Main(path string) error { + if err := InitConfig(path); err != nil { + return err + } + if config.Config.Mysql == nil { + return nil + } + mongoDB, err := GetMongo() + if err != nil { + return err + } + var version struct { + Key string `bson:"key"` + Value string `bson:"value"` + } + switch mongoDB.Collection(versionTable).FindOne(context.Background(), bson.M{"key": versionKey}).Decode(&version) { + case nil: + if ver, _ := strconv.Atoi(version.Value); ver >= versionValue { + return nil + } + case mongo.ErrNoDocuments: + default: + return err + } + mysqlDB, err := GetMysql() + if err != nil { + if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1049 { + if err := SetMongoDataVersion(mongoDB, version.Value); err != nil { + return err + } + return nil // database not exist + } + return err + } + + var c convert + var tasks []func() error + tasks = append(tasks, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewUserMongo, c.User) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendMongo, c.Friend) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendRequestMongo, c.FriendRequest) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewBlackMongo, c.Black) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMongo, c.Group) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) }, + + func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewSignal, c.SignalModel) }, + func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewSignalInvitation, c.SignalInvitationModel) }, + func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewMeeting, c.Meeting) }, + func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewMeetingInvitation, c.MeetingInvitationInfo) }, + func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewMeetingRecord, c.MeetingVideoRecord) }, + ) + + for _, task := range tasks { + if err := task(); err != nil { + return err + } + } + + if err := SetMongoDataVersion(mongoDB, version.Value); err != nil { + return err + } + return nil +} + +func SetMongoDataVersion(db *mongo.Database, curver string) error { + filter := bson.M{"key": versionKey, "value": curver} + update := bson.M{"$set": bson.M{"key": versionKey, "value": strconv.Itoa(versionValue)}} + _, err := db.Collection(versionTable).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true)) + return err +} + +// NewTask A mysql table B mongodb model C mongodb table +func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error { + obj, err := mongoDBInit(mongoDB) + if err != nil { + return err + } + var zero A + tableName := zero.TableName() + coll, err := getColl(obj) + if err != nil { + return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err) + } + var count int + defer func() { + log.Printf("completed convert %s total %d\n", tableName, count) + }() + const batch = 100 + for page := 0; ; page++ { + res := make([]A, 0, batch) + if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil { + if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 { + return nil // table not exist + } + return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err) + } + if len(res) == 0 { + return nil + } + temp := make([]any, len(res)) + for i := range res { + temp[i] = convert(res[i]) + } + if err := insertMany(coll, temp); err != nil { + return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err) + } + count += len(res) + if len(res) < batch { + return nil + } + log.Printf("current convert %s completed %d\n", tableName, count) + } +} + +func insertMany(coll *mongo.Collection, objs []any) error { + if _, err := coll.InsertMany(context.Background(), objs); err != nil { + if !mongo.IsDuplicateKeyError(err) { + return err + } + } + for i := range objs { + _, err := coll.InsertOne(context.Background(), objs[i]) + switch { + case err == nil: + case mongo.IsDuplicateKeyError(err): + default: + return err + } + } + return nil +} + +func getColl(obj any) (_ *mongo.Collection, err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("not found %+v", e) + } + }() + stu := reflect.ValueOf(obj).Elem() + typ := reflect.TypeOf(&mongo.Collection{}).String() + for i := 0; i < stu.NumField(); i++ { + field := stu.Field(i) + if field.Type().String() == typ { + return (*mongo.Collection)(field.UnsafePointer()), nil + } + } + return nil, errors.New("not found") +} diff --git a/tools/up35/up35.go b/tools/up35/up35.go index 5d4740fca..0ce56ee13 100644 --- a/tools/up35/up35.go +++ b/tools/up35/up35.go @@ -1,367 +1,19 @@ package main import ( - "context" - "errors" "flag" - "fmt" - "github.com/go-sql-driver/mysql" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" - mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "gopkg.in/yaml.v3" - gormMysql "gorm.io/driver/mysql" - "gorm.io/gorm" - "gorm.io/gorm/logger" + "github.com/openimsdk/open-im-server/v3/tools/up35/pkg" "log" "os" - "reflect" - "strconv" -) - -const ( - versionTable = "dataver" - versionKey = "data_version" - versionValue = 35 ) func main() { var path string flag.StringVar(&path, "c", "", "path config file") flag.Parse() - if err := Main(path); err != nil { + if err := pkg.Main(path); err != nil { log.Fatal(err) return } os.Exit(0) } - -func InitConfig(path string) error { - data, err := os.ReadFile(path) - if err != nil { - return err - } - return yaml.Unmarshal(data, &config.Config) -} - -func GetMysql() (*gorm.DB, error) { - conf := config.Config.Mysql - mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", conf.Username, conf.Password, conf.Address[0], conf.Database) - return gorm.Open(gormMysql.Open(mysqlDSN), &gorm.Config{Logger: logger.Discard}) -} - -func GetMongo() (*mongo.Database, error) { - mgo, err := unrelation.NewMongo() - if err != nil { - return nil, err - } - return mgo.GetDatabase(), nil -} - -func Main(path string) error { - if err := InitConfig(path); err != nil { - return err - } - if config.Config.Mysql == nil { - return nil - } - mongoDB, err := GetMongo() - if err != nil { - return err - } - var version struct { - Key string `bson:"key"` - Value string `bson:"value"` - } - switch mongoDB.Collection(versionTable).FindOne(context.Background(), bson.M{"key": versionKey}).Decode(&version) { - case nil: - if ver, _ := strconv.Atoi(version.Value); ver >= versionValue { - return nil - } - case mongo.ErrNoDocuments: - default: - return err - } - mysqlDB, err := GetMysql() - if err != nil { - if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1049 { - if err := SetMongoDataVersion(mongoDB, version.Value); err != nil { - return err - } - return nil // database not exist - } - return err - } - - var c convert - var tasks []func() error - tasks = append(tasks, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewUserMongo, c.User) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendMongo, c.Friend) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendRequestMongo, c.FriendRequest) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewBlackMongo, c.Black) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMongo, c.Group) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) }, - ) - - for _, task := range tasks { - if err := task(); err != nil { - return err - } - } - - if err := SetMongoDataVersion(mongoDB, version.Value); err != nil { - return err - } - return nil -} - -func SetMongoDataVersion(db *mongo.Database, curver string) error { - filter := bson.M{"key": versionKey, "value": curver} - update := bson.M{"$set": bson.M{"key": versionKey, "value": strconv.Itoa(versionValue)}} - _, err := db.Collection(versionTable).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true)) - return err -} - -// NewTask A mysql table B mongodb model C mongodb table -func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error { - obj, err := mongoDBInit(mongoDB) - if err != nil { - return err - } - var zero A - tableName := zero.TableName() - coll, err := getColl(obj) - if err != nil { - return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err) - } - var count int - defer func() { - log.Printf("completed convert %s total %d\n", tableName, count) - }() - const batch = 100 - for page := 0; ; page++ { - res := make([]A, 0, batch) - if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil { - if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 { - return nil // table not exist - } - return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err) - } - if len(res) == 0 { - return nil - } - temp := make([]any, len(res)) - for i := range res { - temp[i] = convert(res[i]) - } - if err := insertMany(coll, temp); err != nil { - return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err) - } - count += len(res) - if len(res) < batch { - return nil - } - log.Printf("current convert %s completed %d\n", tableName, count) - } -} - -func insertMany(coll *mongo.Collection, objs []any) error { - if _, err := coll.InsertMany(context.Background(), objs); err != nil { - if !mongo.IsDuplicateKeyError(err) { - return err - } - } - for i := range objs { - _, err := coll.InsertOne(context.Background(), objs[i]) - switch { - case err == nil: - case mongo.IsDuplicateKeyError(err): - default: - return err - } - } - return nil -} - -func getColl(obj any) (_ *mongo.Collection, err error) { - defer func() { - if e := recover(); e != nil { - err = fmt.Errorf("not found %+v", e) - } - }() - stu := reflect.ValueOf(obj).Elem() - typ := reflect.TypeOf(&mongo.Collection{}).String() - for i := 0; i < stu.NumField(); i++ { - field := stu.Field(i) - if field.Type().String() == typ { - return (*mongo.Collection)(field.UnsafePointer()), nil - } - } - return nil, errors.New("not found") -} - -type convert struct{} - -func (convert) User(v mysqlModel.UserModel) mongoModel.UserModel { - return mongoModel.UserModel{ - UserID: v.UserID, - Nickname: v.Nickname, - FaceURL: v.FaceURL, - Ex: v.Ex, - AppMangerLevel: v.AppMangerLevel, - GlobalRecvMsgOpt: v.GlobalRecvMsgOpt, - CreateTime: v.CreateTime, - } -} - -func (convert) Friend(v mysqlModel.FriendModel) mongoModel.FriendModel { - return mongoModel.FriendModel{ - OwnerUserID: v.OwnerUserID, - FriendUserID: v.FriendUserID, - Remark: v.Remark, - CreateTime: v.CreateTime, - AddSource: v.AddSource, - OperatorUserID: v.OperatorUserID, - Ex: v.Ex, - } -} - -func (convert) FriendRequest(v mysqlModel.FriendRequestModel) mongoModel.FriendRequestModel { - return mongoModel.FriendRequestModel{ - FromUserID: v.FromUserID, - ToUserID: v.ToUserID, - HandleResult: v.HandleResult, - ReqMsg: v.ReqMsg, - CreateTime: v.CreateTime, - HandlerUserID: v.HandlerUserID, - HandleMsg: v.HandleMsg, - HandleTime: v.HandleTime, - Ex: v.Ex, - } -} - -func (convert) Black(v mysqlModel.BlackModel) mongoModel.BlackModel { - return mongoModel.BlackModel{ - OwnerUserID: v.OwnerUserID, - BlockUserID: v.BlockUserID, - CreateTime: v.CreateTime, - AddSource: v.AddSource, - OperatorUserID: v.OperatorUserID, - Ex: v.Ex, - } -} - -func (convert) Group(v mysqlModel.GroupModel) mongoModel.GroupModel { - return mongoModel.GroupModel{ - GroupID: v.GroupID, - GroupName: v.GroupName, - Notification: v.Notification, - Introduction: v.Introduction, - FaceURL: v.FaceURL, - CreateTime: v.CreateTime, - Ex: v.Ex, - Status: v.Status, - CreatorUserID: v.CreatorUserID, - GroupType: v.GroupType, - NeedVerification: v.NeedVerification, - LookMemberInfo: v.LookMemberInfo, - ApplyMemberFriend: v.ApplyMemberFriend, - NotificationUpdateTime: v.NotificationUpdateTime, - NotificationUserID: v.NotificationUserID, - } -} - -func (convert) GroupMember(v mysqlModel.GroupMemberModel) mongoModel.GroupMemberModel { - return mongoModel.GroupMemberModel{ - GroupID: v.GroupID, - UserID: v.UserID, - Nickname: v.Nickname, - FaceURL: v.FaceURL, - RoleLevel: v.RoleLevel, - JoinTime: v.JoinTime, - JoinSource: v.JoinSource, - InviterUserID: v.InviterUserID, - OperatorUserID: v.OperatorUserID, - MuteEndTime: v.MuteEndTime, - Ex: v.Ex, - } -} - -func (convert) GroupRequest(v mysqlModel.GroupRequestModel) mongoModel.GroupRequestModel { - return mongoModel.GroupRequestModel{ - UserID: v.UserID, - GroupID: v.GroupID, - HandleResult: v.HandleResult, - ReqMsg: v.ReqMsg, - HandledMsg: v.HandledMsg, - ReqTime: v.ReqTime, - HandleUserID: v.HandleUserID, - HandledTime: v.HandledTime, - JoinSource: v.JoinSource, - InviterUserID: v.InviterUserID, - Ex: v.Ex, - } -} - -func (convert) Conversation(v mysqlModel.ConversationModel) mongoModel.ConversationModel { - return mongoModel.ConversationModel{ - OwnerUserID: v.OwnerUserID, - ConversationID: v.ConversationID, - ConversationType: v.ConversationType, - UserID: v.UserID, - GroupID: v.GroupID, - RecvMsgOpt: v.RecvMsgOpt, - IsPinned: v.IsPinned, - IsPrivateChat: v.IsPrivateChat, - BurnDuration: v.BurnDuration, - GroupAtType: v.GroupAtType, - AttachedInfo: v.AttachedInfo, - Ex: v.Ex, - MaxSeq: v.MaxSeq, - MinSeq: v.MinSeq, - CreateTime: v.CreateTime, - IsMsgDestruct: v.IsMsgDestruct, - MsgDestructTime: v.MsgDestructTime, - LatestMsgDestructTime: v.LatestMsgDestructTime, - } -} - -func (convert) Object(engine string) func(v mysqlModel.ObjectModel) mongoModel.ObjectModel { - return func(v mysqlModel.ObjectModel) mongoModel.ObjectModel { - return mongoModel.ObjectModel{ - Name: v.Name, - UserID: v.UserID, - Hash: v.Hash, - Engine: engine, - Key: v.Key, - Size: v.Size, - ContentType: v.ContentType, - Group: v.Cause, - CreateTime: v.CreateTime, - } - } -} - -func (convert) Log(v mysqlModel.Log) mongoModel.LogModel { - return mongoModel.LogModel{ - LogID: v.LogID, - Platform: v.Platform, - UserID: v.UserID, - CreateTime: v.CreateTime, - Url: v.Url, - FileName: v.FileName, - SystemType: v.SystemType, - Version: v.Version, - Ex: v.Ex, - } -}