From 32b773811281d29b19e1fa043f9650a9b9a77e1c Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 21 Mar 2024 17:55:05 +0800 Subject: [PATCH] feat: msg format --- go.mod | 2 +- go.sum | 4 +- internal/msgtransfer/init.go | 10 +- internal/rpc/msg/server.go | 7 +- internal/rpc/user/user.go | 2 +- internal/tools/msg.go | 2 +- pkg/common/db/controller/msg.go | 6 +- pkg/common/db/controller/msg_test.go | 250 ------ pkg/common/db/controller/user.go | 6 +- pkg/common/db/{unrelation => mgo}/msg.go | 774 ++++++------------ .../{unrelation/user.go => mgo/subscribe.go} | 12 +- .../db/table/{unrelation => relation}/msg.go | 25 +- .../user.go => relation/subscribe.go} | 12 +- pkg/common/db/table/unrelation/common.go | 20 - pkg/common/db/table/unrelation/doc.go | 15 - pkg/common/db/table/unrelation/super_group.go | 53 -- pkg/common/db/unrelation/doc.go | 15 - pkg/common/db/unrelation/mongo.go | 43 - pkg/common/db/unrelation/msg_convert.go | 81 -- pkg/common/db/unrelation/super_group.go | 163 ---- 20 files changed, 296 insertions(+), 1206 deletions(-) delete mode 100644 pkg/common/db/controller/msg_test.go rename pkg/common/db/{unrelation => mgo}/msg.go (58%) rename pkg/common/db/{unrelation/user.go => mgo/subscribe.go} (94%) rename pkg/common/db/table/{unrelation => relation}/msg.go (90%) rename pkg/common/db/table/{unrelation/user.go => relation/subscribe.go} (86%) delete mode 100644 pkg/common/db/table/unrelation/common.go delete mode 100644 pkg/common/db/table/unrelation/doc.go delete mode 100644 pkg/common/db/table/unrelation/super_group.go delete mode 100644 pkg/common/db/unrelation/doc.go delete mode 100644 pkg/common/db/unrelation/msg_convert.go delete mode 100644 pkg/common/db/unrelation/super_group.go diff --git a/go.mod b/go.mod index edd3c9ac7..1019a8a07 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 github.com/openimsdk/protocol v0.0.58-google - github.com/openimsdk/tools v0.0.46-alpha.11 + github.com/openimsdk/tools v0.0.46-alpha.14 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/go.sum b/go.sum index 83fe7e458..3985aad83 100644 --- a/go.sum +++ b/go.sum @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/protocol v0.0.58-google h1:cGNUVaXO9LqcFgIb4NvrtEOrv0spGecoQKyN8YWhyZs= github.com/openimsdk/protocol v0.0.58-google/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.46-alpha.11 h1:2+Rd+rhjjwvdKe3aJ7voRhBG+KJhh6nCNwfjrZotpbE= -github.com/openimsdk/tools v0.0.46-alpha.11/go.mod h1:NAHPJyNUJm0n0WaZfIRC5s6Np+timv+xKIn5I8SKYaM= +github.com/openimsdk/tools v0.0.46-alpha.14 h1:9znmFmI9sQn5mNLUXhtf84lVL/KpnQNoxBi6yiM1vZM= +github.com/openimsdk/tools v0.0.46-alpha.14/go.mod h1:NAHPJyNUJm0n0WaZfIRC5s6Np+timv+xKIn5I8SKYaM= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index d2157e649..3acd9739e 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/tools/log" "net/http" "os" @@ -65,10 +66,6 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind if err != nil { return err } - - if err = mongo.CreateMsgIndex(); err != nil { - return err - } client, err := kdisc.NewDiscoveryRegister(config) if err != nil { return err @@ -80,7 +77,10 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) - msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database)) + msgDocModel, err := mgo.NewMsgMongo(mongo.GetDatabase(config.Mongo.Database)) + if err != nil { + return err + } msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, &config.Kafka) if err != nil { return err diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 5eb6fea3d..7710c72ea 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" @@ -72,11 +73,11 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discoveryreg if err != nil { return err } - if err := mongo.CreateMsgIndex(); err != nil { + cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) + msgDocModel, err := mgo.NewMsgMongo(mongo.GetDatabase(config.Mongo.Database)) + if err != nil { return err } - cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) - msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database)) conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 26fe855c7..d1939f35e 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -80,7 +80,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client registry.Svc return err } cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) - userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) + userMongoDB := mgo.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) database := controller.NewUserDatabase(userDB, cache, tx.NewMongo(mongo.GetClient()), userMongoDB) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) diff --git a/internal/tools/msg.go b/internal/tools/msg.go index c619c4d73..2b82f5e3e 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -84,7 +84,7 @@ func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, er if err != nil { return nil, err } - userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) + userMongoDB := mgo.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) ctxTx := tx.NewMongo(mongo.GetClient()) userDatabase := controller.NewUserDatabase( userDB, diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 6c61c2940..1c8bafc75 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -146,7 +147,10 @@ func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheMo func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) { cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) - msgDocModel := unrelation.NewMsgMongoDriver(database) + msgDocModel, err := mgo.NewMsgMongo(database) + if err != nil { + return nil, err + } return NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka) } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go deleted file mode 100644 index 858324145..000000000 --- a/pkg/common/db/controller/msg_test.go +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package controller - -import ( - "context" - "fmt" - "math/rand" - "strconv" - "sync" - "testing" - "time" - - "github.com/openimsdk/tools/log" - - "go.mongodb.org/mongo-driver/bson" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" -) - -func Test_BatchInsertChat2DB(t *testing.T) { - conf := config.NewGlobalConfig() - conf.Mongo.Address = []string{"192.168.44.128:37017"} - // conf.Mongo.Timeout = 60 - conf.Mongo.Database = "openIM" - // conf.Mongo.Source = "admin" - conf.Mongo.Username = "root" - conf.Mongo.Password = "openIM123" - conf.Mongo.MaxPoolSize = 100 - conf.RetainChatRecords = 3650 - conf.ChatRecordsClearTime = "0 2 * * 3" - - mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo) - if err != nil { - t.Fatal(err) - } - err = mongo.GetDatabase(conf.Mongo.Database).Client().Ping(context.Background(), nil) - if err != nil { - panic(err) - } - - db := &commonMsgDatabase{ - msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase(conf.Mongo.Database)), - } - - //ctx := context.Background() - //msgs := make([]*sdkws.MsgData, 0, 1) - //for i := 0; i < cap(msgs); i++ { - // msgs = append(msgs, &sdkws.MsgData{ - // Content: []byte(fmt.Sprintf("test-%d", i)), - // SendTime: time.Now().UnixMilli(), - // }) - //} - //err = db.BatchInsertChat2DB(ctx, "test", msgs, 0) - //if err != nil { - // panic(err) - //} - - _ = db.BatchInsertChat2DB - c := mongo.GetDatabase(conf.Mongo.Database).Collection("msg") - - ch := make(chan int) - rand.Seed(time.Now().UnixNano()) - - index := 10 - - var wg sync.WaitGroup - for i := 0; i < 1000; i++ { - wg.Add(1) - go func(channelID int) { - defer wg.Done() - <-ch - var arr []string - for i := 0; i < 500; i++ { - arr = append(arr, strconv.Itoa(i+1)) - } - rand.Shuffle(len(arr), func(i, j int) { - arr[i], arr[j] = arr[j], arr[i] - }) - for j, s := range arr { - if j == 0 { - fmt.Printf("channnelID: %d, arr[0]: %s\n", channelID, arr[j]) - } - filter := bson.M{"doc_id": "test:0"} - update := bson.M{ - "$addToSet": bson.M{ - fmt.Sprintf("msgs.%d.del_list", index): bson.M{"$each": []string{s}}, - }, - } - _, err := c.UpdateOne(context.Background(), filter, update) - if err != nil { - t.Fatal(err) - } - } - }(i) - } - - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - defer wg.Done() - <-ch - var arr []string - for i := 0; i < 500; i++ { - arr = append(arr, strconv.Itoa(1001+i)) - } - rand.Shuffle(len(arr), func(i, j int) { - arr[i], arr[j] = arr[j], arr[i] - }) - for _, s := range arr { - filter := bson.M{"doc_id": "test:0"} - update := bson.M{ - "$addToSet": bson.M{ - fmt.Sprintf("msgs.%d.read_list", index): bson.M{"$each": []string{s}}, - }, - } - _, err := c.UpdateOne(context.Background(), filter, update) - if err != nil { - t.Fatal(err) - } - } - }() - } - - time.Sleep(time.Second * 2) - - close(ch) - - wg.Wait() - -} - -func GetDB() *commonMsgDatabase { - conf := config.NewGlobalConfig() - conf.Mongo.Address = []string{"203.56.175.233:37017"} - // conf.Mongo.Timeout = 60 - conf.Mongo.Database = "openim_v3" - // conf.Mongo.Source = "admin" - conf.Mongo.Username = "root" - conf.Mongo.Password = "openIM123" - conf.Mongo.MaxPoolSize = 100 - conf.RetainChatRecords = 3650 - conf.ChatRecordsClearTime = "0 2 * * 3" - - mongo, err := unrelation.NewMongoDB(context.Background(), &conf.Mongo) - if err != nil { - panic(err) - } - err = mongo.GetDatabase(conf.Mongo.Database).Client().Ping(context.Background(), nil) - if err != nil { - panic(err) - } - return &commonMsgDatabase{ - msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase(conf.Mongo.Database)), - } -} - -func Test_Insert(t *testing.T) { - db := GetDB() - ctx := context.Background() - var arr []any - for i := 0; i < 345; i++ { - if i%2 == 0 { - arr = append(arr, (*unrelationtb.MsgDataModel)(nil)) - continue - } - arr = append(arr, &unrelationtb.MsgDataModel{ - Seq: int64(i), - Content: fmt.Sprintf("test-%d", i), - }) - } - if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyMsg, 1); err != nil { - t.Fatal(err) - } -} - -func Test_Revoke(t *testing.T) { - db := GetDB() - ctx := context.Background() - var arr []any - for i := 0; i < 456; i++ { - arr = append(arr, &unrelationtb.RevokeModel{ - UserID: "uid_" + strconv.Itoa(i), - Nickname: "uname_" + strconv.Itoa(i), - Time: time.Now().UnixMilli(), - }) - } - if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyRevoke, 123); err != nil { - t.Fatal(err) - } -} - -func Test_FindBySeq(t *testing.T) { - if err := log.InitFromConfig("", "", 6, true, false, "", 2, 1, "1,0"); err != nil { - t.Fatal(err) - } - db := GetDB() - ctx := context.Background() - fmt.Println( - db.msgDocDatabase.(*unrelation.MsgMongoDriver).GetMsgBySeqIndexIn1Doc(ctx, "100", "si_100_101:0", []int64{1}), - ) - //res, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, "123456", "test:0", []int64{1, 2, 3}) - //if err != nil { - // t.Fatal(err) - //} - //db.GetMsgBySeqs(ctx, "100", "si_100_101:0", []int64{6}) - //data, _ := json.Marshal(res) - //fmt.Println(string(data)) -} - -//func Test_Delete(t *testing.T) { -// db := GetDB() -// ctx := context.Background() -// var arr []any -// for i := 0; i < 123; i++ { -// arr = append(arr, []string{"uid_1", "uid_2"}) -// } -// if err := db.BatchInsertBlock(ctx, "test", arr, updateKeyDel, 210); err != nil { -// t.Fatal(err) -// } -//} - -func TestName(t *testing.T) { - db := GetDB() - var seqs []int64 - for i := int64(1); i <= 50; i++ { - seqs = append(seqs, i) - } - msgs, err := db.getMsgBySeqsRange(context.Background(), "4931176757", "si_3866692501_4931176757", seqs, seqs[0], seqs[len(seqs)-1]) - if err != nil { - t.Fatal(err) - } - - t.Log(msgs) - -} diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index bd490342f..9f0845c08 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -87,10 +87,10 @@ type userDatabase struct { tx tx.CtxTx userDB relation.UserModelInterface cache cache.UserCache - mongoDB unrelationtb.UserModelInterface + mongoDB unrelationtb.SubscribeUserModelInterface } -func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB unrelationtb.UserModelInterface) UserDatabase { +func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.CtxTx, mongoDB unrelationtb.SubscribeUserModelInterface) UserDatabase { return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB} } @@ -161,7 +161,7 @@ func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) } //// Update (non-zero value) externally guarantees that userID exists. -//func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) { +//func (u *userDatabase) Update(ctx context.Context, user *relation.SubscribeUserModel) (err error) { // if err := u.userDB.Update(ctx, user); err != nil { // return err // } diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/mgo/msg.go similarity index 58% rename from pkg/common/db/unrelation/msg.go rename to pkg/common/db/mgo/msg.go index 9a82a72dc..23ad10a7a 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/mgo/msg.go @@ -1,68 +1,56 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation +package mgo import ( "context" - "encoding/json" "errors" "fmt" - "time" - - table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mgoutil" + "github.com/openimsdk/tools/utils" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "google.golang.org/protobuf/proto" + "time" ) var ErrMsgListNotExist = errors.New("user not have msg in mongoDB") -type MsgMongoDriver struct { - MsgCollection *mongo.Collection - model table.MsgDocModel +func NewMsgMongo(db *mongo.Database) (relation.MsgDocModelInterface, error) { + coll := db.Collection(new(relation.MsgDocModel).TableName()) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "doc_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &MsgMgo{coll: coll}, nil } -func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface { - collection := database.Collection(table.MsgDocModel{}.TableName()) - return &MsgMongoDriver{MsgCollection: collection} +type MsgMgo struct { + coll *mongo.Collection + model relation.MsgDocModel } -func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.MsgInfoModel) error { - return m.MsgCollection.FindOneAndUpdate(ctx, bson.M{"doc_id": docID}, bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}}). - Err() +func (m *MsgMgo) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []relation.MsgInfoModel) error { + filter := bson.M{"doc_id": docID} + update := bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}} + return mgoutil.UpdateOne(ctx, m.coll, filter, update, false) } -func (m *MsgMongoDriver) Create(ctx context.Context, model *table.MsgDocModel) error { - _, err := m.MsgCollection.InsertOne(ctx, model) - return err +func (m *MsgMgo) Create(ctx context.Context, model *relation.MsgDocModel) error { + return mgoutil.InsertMany(ctx, m.coll, []*relation.MsgDocModel{model}) } -func (m *MsgMongoDriver) UpdateMsg( - ctx context.Context, - docID string, - index int64, - key string, - value any, -) (*mongo.UpdateResult, error) { +func (m *MsgMgo) UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) { var field string if key == "" { field = fmt.Sprintf("msgs.%d", index) @@ -71,21 +59,10 @@ func (m *MsgMongoDriver) UpdateMsg( } filter := bson.M{"doc_id": docID} update := bson.M{"$set": bson.M{field: value}} - res, err := m.MsgCollection.UpdateOne(ctx, filter, update) - if err != nil { - return nil, errs.Wrap(err) - } - return res, nil + return mgoutil.UpdateOneResult(ctx, m.coll, filter, update) } -// PushUnique value must slice. -func (m *MsgMongoDriver) PushUnique( - ctx context.Context, - docID string, - index int64, - key string, - value any, -) (*mongo.UpdateResult, error) { +func (m *MsgMgo) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) { var field string if key == "" { field = fmt.Sprintf("msgs.%d", index) @@ -98,138 +75,24 @@ func (m *MsgMongoDriver) PushUnique( field: bson.M{"$each": value}, }, } - res, err := m.MsgCollection.UpdateOne(ctx, filter, update) - if err != nil { - return nil, errs.Wrap(err) - } - return res, nil -} - -func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error { - _, err := m.MsgCollection.UpdateOne( - ctx, - bson.M{"doc_id": docID}, - bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}}, - ) - if err != nil { - return errs.Wrap(err) - } - return nil -} - -func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error { - msg.Status = status - bytes, err := proto.Marshal(msg) - if err != nil { - return errs.Wrap(err) - } - _, err = m.MsgCollection.UpdateOne( - ctx, - bson.M{"doc_id": docID}, - bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}}, - ) - if err != nil { - return errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqIndex is %d", docID, seqIndex)) - } - return nil -} - -func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.MsgDocModel, error) { - doc := &table.MsgDocModel{} - err := m.MsgCollection.FindOne(ctx, bson.M{"doc_id": docID}).Decode(doc) - return doc, err -} - -func (m *MsgMongoDriver) GetMsgDocModelByIndex( - ctx context.Context, - conversationID string, - index, sort int64, -) (*table.MsgDocModel, error) { - if sort != 1 && sort != -1 { - return nil, errs.ErrArgs.WrapMsg("mongo sort must be 1 or -1") - } - findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort}) - cursor, err := m.MsgCollection.Find( - ctx, - bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, - findOpts, - ) - if err != nil { - return nil, errs.WrapMsg(err, fmt.Sprintf("conversationID is %s", conversationID)) - } - var msgs []table.MsgDocModel - err = cursor.All(ctx, &msgs) - if err != nil { - return nil, errs.WrapMsg(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) - } - if len(msgs) > 0 { - return &msgs[0], nil - } - return nil, ErrMsgListNotExist -} - -func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { - var skip int64 = 0 - for { - msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, -1) - if err != nil { - return nil, err - } - for i := len(msgDocModel.Msg) - 1; i >= 0; i-- { - if msgDocModel.Msg[i].Msg != nil { - return msgDocModel.Msg[i], nil - } - } - skip++ - } + return mgoutil.UpdateOneResult(ctx, m.coll, filter, update) } -func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { - var skip int64 = 0 - for { - msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, 1) - if err != nil { - return nil, err - } - for i, v := range msgDocModel.Msg { - if v.Msg != nil { - return msgDocModel.Msg[i], nil - } - } - skip++ - } +func (m *MsgMgo) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error { + filter := bson.M{"doc_id": docID} + update := bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}} + return mgoutil.UpdateOne(ctx, m.coll, filter, update, false) } -func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error { - updates := bson.M{ - "$set": bson.M{}, - } - for _, index := range indexes { - updates["$set"].(bson.M)[fmt.Sprintf("msgs.%d", index)] = bson.M{ - "msg": nil, - } - } - _, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates) - if err != nil { - return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes)) - } - return nil +func (m *MsgMgo) IsExistDocID(ctx context.Context, docID string) (bool, error) { + return mgoutil.Exist(ctx, m.coll, bson.M{"doc_id": docID}) } -func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error { - if docIDs == nil { - return nil - } - _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": bson.M{"$in": docIDs}}) - return err +func (m *MsgMgo) FindOneByDocID(ctx context.Context, docID string) (*relation.MsgDocModel, error) { + return mgoutil.FindOne[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": docID}) } -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( - ctx context.Context, - userID string, - docID string, - seqs []int64, -) (msgs []*table.MsgInfoModel, err error) { +func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*relation.MsgInfoModel, error) { indexs := make([]int64, 0, len(seqs)) for _, seq := range seqs { indexs = append(indexs, m.model.GetMsgIndex(seq)) @@ -270,20 +133,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( {Key: "msgs.del_list", Value: 0}, }}}, } - - cur, err := m.MsgCollection.Aggregate(ctx, pipeline) + msgDocModel, err := mgoutil.Aggregate[*relation.MsgDocModel](ctx, m.coll, pipeline) if err != nil { - return nil, errs.Wrap(err) - } - defer cur.Close(ctx) - var msgDocModel []table.MsgDocModel - if err := cur.All(ctx, &msgDocModel); err != nil { - return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs)) + return nil, err } if len(msgDocModel) == 0 { return nil, errs.Wrap(mongo.ErrNoDocuments) } - msgs = make([]*table.MsgInfoModel, 0, len(msgDocModel[0].Msg)) + msgs := make([]*relation.MsgInfoModel, 0, len(msgDocModel[0].Msg)) for i := range msgDocModel[0].Msg { msg := msgDocModel[0].Msg[i] if msg == nil || msg.Msg == nil { @@ -303,14 +160,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( Seq: msg.Msg.Seq, Ex: msg.Msg.Ex, } - data, err := json.Marshal(&revokeContent) + data, err := utils.JsonMarshal(&revokeContent) if err != nil { return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs)) } elem := sdkws.NotificationElem{ Detail: string(data), } - content, err := json.Marshal(&elem) + content, err := utils.JsonMarshal(&elem) if err != nil { return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs)) } @@ -322,16 +179,72 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( return msgs, nil } -func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { - count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID}) +func (m *MsgMgo) GetNewestMsg(ctx context.Context, conversationID string) (*relation.MsgInfoModel, error) { + for skip := int64(0); ; skip++ { + msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, -1) + if err != nil { + return nil, err + } + for i := len(msgDocModel.Msg) - 1; i >= 0; i-- { + if msgDocModel.Msg[i].Msg != nil { + return msgDocModel.Msg[i], nil + } + } + } +} + +func (m *MsgMgo) GetOldestMsg(ctx context.Context, conversationID string) (*relation.MsgInfoModel, error) { + for skip := int64(0); ; skip++ { + msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, 1) + if err != nil { + return nil, err + } + for i, v := range msgDocModel.Msg { + if v.Msg != nil { + return msgDocModel.Msg[i], nil + } + } + } +} + +func (m *MsgMgo) DeleteDocs(ctx context.Context, docIDs []string) error { + if len(docIDs) == 0 { + return nil + } + return mgoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": bson.M{"$in": docIDs}}) +} + +func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*relation.MsgDocModel, error) { + if sort != 1 && sort != -1 { + return nil, errs.ErrArgs.WrapMsg("mongo sort must be 1 or -1") + } + opt := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort}).SetLimit(1) + filter := bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}} + msgs, err := mgoutil.Find[*relation.MsgDocModel](ctx, m.coll, filter, opt) if err != nil { - return false, errs.WrapMsg(err, fmt.Sprintf("docID is %s", docID)) + return nil, err + } + if len(msgs) > 0 { + return msgs[0], nil + } + return nil, errs.Wrap(ErrMsgListNotExist) +} + +func (m *MsgMgo) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error { + update := bson.M{ + "$set": bson.M{}, + } + for _, index := range indexes { + update["$set"].(bson.M)[fmt.Sprintf("msgs.%d", index)] = bson.M{ + "msg": nil, + } } - return count > 0, nil + _, err := mgoutil.UpdateMany(ctx, m.coll, bson.M{"doc_id": docID}, update) + return err } -func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error { - updates := []mongo.WriteModel{} +func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error { + var updates []mongo.WriteModel for _, index := range indexes { filter := bson.M{ "doc_id": docID, @@ -349,204 +262,110 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID st SetUpdate(update) updates = append(updates, updateModel) } - _, err := m.MsgCollection.BulkWrite(ctx, updates) - return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes)) + if _, err := m.coll.BulkWrite(ctx, updates); err != nil { + return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes)) + } + return nil +} + +func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*relation.MsgInfoModel, error) { + var pipe mongo.Pipeline + condition := bson.A{} + if req.SendTime != "" { + // Changed to keyed fields for bson.M to avoid govet errors + condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}}) + } + if req.ContentType != 0 { + condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.ContentType}}) + } + if req.SessionType != 0 { + condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}}) + } + if req.RecvID != "" { + condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}}) + } + if req.SendID != "" { + condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}}) + } + + or := bson.A{ + bson.M{"doc_id": bson.M{"$regex": "^si_", "$options": "i"}}, + bson.M{"doc_id": bson.M{"$regex": "^g_", "$options": "i"}}, + bson.M{"doc_id": bson.M{"$regex": "^sg_", "$options": "i"}}, + } + + // Use bson.D with keyed fields to specify the order explicitly + pipe = mongo.Pipeline{ + {{"$match", bson.D{{Key: "$or", Value: or}}}}, + {{"$project", bson.D{ + {Key: "msgs", Value: bson.D{ + {Key: "$filter", Value: bson.D{ + {Key: "input", Value: "$msgs"}, + {Key: "as", Value: "item"}, + {Key: "cond", Value: bson.D{{Key: "$and", Value: condition}}}, + }}, + }}, + {Key: "doc_id", Value: 1}, + }}}, + {{"$unwind", bson.M{"path": "$msgs"}}}, + {{"$sort", bson.M{"msgs.msg.send_time": -1}}}, + } + type docModel struct { + DocID string `bson:"doc_id"` + Msg *relation.MsgInfoModel `bson:"msgs"` + } + msgsDocs, err := mgoutil.Aggregate[*docModel](ctx, m.coll, pipe) + if err != nil { + return 0, nil, err + } + msgs := make([]*relation.MsgInfoModel, 0) + for _, doc := range msgsDocs { + msgInfo := doc.Msg + if msgInfo == nil || msgInfo.Msg == nil { + continue + } + if msgInfo.Revoke != nil { + revokeContent := sdkws.MessageRevokedContent{ + RevokerID: msgInfo.Revoke.UserID, + RevokerRole: msgInfo.Revoke.Role, + ClientMsgID: msgInfo.Msg.ClientMsgID, + RevokerNickname: msgInfo.Revoke.Nickname, + RevokeTime: msgInfo.Revoke.Time, + SourceMessageSendTime: msgInfo.Msg.SendTime, + SourceMessageSendID: msgInfo.Msg.SendID, + SourceMessageSenderNickname: msgInfo.Msg.SenderNickname, + SessionType: msgInfo.Msg.SessionType, + Seq: msgInfo.Msg.Seq, + Ex: msgInfo.Msg.Ex, + } + data, err := utils.JsonMarshal(&revokeContent) + if err != nil { + return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent") + } + elem := sdkws.NotificationElem{Detail: string(data)} + content, err := utils.JsonMarshal(&elem) + if err != nil { + return 0, nil, errs.WrapMsg(err, "json.Marshal elem") + } + msgInfo.Msg.ContentType = constant.MsgRevokeNotification + msgInfo.Msg.Content = string(content) + } + msgs = append(msgs, msgInfo) + } + start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber + n := int32(len(msgs)) + if start >= n { + return n, []*relation.MsgInfoModel{}, nil + } + if start+req.Pagination.ShowNumber < n { + msgs = msgs[start : start+req.Pagination.ShowNumber] + } else { + msgs = msgs[start:] + } + return n, msgs, nil } -// RangeUserSendCount -// db.msg.aggregate([ -// -// { -// $match: { -// "msgs.msg.send_time": { -// "$gte": 0, -// "$lt": 1788122092317 -// } -// } -// }, -// { -// "$addFields": { -// "msgs": { -// "$filter": { -// "input": "$msgs", -// "as": "item", -// "cond": { -// "$and": [ -// { -// $gte: ["$$item.msg.send_time", 0] -// }, -// { -// $lt: ["$$item.msg.send_time", 1788122092317] -// } -// ] -// } -// } -// } -// } -// }, -// { -// "$project": { -// "_id": 0, -// -// }, -// -// }, -// { -// "$project": { -// "result": { -// "$map": { -// "input": "$msgs", -// "as": "item", -// "in": { -// user_id: "$$item.msg.send_id", -// send_date: { -// $dateToString: { -// format: "%Y-%m-%d", -// date: { -// $toDate: "$$item.msg.send_time" -// } -// } -// } -// } -// } -// } -// }, -// -// }, -// { -// "$unwind": "$result" -// }, -// { -// "$group": { -// _id: "$result.send_date", -// count: { -// $sum: 1 -// }, -// original: { -// $push: "$$ROOT" -// } -// } -// }, -// { -// "$addFields": { -// "dates": "$$ROOT" -// } -// }, -// { -// "$project": { -// "_id": 0, -// "count": 0, -// "dates.original": 0, -// -// }, -// -// }, -// { -// "$group": { -// _id: null, -// count: { -// $sum: 1 -// }, -// dates: { -// $push: "$dates" -// }, -// original: { -// $push: "$original" -// }, -// -// } -// }, -// { -// "$unwind": "$original" -// }, -// { -// "$unwind": "$original" -// }, -// { -// "$group": { -// _id: "$original.result.user_id", -// count: { -// $sum: 1 -// }, -// original: { -// $push: "$dates" -// }, -// -// } -// }, -// { -// "$addFields": { -// "dates": { -// $arrayElemAt: ["$original", 0] -// } -// } -// }, -// { -// "$project": { -// original: 0 -// } -// }, -// { -// $sort: { -// count: - 1 -// } -// }, -// { -// "$group": { -// _id: null, -// user_count: { -// $sum: 1 -// }, -// users: { -// $push: "$$ROOT" -// }, -// -// } -// }, -// { -// "$addFields": { -// "dates": { -// $arrayElemAt: ["$users", 0] -// } -// } -// }, -// { -// "$addFields": { -// "dates": "$dates.dates" -// } -// }, -// { -// "$project": { -// _id: 0, -// "users.dates": 0, -// -// } -// }, -// { -// "$addFields": { -// "msg_count": { -// $sum: "$users.count" -// } -// } -// }, -// { -// "$addFields": { -// users: { -// $slice: ["$users", 0, 10] -// } -// } -// } -// -// ]). -func (m *MsgMongoDriver) RangeUserSendCount( - ctx context.Context, - start time.Time, - end time.Time, - group bool, - ase bool, - pageNumber int32, - showNumber int32, -) (msgCount int64, userCount int64, users []*table.UserCount, dateCount map[string]int64, err error) { +func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) { var sort int if ase { sort = 1 @@ -773,21 +592,16 @@ func (m *MsgMongoDriver) RangeUserSendCount( }, }, } - cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true)) + result, err := mgoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true)) if err != nil { - return 0, 0, nil, nil, errs.Wrap(err) - } - defer cur.Close(ctx) - var result []Result - if err = cur.All(ctx, &result); err != nil { - return 0, 0, nil, nil, errs.Wrap(err) + return 0, 0, nil, nil, err } if len(result) == 0 { return 0, 0, nil, nil, errs.Wrap(err) } - users = make([]*table.UserCount, len(result[0].Users)) + users = make([]*relation.UserCount, len(result[0].Users)) for i, r := range result[0].Users { - users[i] = &table.UserCount{ + users[i] = &relation.UserCount{ UserID: r.UserID, Count: r.Count, } @@ -799,14 +613,7 @@ func (m *MsgMongoDriver) RangeUserSendCount( return result[0].MsgCount, result[0].UserCount, users, dateCount, nil } -func (m *MsgMongoDriver) RangeGroupSendCount( - ctx context.Context, - start time.Time, - end time.Time, - ase bool, - pageNumber int32, - showNumber int32, -) (msgCount int64, userCount int64, groups []*table.GroupCount, dateCount map[string]int64, err error) { +func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error) { var sort int if ase { sort = 1 @@ -1022,21 +829,16 @@ func (m *MsgMongoDriver) RangeGroupSendCount( }, }, } - cur, err := m.MsgCollection.Aggregate(ctx, pipeline, options.Aggregate().SetAllowDiskUse(true)) + result, err := mgoutil.Aggregate[*Result](ctx, m.coll, pipeline, options.Aggregate().SetAllowDiskUse(true)) if err != nil { - return 0, 0, nil, nil, errs.Wrap(err) - } - defer cur.Close(ctx) - var result []Result - if err = cur.All(ctx, &result); err != nil { - return 0, 0, nil, nil, errs.Wrap(err) + return 0, 0, nil, nil, err } if len(result) == 0 { return 0, 0, nil, nil, errs.Wrap(err) } - groups = make([]*table.GroupCount, len(result[0].Groups)) + groups = make([]*relation.GroupCount, len(result[0].Groups)) for i, r := range result[0].Groups { - groups[i] = &table.GroupCount{ + groups[i] = &relation.GroupCount{ GroupID: r.GroupID, Count: r.Count, } @@ -1048,113 +850,51 @@ func (m *MsgMongoDriver) RangeGroupSendCount( return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil } -func (m *MsgMongoDriver) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*table.MsgInfoModel, error) { - total, msgs, err := m.searchMessage(ctx, req) - if err != nil { - return 0, nil, err - } - return total, msgs, nil -} - -func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*table.MsgInfoModel, error) { - var pipe mongo.Pipeline - condition := bson.A{} - if req.SendTime != "" { - // Changed to keyed fields for bson.M to avoid govet errors - condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}}) - } - if req.ContentType != 0 { - condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.ContentType}}) - } - if req.SessionType != 0 { - condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}}) - } - if req.RecvID != "" { - condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}}) - } - if req.SendID != "" { - condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}}) - } - - or := bson.A{ - bson.M{"doc_id": bson.M{"$regex": "^si_", "$options": "i"}}, - bson.M{"doc_id": bson.M{"$regex": "^g_", "$options": "i"}}, - bson.M{"doc_id": bson.M{"$regex": "^sg_", "$options": "i"}}, - } - - // Use bson.D with keyed fields to specify the order explicitly - pipe = mongo.Pipeline{ - {{"$match", bson.D{{Key: "$or", Value: or}}}}, - {{"$project", bson.D{ - {Key: "msgs", Value: bson.D{ - {Key: "$filter", Value: bson.D{ - {Key: "input", Value: "$msgs"}, - {Key: "as", Value: "item"}, - {Key: "cond", Value: bson.D{{Key: "$and", Value: condition}}}, - }}, - }}, - {Key: "doc_id", Value: 1}, - }}}, - {{"$unwind", bson.M{"path": "$msgs"}}}, - {{"$sort", bson.M{"msgs.msg.send_time": -1}}}, - } - cursor, err := m.MsgCollection.Aggregate(ctx, pipe) - if err != nil { - return 0, nil, err - } - type docModel struct { - DocID string `bson:"doc_id"` - Msg *table.MsgInfoModel `bson:"msgs"` - } - var msgsDocs []docModel - err = cursor.All(ctx, &msgsDocs) - if err != nil { - return 0, nil, errs.WrapMsg(err, "cursor.All msgsDocs") - } - log.ZDebug(ctx, "query mongoDB", "result", msgsDocs) - msgs := make([]*table.MsgInfoModel, 0) - for _, doc := range msgsDocs { - msgInfo := doc.Msg - if msgInfo == nil || msgInfo.Msg == nil { +func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { + for _, conversationID := range conversationIDs { + regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)} + msgDocs, err := mgoutil.Find[*relation.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex}) + if err != nil { + log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) continue } - if msgInfo.Revoke != nil { - revokeContent := sdkws.MessageRevokedContent{ - RevokerID: msgInfo.Revoke.UserID, - RevokerRole: msgInfo.Revoke.Role, - ClientMsgID: msgInfo.Msg.ClientMsgID, - RevokerNickname: msgInfo.Revoke.Nickname, - RevokeTime: msgInfo.Revoke.Time, - SourceMessageSendTime: msgInfo.Msg.SendTime, - SourceMessageSendID: msgInfo.Msg.SendID, - SourceMessageSenderNickname: msgInfo.Msg.SenderNickname, - SessionType: msgInfo.Msg.SessionType, - Seq: msgInfo.Msg.Seq, - Ex: msgInfo.Msg.Ex, + if len(msgDocs) < 1 { + continue + } + log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) + if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) { + if err := mgoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil { + log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) + continue } - data, err := json.Marshal(&revokeContent) - if err != nil { - return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent") + var newMsgDocs []any + for _, msgDoc := range msgDocs { + if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { + continue + } + var index int64 + for index < int64(len(msgDoc.Msg)) { + msg := msgDoc.Msg[index] + if msg != nil && msg.Msg != nil { + msgDocModel := relation.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} + end := index + m.model.GetSingleGocMsgNum() + if int(end) >= len(msgDoc.Msg) { + msgDocModel.Msg = msgDoc.Msg[index:] + } else { + msgDocModel.Msg = msgDoc.Msg[index:end] + } + newMsgDocs = append(newMsgDocs, msgDocModel) + index = end + } else { + break + } + } } - elem := sdkws.NotificationElem{Detail: string(data)} - content, err := json.Marshal(&elem) - if err != nil { - return 0, nil, errs.WrapMsg(err, "json.Marshal elem") + if err = mgoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil { + log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + } else { + log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) } - msgInfo.Msg.ContentType = constant.MsgRevokeNotification - msgInfo.Msg.Content = string(content) } - msgs = append(msgs, msgInfo) } - start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber - n := int32(len(msgs)) - if start >= n { - return n, []*table.MsgInfoModel{}, nil - } - if start+req.Pagination.ShowNumber < n { - msgs = msgs[start : start+req.Pagination.ShowNumber] - } else { - msgs = msgs[start:] - } - return n, msgs, nil } diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/mgo/subscribe.go similarity index 94% rename from pkg/common/db/unrelation/user.go rename to pkg/common/db/mgo/subscribe.go index 7d1662ee4..d3077f109 100644 --- a/pkg/common/db/unrelation/user.go +++ b/pkg/common/db/mgo/subscribe.go @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package unrelation +package mgo import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -35,9 +35,9 @@ const ( MaximumSubscription = 3000 ) -func NewUserMongoDriver(database *mongo.Database) unrelation.UserModelInterface { +func NewUserMongoDriver(database *mongo.Database) relation.SubscribeUserModelInterface { return &UserMongoDriver{ - userCollection: database.Collection(unrelation.SubscribeUser), + userCollection: database.Collection(relation.SubscribeUser), } } @@ -155,7 +155,7 @@ func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, user // GetAllSubscribeList Get all users subscribed by this user. func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) { - var user unrelation.UserModel + var user relation.SubscribeUserModel cursor := u.userCollection.FindOne( ctx, bson.M{"user_id": SubscriptionPrefix + userID}) @@ -172,7 +172,7 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string // GetSubscribedList Get the user subscribed by those users. func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) { - var user unrelation.UserModel + var user relation.SubscribeUserModel cursor := u.userCollection.FindOne( ctx, bson.M{"user_id": SubscribedPrefix + userID}) diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/relation/msg.go similarity index 90% rename from pkg/common/db/table/unrelation/msg.go rename to pkg/common/db/table/relation/msg.go index ff23d8488..8adfa4370 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/relation/msg.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package unrelation +package relation import ( "context" @@ -110,23 +110,8 @@ type MsgDocModelInterface interface { DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*MsgInfoModel, error) - RangeUserSendCount( - ctx context.Context, - start time.Time, - end time.Time, - group bool, - ase bool, - pageNumber int32, - showNumber int32, - ) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error) - RangeGroupSendCount( - ctx context.Context, - start time.Time, - end time.Time, - ase bool, - pageNumber int32, - showNumber int32, - ) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error) + RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error) + RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } @@ -165,11 +150,11 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st return t } -func (m MsgDocModel) GetMsgIndex(seq int64) int64 { +func (MsgDocModel) GetMsgIndex(seq int64) int64 { return (seq - 1) % singleGocMsgNum } -func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { +func (MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) } diff --git a/pkg/common/db/table/unrelation/user.go b/pkg/common/db/table/relation/subscribe.go similarity index 86% rename from pkg/common/db/table/unrelation/user.go rename to pkg/common/db/table/relation/subscribe.go index 1505829e5..4e184cf38 100644 --- a/pkg/common/db/table/unrelation/user.go +++ b/pkg/common/db/table/relation/subscribe.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package unrelation +package relation import "context" @@ -21,18 +21,18 @@ const ( SubscribeUser = "subscribe_user" ) -// UserModel collection structure. -type UserModel struct { +// SubscribeUserModel collection structure. +type SubscribeUserModel struct { UserID string `bson:"user_id" json:"userID"` UserIDList []string `bson:"user_id_list" json:"userIDList"` } -func (UserModel) TableName() string { +func (SubscribeUserModel) TableName() string { return SubscribeUser } -// UserModelInterface Operation interface of user mongodb. -type UserModelInterface interface { +// SubscribeUserModelInterface Operation interface of user mongodb. +type SubscribeUserModelInterface interface { // AddSubscriptionList Subscriber's handling of thresholds. AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error // UnsubscriptionList Handling of unsubscribe. diff --git a/pkg/common/db/table/unrelation/common.go b/pkg/common/db/table/unrelation/common.go deleted file mode 100644 index bd46ccc2a..000000000 --- a/pkg/common/db/table/unrelation/common.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation - -type CommonUserModel struct { - UserID string `bson:"user_id"` - UserName string `bson:"user_name"` -} diff --git a/pkg/common/db/table/unrelation/doc.go b/pkg/common/db/table/unrelation/doc.go deleted file mode 100644 index b69ef5526..000000000 --- a/pkg/common/db/table/unrelation/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" diff --git a/pkg/common/db/table/unrelation/super_group.go b/pkg/common/db/table/unrelation/super_group.go deleted file mode 100644 index ff70a62c9..000000000 --- a/pkg/common/db/table/unrelation/super_group.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation - -// import ( -// "context" -//) -// -// const ( -// CSuperGroup = "super_group" -// CUserToSuperGroup = "user_to_super_group" -//) -// -// type SuperGroupModel struct { -// GroupID string `bson:"group_id" json:"groupID"` -// MemberIDs []string `bson:"member_id_list" json:"memberIDList"` -//} -// -// func (SuperGroupModel) TableName() string { -// return CSuperGroup -//} -// -//type UserToSuperGroupModel struct { -// UserID string `bson:"user_id" json:"userID"` -// GroupIDs []string `bson:"group_id_list" json:"groupIDList"` -//} -// -//func (UserToSuperGroupModel) TableName() string { -// return CUserToSuperGroup -//} -// -//type SuperGroupModelInterface interface { -// CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error -// TakeSuperGroup(ctx context.Context, groupID string) (group *SuperGroupModel, err error) -// FindSuperGroup(ctx context.Context, groupIDs []string) (groups []*SuperGroupModel, err error) -// AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error -// RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error -// GetSuperGroupByUserID(ctx context.Context, userID string) (*UserToSuperGroupModel, error) -// DeleteSuperGroup(ctx context.Context, groupID string) error -// RemoveGroupFromUser(ctx context.Context, groupID string, userIDs []string) error -//} diff --git a/pkg/common/db/unrelation/doc.go b/pkg/common/db/unrelation/doc.go deleted file mode 100644 index 99b9c3407..000000000 --- a/pkg/common/db/unrelation/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 4ceeccd9b..dff69d3c3 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -23,10 +23,8 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mw/specialerror" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -127,44 +125,3 @@ func (m *Mongo) GetClient() *mongo.Client { func (m *Mongo) GetDatabase(database string) *mongo.Database { return m.db.Database(database) } - -// CreateMsgIndex creates an index for messages in MongoDB. -func (m *Mongo) CreateMsgIndex() error { - return m.createMongoIndex(unrelation.Msg, true, "doc_id") -} - -// createMongoIndex creates an index in a MongoDB collection. -func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error { - db := m.GetDatabase(m.mongoConf.Database).Collection(collection) - opts := options.CreateIndexes().SetMaxTime(10 * time.Second) - indexView := db.Indexes() - - keysDoc := buildIndexKeys(keys) - - index := mongo.IndexModel{ - Keys: keysDoc, - } - if isUnique { - index.Options = options.Index().SetUnique(true) - } - - _, err := indexView.CreateOne(context.Background(), index, opts) - if err != nil { - return errs.WrapMsg(err, "CreateIndex") - } - return nil -} - -// buildIndexKeys builds the BSON document for index keys. -func buildIndexKeys(keys []string) bson.D { - keysDoc := bson.D{} - for _, key := range keys { - direction := 1 // default direction is ascending - if strings.HasPrefix(key, "-") { - direction = -1 // descending order for prefixed with "-" - key = strings.TrimLeft(key, "-") - } - keysDoc = append(keysDoc, bson.E{Key: key, Value: direction}) - } - return keysDoc -} diff --git a/pkg/common/db/unrelation/msg_convert.go b/pkg/common/db/unrelation/msg_convert.go deleted file mode 100644 index 7d90bab7c..000000000 --- a/pkg/common/db/unrelation/msg_convert.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation - -import ( - "context" - "fmt" - - table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" - "github.com/openimsdk/tools/log" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { - for _, conversationID := range conversationIDs { - regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)} - cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex}) - if err != nil { - log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) - continue - } - var msgDocs []table.MsgDocModel - err = cursor.All(ctx, &msgDocs) - if err != nil { - log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID) - continue - } - if len(msgDocs) < 1 { - continue - } - log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) - if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) { - if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil { - log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) - continue - } - var newMsgDocs []any - for _, msgDoc := range msgDocs { - if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { - continue - } - var index int64 - for index < int64(len(msgDoc.Msg)) { - msg := msgDoc.Msg[index] - if msg != nil && msg.Msg != nil { - msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} - end := index + m.model.GetSingleGocMsgNum() - if int(end) >= len(msgDoc.Msg) { - msgDocModel.Msg = msgDoc.Msg[index:] - } else { - msgDocModel.Msg = msgDoc.Msg[index:end] - } - newMsgDocs = append(newMsgDocs, msgDocModel) - index = end - } else { - break - } - } - } - _, err = m.MsgCollection.InsertMany(ctx, newMsgDocs) - if err != nil { - log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) - } else { - log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) - } - } - } -} diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go deleted file mode 100644 index 80b5ee1c3..000000000 --- a/pkg/common/db/unrelation/super_group.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation - -// -// import ( -// "context" -// -// "go.mongodb.org/mongo-driver/bson" -// "go.mongodb.org/mongo-driver/mongo" -// "go.mongodb.org/mongo-driver/mongo/options" -// -// "github.com/openimsdk/tools/utils" -// -// "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" -//) -// -// func NewSuperGroupMongoDriver(database *mongo.Database) unrelation.SuperGroupModelInterface { -// return &SuperGroupMongoDriver{ -// superGroupCollection: database.Collection(unrelation.CSuperGroup), -// userToSuperGroupCollection: database.Collection(unrelation.CUserToSuperGroup), -// } -//} -// -// type SuperGroupMongoDriver struct { -// superGroupCollection *mongo.Collection -// userToSuperGroupCollection *mongo.Collection -//} -// -// func (s *SuperGroupMongoDriver) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error { -// _, err := s.superGroupCollection.InsertOne(ctx, &unrelation.SuperGroupModel{ -// GroupID: groupID, -// MemberIDs: initMemberIDs, -// }) -// if err != nil { -// return err -// } -// for _, userID := range initMemberIDs { -// _, err = s.userToSuperGroupCollection.UpdateOne( -// ctx, -// bson.M{"user_id": userID}, -// bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, -// &options.UpdateOptions{ -// Upsert: utils.ToPtr(true), -// }, -// ) -// if err != nil { -// return err -// } -// } -// return nil -//} -// -//func (s *SuperGroupMongoDriver) TakeSuperGroup( -// ctx context.Context, -// groupID string, -//) (group *unrelation.SuperGroupModel, err error) { -// if err := s.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&group); err != nil { -// return nil, utils.Wrap(err, "") -// } -// return group, nil -//} -// -//func (s *SuperGroupMongoDriver) FindSuperGroup( -// ctx context.Context, -// groupIDs []string, -//) (groups []*unrelation.SuperGroupModel, err error) { -// cursor, err := s.superGroupCollection.Find(ctx, bson.M{"group_id": bson.M{ -// "$in": groupIDs, -// }}) -// if err != nil { -// return nil, err -// } -// defer cursor.Close(ctx) -// if err := cursor.All(ctx, &groups); err != nil { -// return nil, utils.Wrap(err, "") -// } -// return groups, nil -//} -// -//func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error { -// _, err := s.superGroupCollection.UpdateOne( -// ctx, -// bson.M{"group_id": groupID}, -// bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}}, -// ) -// if err != nil { -// return err -// } -// upsert := true -// opts := &options.UpdateOptions{ -// Upsert: &upsert, -// } -// for _, userID := range userIDs { -// _, err = s.userToSuperGroupCollection.UpdateOne( -// ctx, -// bson.M{"user_id": userID}, -// bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, -// opts, -// ) -// if err != nil { -// return utils.Wrap(err, "transaction failed") -// } -// } -// return nil -//} -// -//func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error { -// _, err := s.superGroupCollection.UpdateOne( -// ctx, -// bson.M{"group_id": groupID}, -// bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}}, -// ) -// if err != nil { -// return err -// } -// err = s.RemoveGroupFromUser(ctx, groupID, userIDs) -// if err != nil { -// return err -// } -// return nil -//} -// -//func (s *SuperGroupMongoDriver) GetSuperGroupByUserID( -// ctx context.Context, -// userID string, -//) (*unrelation.UserToSuperGroupModel, error) { -// var user unrelation.UserToSuperGroupModel -// err := s.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) -// return &user, utils.Wrap(err, "") -//} -// -//func (s *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID string) error { -// group, err := s.TakeSuperGroup(ctx, groupID) -// if err != nil { -// return err -// } -// if _, err := s.superGroupCollection.DeleteOne(ctx, bson.M{"group_id": groupID}); err != nil { -// return utils.Wrap(err, "") -// } -// return s.RemoveGroupFromUser(ctx, groupID, group.MemberIDs) -//} -// -//func (s *SuperGroupMongoDriver) RemoveGroupFromUser(ctx context.Context, groupID string, userIDs []string) error { -// _, err := s.userToSuperGroupCollection.UpdateOne( -// ctx, -// bson.M{"user_id": bson.M{"$in": userIDs}}, -// bson.M{"$pull": bson.M{"group_id_list": groupID}}, -// ) -// return utils.Wrap(err, "") -//}