parent
d98f63734b
commit
ce3b4eec08
@ -1,119 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package msgtransfer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/OpenIMSDK/protocol/constant"
|
|
||||||
pbmsg "github.com/OpenIMSDK/protocol/msg"
|
|
||||||
"github.com/OpenIMSDK/tools/log"
|
|
||||||
"github.com/OpenIMSDK/tools/utils"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
|
||||||
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
|
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
|
||||||
"google.golang.org/protobuf/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
type PersistentConsumerHandler struct {
|
|
||||||
persistentConsumerGroup *kfk.MConsumerGroup
|
|
||||||
chatLogDatabase controller.ChatLogDatabase
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *PersistentConsumerHandler {
|
|
||||||
return &PersistentConsumerHandler{
|
|
||||||
persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
|
|
||||||
KafkaVersion: sarama.V2_0_0_0,
|
|
||||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
|
|
||||||
}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
|
|
||||||
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql),
|
|
||||||
chatLogDatabase: database,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(
|
|
||||||
ctx context.Context,
|
|
||||||
cMsg *sarama.ConsumerMessage,
|
|
||||||
msgKey string,
|
|
||||||
_ sarama.ConsumerGroupSession,
|
|
||||||
) {
|
|
||||||
msg := cMsg.Value
|
|
||||||
var tag bool
|
|
||||||
msgFromMQ := pbmsg.MsgDataToMQ{}
|
|
||||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "msg_transfer Unmarshal msg err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.ZDebug(ctx, "handleChatWs2Mysql", "msg", msgFromMQ.MsgData)
|
|
||||||
// Control whether to store history messages (mysql)
|
|
||||||
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
|
|
||||||
// Only process receiver data
|
|
||||||
if isPersist {
|
|
||||||
switch msgFromMQ.MsgData.SessionType {
|
|
||||||
case constant.SingleChatType, constant.NotificationChatType:
|
|
||||||
if msgKey == msgFromMQ.MsgData.RecvID {
|
|
||||||
tag = true
|
|
||||||
}
|
|
||||||
case constant.GroupChatType:
|
|
||||||
if msgKey == msgFromMQ.MsgData.SendID {
|
|
||||||
tag = true
|
|
||||||
}
|
|
||||||
case constant.SuperGroupChatType:
|
|
||||||
tag = true
|
|
||||||
}
|
|
||||||
if tag {
|
|
||||||
log.ZInfo(ctx, "msg_transfer msg persisting", "msg", string(msg))
|
|
||||||
if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil {
|
|
||||||
log.ZError(ctx, "Message insert failed", err, "msg", msgFromMQ.String())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
||||||
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
||||||
|
|
||||||
func (pc *PersistentConsumerHandler) ConsumeClaim(
|
|
||||||
sess sarama.ConsumerGroupSession,
|
|
||||||
claim sarama.ConsumerGroupClaim,
|
|
||||||
) error {
|
|
||||||
for msg := range claim.Messages() {
|
|
||||||
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg)
|
|
||||||
log.ZDebug(
|
|
||||||
ctx,
|
|
||||||
"kafka get info to mysql",
|
|
||||||
"msgTopic",
|
|
||||||
msg.Topic,
|
|
||||||
"msgPartition",
|
|
||||||
msg.Partition,
|
|
||||||
"msg",
|
|
||||||
string(msg.Value),
|
|
||||||
"key",
|
|
||||||
string(msg.Key),
|
|
||||||
)
|
|
||||||
if len(msg.Value) != 0 {
|
|
||||||
pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess)
|
|
||||||
} else {
|
|
||||||
log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key)
|
|
||||||
}
|
|
||||||
sess.MarkMessage(msg, "")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,37 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
pbmsg "github.com/OpenIMSDK/protocol/msg"
|
|
||||||
|
|
||||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ChatLogDatabase interface {
|
|
||||||
CreateChatLog(msg *pbmsg.MsgDataToMQ) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewChatLogDatabase(chatLogModelInterface relationtb.ChatLogModelInterface) ChatLogDatabase {
|
|
||||||
return &chatLogDatabase{chatLogModel: chatLogModelInterface}
|
|
||||||
}
|
|
||||||
|
|
||||||
type chatLogDatabase struct {
|
|
||||||
chatLogModel relationtb.ChatLogModelInterface
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *chatLogDatabase) CreateChatLog(msg *pbmsg.MsgDataToMQ) error {
|
|
||||||
return c.chatLogModel.Create(msg)
|
|
||||||
}
|
|
@ -0,0 +1,80 @@
|
|||||||
|
package newmgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo/mgotool"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/pagination"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewBlackMongo(db *mongo.Database) (relation.BlackModelInterface, error) {
|
||||||
|
return &BlackMgo{
|
||||||
|
coll: db.Collection("black"),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type BlackMgo struct {
|
||||||
|
coll *mongo.Collection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) blackFilter(ownerUserID, blockUserID string) bson.M {
|
||||||
|
return bson.M{
|
||||||
|
"owner_user_id": ownerUserID,
|
||||||
|
"block_user_id": blockUserID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) blacksFilter(blacks []*relation.BlackModel) bson.M {
|
||||||
|
if len(blacks) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
or := make(bson.A, 0, len(blacks))
|
||||||
|
for _, black := range blacks {
|
||||||
|
or = append(or, b.blackFilter(black.OwnerUserID, black.BlockUserID))
|
||||||
|
}
|
||||||
|
return bson.M{"$or": or}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) {
|
||||||
|
return mgotool.InsertMany(ctx, b.coll, blacks)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) {
|
||||||
|
if len(blacks) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return mgotool.DeleteMany(ctx, b.coll, b.blacksFilter(blacks))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) UpdateByMap(ctx context.Context, ownerUserID, blockUserID string, args map[string]any) (err error) {
|
||||||
|
if len(args) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return mgotool.UpdateOne(ctx, b.coll, b.blackFilter(ownerUserID, blockUserID), bson.M{"$set": args}, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) Find(ctx context.Context, blacks []*relation.BlackModel) (blackList []*relation.BlackModel, err error) {
|
||||||
|
return mgotool.Find[*relation.BlackModel](ctx, b.coll, b.blacksFilter(blacks))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) Take(ctx context.Context, ownerUserID, blockUserID string) (black *relation.BlackModel, err error) {
|
||||||
|
return mgotool.FindOne[*relation.BlackModel](ctx, b.coll, b.blackFilter(ownerUserID, blockUserID))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) FindOwnerBlacks(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, blacks []*relation.BlackModel, err error) {
|
||||||
|
return mgotool.FindPage[*relation.BlackModel](ctx, b.coll, bson.M{"owner_user_id": ownerUserID}, pagination)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) FindOwnerBlackInfos(ctx context.Context, ownerUserID string, userIDs []string) (blacks []*relation.BlackModel, err error) {
|
||||||
|
if len(userIDs) == 0 {
|
||||||
|
return mgotool.Find[*relation.BlackModel](ctx, b.coll, bson.M{"owner_user_id": ownerUserID})
|
||||||
|
}
|
||||||
|
return mgotool.Find[*relation.BlackModel](ctx, b.coll, bson.M{"owner_user_id": ownerUserID, "block_user_id": bson.M{"$in": userIDs}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlackMgo) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) {
|
||||||
|
return mgotool.Find[string](ctx, b.coll, bson.M{"owner_user_id": ownerUserID}, options.Find().SetProjection(bson.M{"_id": 0, "block_user_id": 1}))
|
||||||
|
}
|
@ -1,38 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package relation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MetaDB struct {
|
|
||||||
DB *gorm.DB
|
|
||||||
table any
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMetaDB(db *gorm.DB, table any) *MetaDB {
|
|
||||||
return &MetaDB{
|
|
||||||
DB: db,
|
|
||||||
table: table,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *MetaDB) db(ctx context.Context) *gorm.DB {
|
|
||||||
db := g.DB.WithContext(ctx).Model(g.table)
|
|
||||||
return db
|
|
||||||
}
|
|
@ -1,157 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package relation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/errs"
|
|
||||||
"github.com/OpenIMSDK/tools/log"
|
|
||||||
"github.com/OpenIMSDK/tools/mw/specialerror"
|
|
||||||
mysqldriver "github.com/go-sql-driver/mysql"
|
|
||||||
"gorm.io/driver/mysql"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"gorm.io/gorm/logger"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
maxRetry = 100 // number of retries
|
|
||||||
)
|
|
||||||
|
|
||||||
type option struct {
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
Address []string
|
|
||||||
Database string
|
|
||||||
LogLevel int
|
|
||||||
SlowThreshold int
|
|
||||||
MaxLifeTime int
|
|
||||||
MaxOpenConn int
|
|
||||||
MaxIdleConn int
|
|
||||||
Connect func(dsn string, maxRetry int) (*gorm.DB, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newMysqlGormDB Initialize the database connection.
|
|
||||||
func newMysqlGormDB(o *option) (*gorm.DB, error) {
|
|
||||||
err := maybeCreateTable(o)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
|
|
||||||
o.Username, o.Password, o.Address[0], o.Database)
|
|
||||||
sqlLogger := log.NewSqlLogger(
|
|
||||||
logger.LogLevel(o.LogLevel),
|
|
||||||
true,
|
|
||||||
time.Duration(o.SlowThreshold)*time.Millisecond,
|
|
||||||
)
|
|
||||||
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
|
|
||||||
Logger: sqlLogger,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sqlDB, err := db.DB()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sqlDB.SetConnMaxLifetime(time.Second * time.Duration(o.MaxLifeTime))
|
|
||||||
sqlDB.SetMaxOpenConns(o.MaxOpenConn)
|
|
||||||
sqlDB.SetMaxIdleConns(o.MaxIdleConn)
|
|
||||||
return db, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// maybeCreateTable creates a database if it does not exists.
|
|
||||||
func maybeCreateTable(o *option) error {
|
|
||||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
|
|
||||||
o.Username, o.Password, o.Address[0], "mysql")
|
|
||||||
|
|
||||||
var db *gorm.DB
|
|
||||||
var err error
|
|
||||||
if f := o.Connect; f != nil {
|
|
||||||
db, err = f(dsn, maxRetry)
|
|
||||||
} else {
|
|
||||||
db, err = connectToDatabase(dsn, maxRetry)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
panic(err.Error() + " Open failed " + dsn)
|
|
||||||
}
|
|
||||||
|
|
||||||
sqlDB, err := db.DB()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer sqlDB.Close()
|
|
||||||
sql := fmt.Sprintf(
|
|
||||||
"CREATE DATABASE IF NOT EXISTS `%s` default charset utf8mb4 COLLATE utf8mb4_unicode_ci",
|
|
||||||
o.Database,
|
|
||||||
)
|
|
||||||
err = db.Exec(sql).Error
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("init db %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// connectToDatabase Connection retry for mysql.
|
|
||||||
func connectToDatabase(dsn string, maxRetry int) (*gorm.DB, error) {
|
|
||||||
var db *gorm.DB
|
|
||||||
var err error
|
|
||||||
for i := 0; i <= maxRetry; i++ {
|
|
||||||
db, err = gorm.Open(mysql.Open(dsn), nil)
|
|
||||||
if err == nil {
|
|
||||||
return db, nil
|
|
||||||
}
|
|
||||||
if mysqlErr, ok := err.(*mysqldriver.MySQLError); ok && mysqlErr.Number == 1045 {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
time.Sleep(time.Duration(1) * time.Second)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewGormDB gorm mysql.
|
|
||||||
func NewGormDB() (*gorm.DB, error) {
|
|
||||||
specialerror.AddReplace(gorm.ErrRecordNotFound, errs.ErrRecordNotFound)
|
|
||||||
specialerror.AddErrHandler(replaceDuplicateKey)
|
|
||||||
|
|
||||||
return newMysqlGormDB(&option{
|
|
||||||
Username: config.Config.Mysql.Username,
|
|
||||||
Password: config.Config.Mysql.Password,
|
|
||||||
Address: config.Config.Mysql.Address,
|
|
||||||
Database: config.Config.Mysql.Database,
|
|
||||||
LogLevel: config.Config.Mysql.LogLevel,
|
|
||||||
SlowThreshold: config.Config.Mysql.SlowThreshold,
|
|
||||||
MaxLifeTime: config.Config.Mysql.MaxLifeTime,
|
|
||||||
MaxOpenConn: config.Config.Mysql.MaxOpenConn,
|
|
||||||
MaxIdleConn: config.Config.Mysql.MaxIdleConn,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func replaceDuplicateKey(err error) errs.CodeError {
|
|
||||||
if IsMysqlDuplicateKey(err) {
|
|
||||||
return errs.ErrDuplicateKey
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func IsMysqlDuplicateKey(err error) bool {
|
|
||||||
if mysqlErr, ok := err.(*mysqldriver.MySQLError); ok {
|
|
||||||
return mysqlErr.Number == 1062
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
@ -1,121 +0,0 @@
|
|||||||
package relation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"database/sql/driver"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"gorm.io/driver/mysql"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"gorm.io/gorm/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestMaybeCreateTable(t *testing.T) {
|
|
||||||
t.Run("normal", func(t *testing.T) {
|
|
||||||
err := maybeCreateTable(&option{
|
|
||||||
Username: "root",
|
|
||||||
Password: "openIM123",
|
|
||||||
Address: []string{"172.28.0.1:13306"},
|
|
||||||
Database: "openIM_v3",
|
|
||||||
LogLevel: 4,
|
|
||||||
SlowThreshold: 500,
|
|
||||||
MaxOpenConn: 1000,
|
|
||||||
MaxIdleConn: 100,
|
|
||||||
MaxLifeTime: 60,
|
|
||||||
Connect: connect(expectExec{
|
|
||||||
query: "CREATE DATABASE IF NOT EXISTS `openIM_v3` default charset utf8mb4 COLLATE utf8mb4_unicode_ci",
|
|
||||||
args: nil,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("im-db", func(t *testing.T) {
|
|
||||||
err := maybeCreateTable(&option{
|
|
||||||
Username: "root",
|
|
||||||
Password: "openIM123",
|
|
||||||
Address: []string{"172.28.0.1:13306"},
|
|
||||||
Database: "im-db",
|
|
||||||
LogLevel: 4,
|
|
||||||
SlowThreshold: 500,
|
|
||||||
MaxOpenConn: 1000,
|
|
||||||
MaxIdleConn: 100,
|
|
||||||
MaxLifeTime: 60,
|
|
||||||
Connect: connect(expectExec{
|
|
||||||
query: "CREATE DATABASE IF NOT EXISTS `im-db` default charset utf8mb4 COLLATE utf8mb4_unicode_ci",
|
|
||||||
args: nil,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("err", func(t *testing.T) {
|
|
||||||
e := errors.New("e")
|
|
||||||
err := maybeCreateTable(&option{
|
|
||||||
Username: "root",
|
|
||||||
Password: "openIM123",
|
|
||||||
Address: []string{"172.28.0.1:13306"},
|
|
||||||
Database: "openIM_v3",
|
|
||||||
LogLevel: 4,
|
|
||||||
SlowThreshold: 500,
|
|
||||||
MaxOpenConn: 1000,
|
|
||||||
MaxIdleConn: 100,
|
|
||||||
MaxLifeTime: 60,
|
|
||||||
Connect: connect(expectExec{
|
|
||||||
err: e,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
if !errors.Is(err, e) {
|
|
||||||
t.Fatalf("err not is e: %v", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func connect(e expectExec) func(string, int) (*gorm.DB, error) {
|
|
||||||
return func(string, int) (*gorm.DB, error) {
|
|
||||||
return gorm.Open(mysql.New(mysql.Config{
|
|
||||||
SkipInitializeWithVersion: true,
|
|
||||||
Conn: sql.OpenDB(e),
|
|
||||||
}), &gorm.Config{
|
|
||||||
Logger: logger.Discard,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type expectExec struct {
|
|
||||||
err error
|
|
||||||
query string
|
|
||||||
args []driver.NamedValue
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c expectExec) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
|
|
||||||
if c.err != nil {
|
|
||||||
return nil, c.err
|
|
||||||
}
|
|
||||||
if query != c.query {
|
|
||||||
return nil, fmt.Errorf("query mismatch. expect: %s, got: %s", c.query, query)
|
|
||||||
}
|
|
||||||
if reflect.DeepEqual(args, c.args) {
|
|
||||||
return nil, fmt.Errorf("args mismatch. expect: %v, got: %v", c.args, args)
|
|
||||||
}
|
|
||||||
return noEffectResult{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e expectExec) Connect(context.Context) (driver.Conn, error) { return e, nil }
|
|
||||||
func (expectExec) Driver() driver.Driver { panic("not implemented") }
|
|
||||||
func (expectExec) Prepare(query string) (driver.Stmt, error) { panic("not implemented") }
|
|
||||||
func (expectExec) Close() (e error) { return }
|
|
||||||
func (expectExec) Begin() (driver.Tx, error) { panic("not implemented") }
|
|
||||||
|
|
||||||
type noEffectResult struct{}
|
|
||||||
|
|
||||||
func (noEffectResult) LastInsertId() (i int64, e error) { return }
|
|
||||||
func (noEffectResult) RowsAffected() (i int64, e error) { return }
|
|
@ -1,51 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package relation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
pbmsg "github.com/OpenIMSDK/protocol/msg"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
ChatLogModelTableName = "chat_logs"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ChatLogModel struct {
|
|
||||||
ServerMsgID string `gorm:"column:server_msg_id;primary_key;type:char(64)" json:"serverMsgID"`
|
|
||||||
ClientMsgID string `gorm:"column:client_msg_id;type:char(64)" json:"clientMsgID"`
|
|
||||||
SendID string `gorm:"column:send_id;type:char(64);index:send_id,priority:2" json:"sendID"`
|
|
||||||
RecvID string `gorm:"column:recv_id;type:char(64);index:recv_id,priority:2" json:"recvID"`
|
|
||||||
SenderPlatformID int32 `gorm:"column:sender_platform_id" json:"senderPlatformID"`
|
|
||||||
SenderNickname string `gorm:"column:sender_nick_name;type:varchar(255)" json:"senderNickname"`
|
|
||||||
SenderFaceURL string `gorm:"column:sender_face_url;type:varchar(255);" json:"senderFaceURL"`
|
|
||||||
SessionType int32 `gorm:"column:session_type;index:session_type,priority:2;index:session_type_alone" json:"sessionType"`
|
|
||||||
MsgFrom int32 `gorm:"column:msg_from" json:"msgFrom"`
|
|
||||||
ContentType int32 `gorm:"column:content_type;index:content_type,priority:2;index:content_type_alone" json:"contentType"`
|
|
||||||
Content string `gorm:"column:content;type:varchar(3000)" json:"content"`
|
|
||||||
Status int32 `gorm:"column:status" json:"status"`
|
|
||||||
SendTime time.Time `gorm:"column:send_time;index:sendTime;index:content_type,priority:1;index:session_type,priority:1;index:recv_id,priority:1;index:send_id,priority:1" json:"sendTime"`
|
|
||||||
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
|
|
||||||
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ChatLogModel) TableName() string {
|
|
||||||
return ChatLogModelTableName
|
|
||||||
}
|
|
||||||
|
|
||||||
type ChatLogModelInterface interface {
|
|
||||||
Create(msg *pbmsg.MsgDataToMQ) error
|
|
||||||
}
|
|
@ -1,15 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package relation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
|
Loading…
Reference in new issue