diff --git a/src/common/config/config.go b/src/common/config/config.go new file mode 100644 index 000000000..362225916 --- /dev/null +++ b/src/common/config/config.go @@ -0,0 +1,150 @@ +package config + +import ( + "gopkg.in/yaml.v3" + "io/ioutil" +) + +var Config config + +type config struct { + ServerIP string `yaml:"serverip"` + + Api struct { + GinPort []int `yaml:"openImApiPort"` + } + + Credential struct { + Tencent struct { + AppID string `yaml:"appID"` + Region string `yaml:"region"` + Bucket string `yaml:"bucket"` + SecretID string `yaml:"secretID"` + SecretKey string `yaml:"secretKey"` + } + } + + Mysql struct { + DBAddress []string `yaml:"dbAddress"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBDatabaseName string `yaml:"dbDatabaseName"` + DBTableName string `yaml:"DBTableName"` + DBMsgTableNum int `yaml:"dbMsgTableNum"` + DBMaxOpenConns int `yaml:"dbMaxOpenConns"` + DBMaxIdleConns int `yaml:"dbMaxIdleConns"` + DBMaxLifeTime int `yaml:"dbMaxLifeTime"` + } + Mongo struct { + DBAddress []string `yaml:"dbAddress"` + DBDirect bool `yaml:"dbDirect"` + DBTimeout int `yaml:"dbTimeout"` + DBDatabase []string `yaml:"dbDatabase"` + DBSource string `yaml:"dbSource"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBMaxPoolSize int `yaml:"dbMaxPoolSize"` + DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + } + Redis struct { + DBAddress []string `yaml:"dbAddress"` + DBMaxIdle int `yaml:"dbMaxIdle"` + DBMaxActive int `yaml:"dbMaxActive"` + DBIdleTimeout int `yaml:"dbIdleTimeout"` + DBPassWord string `yaml:"dbPassWord"` + } + RpcPort struct { + OpenImUserPort []int `yaml:"openImUserPort"` + openImFriendPort []int `yaml:"openImFriendPort"` + RpcMessagePort []int `yaml:"rpcMessagePort"` + RpcPushMessagePort []int `yaml:"rpcPushMessagePort"` + OpenImGroupPort []int `yaml:"openImGroupPort"` + RpcModifyUserInfoPort []int `yaml:"rpcModifyUserInfoPort"` + RpcGetTokenPort []int `yaml:"rpcGetTokenPort"` + } + RpcRegisterName struct { + OpenImUserName string `yaml:"openImUserName"` + OpenImFriendName string `yaml:"openImFriendName"` + OpenImOfflineMessageName string `yaml:"openImOfflineMessageName"` + OpenImPushName string `yaml:"openImPushName"` + OpenImOnlineMessageRelayName string `yaml:"openImOnlineMessageRelayName"` + OpenImGroupName string `yaml:"openImGroupName"` + RpcGetTokenName string `yaml:"rpcGetTokenName"` + } + Etcd struct { + EtcdSchema string `yaml:"etcdSchema"` + EtcdAddr []string `yaml:"etcdAddr"` + } + Log struct { + StorageLocation string `yaml:"storageLocation"` + ElasticSearchSwitch bool `yaml:"elasticSearchSwitch"` + ElasticSearchAddr []string `yaml:"elasticSearchAddr"` + ElasticSearchUser string `yaml:"elasticSearchUser"` + ElasticSearchPassword string `yaml:"elasticSearchPassword"` + } + ModuleName struct { + LongConnSvrName string `yaml:"longConnSvrName"` + MsgTransferName string `yaml:"msgTransferName"` + PushName string `yaml:"pushName"` + } + LongConnSvr struct { + WebsocketPort []int `yaml:"websocketPort"` + WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` + WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` + WebsocketTimeOut int `yaml:"websocketTimeOut"` + } + + Push struct { + Tpns struct { + Ios struct { + AccessID string `yaml:"accessID"` + SecretKey string `yaml:"secretKey"` + } + Android struct { + AccessID string `yaml:"accessID"` + SecretKey string `yaml:"secretKey"` + } + } + } + + Kafka struct { + Ws2mschat struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } + Ms2pschat struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } + ConsumerGroupID struct { + MsgToMongo string `yaml:"msgToMongo"` + MsgToMySql string `yaml:"msgToMySql"` + MsgToPush string `yaml:"msgToPush"` + } + } + + Secret string + + MultiLoginPolicy struct { + OnlyOneTerminalAccess bool `yaml:"onlyOneTerminalAccess"` + MobileAndPCTerminalAccessButOtherTerminalKickEachOther bool `yaml:"mobileAndPCTerminalAccessButOtherTerminalKickEachOther"` + AllTerminalAccess bool `yaml:"allTerminalAccess"` + } + TokenPolicy struct { + AccessSecret string `yaml:"accessSecret"` + AccessExpire int64 `yaml:"accessExpire"` + } +} + +func init() { + bytes, err := ioutil.ReadFile("../config/config.yaml") + if err != nil { + panic(err) + return + } + if err = yaml.Unmarshal(bytes, &Config); err != nil { + panic(err) + return + } + +} diff --git a/src/common/config/error.go b/src/common/config/error.go new file mode 100644 index 000000000..e629cf727 --- /dev/null +++ b/src/common/config/error.go @@ -0,0 +1,38 @@ +package config + +// key = errCode, string = errMsg +type ErrInfo struct { + ErrCode int32 + ErrMsg string +} + +var ( + OK = ErrInfo{0, ""} + + ErrMysql = ErrInfo{100, ""} + ErrMongo = ErrInfo{110, ""} + ErrRedis = ErrInfo{120, ""} + ErrParseToken = ErrInfo{200, "Parse token failed"} + ErrCreateToken = ErrInfo{201, "Create token failed"} + ErrAppServerKey = ErrInfo{300, "key error"} + ErrTencentCredential = ErrInfo{400, ""} + + ErrorUserRegister = ErrInfo{600, "User registration failed"} + ErrAccountExists = ErrInfo{601, "The account is already registered and cannot be registered again"} + ErrUserPassword = ErrInfo{602, "User password error"} + ErrTokenIncorrect = ErrInfo{603, "Invalid token"} + ErrTokenExpired = ErrInfo{604, "Expired token"} + ErrRefreshToken = ErrInfo{605, "Failed to refresh token"} + ErrAddFriend = ErrInfo{606, "Failed to add friends"} + ErrAgreeToAddFriend = ErrInfo{607, "Failed to agree to add friend"} + ErrAddFriendToBlack = ErrInfo{608, "Failed to add friends to the blacklist"} + ErrGetBlackList = ErrInfo{609, "Failed to get blacklist"} + ErrDeleteFriend = ErrInfo{610, "Failed to delete friend"} + ErrGetFriendApplyList = ErrInfo{611, "Failed to get friend application list"} + ErrGetFriendList = ErrInfo{612, "Failed to get friend list"} + ErrRemoveBlackList = ErrInfo{613, "Failed to remove blacklist"} + ErrSearchUserInfo = ErrInfo{614, "Failed to find user information"} + ErrDelAppleDeviceToken = ErrInfo{615, ""} + ErrModifyUserInfo = ErrInfo{616, "update user some attribute failed"} + ErrSetFriendComment = ErrInfo{617, "set friend comment failed"} +) diff --git a/src/common/db/model.go b/src/common/db/model.go new file mode 100644 index 000000000..f76600f8f --- /dev/null +++ b/src/common/db/model.go @@ -0,0 +1,72 @@ +package db + +import ( + "gopkg.in/mgo.v2" +) + +var DB DataBases + +type DataBases struct { + MgoDB mongoDB + RedisDB redisDB + MysqlDB mysqlDB +} + +func key(dbAddress, dbName string) string { + return dbAddress + "_" + dbName +} + +//type Config struct { +// Mongo struct { +// DBAddress []string `yaml:"dbAddress"` +// DBDirect bool `yaml:"dbDirect"` +// DBTimeout int `yaml:"dbTimeout"` +// DBDatabase []string `yaml:"dbDatabase"` +// DBSource string `yaml:"dbSource"` +// DBUserName string `yaml:"dbUserName"` +// DBPassword string `yaml:"dbPassword"` +// DBMaxPoolSize int `yaml:"dbMaxPoolSize"` +// } +// Mysql struct { +// DBAddress []string `yaml:"dbAddress"` +// DBPort int `yaml:"dbPort"` +// DBUserName string `yaml:"dbUserName"` +// DBPassword string `yaml:"dbPassword"` +// DBDatabaseName string `yaml:"dbChatName"` // 默认使用DBAddress[0] +// DBTableName string `yaml:"dbMsgName"` +// DBMsgTableNum int `yaml:"dbMsgTableNum"` +// DBCharset string `yaml:"dbCharset"` +// DBMaxOpenConns int `yaml:"dbMaxOpenConns"` +// DBMaxIdleConns int `yaml:"dbMaxIdleConns"` +// DBMaxLifeTime int `yaml:"dbMaxLifeTime"` +// } +// Redis struct { +// DBAddress string `yaml:"dbAddress"` +// DBPort int `yaml:"dbPort"` +// DBMaxIdle int `yaml:"dbMaxIdle"` +// DBMaxActive int `yaml:"dbMaxActive"` +// DBIdleTimeout int `yaml:"dbIdleTimeout"` +// } +//} + +//func init() { +// bytes, err := ioutil.ReadFile("config/db.yaml") +// if err != nil { +// log.Error("", "", "read db.yaml config fail! err = %s", err.Error()) +// return +// } +// +// if err = yaml.Unmarshal(bytes, &DB.Config); err != nil { +// log.Error("", "", "unmarshal db.yaml config fail! err = %s", err.Error()) +// return +// } +// +// DB.RedisDB.newPool(DB.Config) +// //DB.MysqlDB.sqlxDB(DB.Config.Mysql.DBName[0], DB.Config) +//} +func init() { + DB.RedisDB.newPool() +} +func (d *DataBases) session(dbName string) *mgo.Session { + return d.MgoDB.mgoSession(dbName) +} diff --git a/src/common/db/mongo.go b/src/common/db/mongo.go new file mode 100644 index 000000000..2da6ff3e0 --- /dev/null +++ b/src/common/db/mongo.go @@ -0,0 +1,51 @@ +package db + +import ( + "Open_IM/src/common/config" + "errors" + "fmt" + "gopkg.in/mgo.v2" + "sync" + "time" +) + +type mongoDB struct { + sync.RWMutex + dbMap map[string]*mgo.Session +} + +func (m *mongoDB) mgoSession(dbName string) *mgo.Session { + m.Lock() + defer m.Unlock() + if _, ok := m.dbMap[dbName]; !ok { + if err := m.newMgoSession(dbName); err != nil { + panic(err) + return nil + } + } + return m.dbMap[dbName] +} + +func (m *mongoDB) newMgoSession(dbName string) error { + dailInfo := &mgo.DialInfo{ + Addrs: config.Config.Mongo.DBAddress, + Direct: config.Config.Mongo.DBDirect, + Timeout: time.Second * time.Duration(config.Config.Mongo.DBTimeout), + Database: dbName, + Source: config.Config.Mongo.DBSource, + Username: config.Config.Mongo.DBUserName, + Password: config.Config.Mongo.DBPassword, + PoolLimit: config.Config.Mongo.DBMaxPoolSize, + } + session, err := mgo.DialWithInfo(dailInfo) + if err != nil { + return errors.New(fmt.Sprintf("mongo DialWithInfo fail, err= %s", err.Error())) + } + + if m.dbMap == nil { + m.dbMap = make(map[string]*mgo.Session) + } + + m.dbMap[dbName] = session + return nil +} diff --git a/src/common/db/mongoModel.go b/src/common/db/mongoModel.go new file mode 100644 index 000000000..fdd923216 --- /dev/null +++ b/src/common/db/mongoModel.go @@ -0,0 +1,189 @@ +package db + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/constant" + pbMsg "Open_IM/src/proto/chat" + "errors" + "github.com/golang/protobuf/proto" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + "time" +) + +type UserChat struct { + UID string + Msg [][]byte +} + +func (d *DataBases) GetUserChat(uid string, seqBegin, seqEnd int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return nil, nil, MaxSeq, MinSeq, errors.New("session == nil") + } + defer session.Close() + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + + sChat := UserChat{} + if err = c.Find(bson.M{"uid": uid}).One(&sChat); err != nil { + return nil, nil, MaxSeq, MinSeq, err + } + pChat := pbMsg.MsgSvrToPushSvrChatMsg{} + for i := 0; i < len(sChat.Msg); i++ { + //每次产生新的指针 + temp := new(pbMsg.MsgFormat) + if err = proto.Unmarshal(sChat.Msg[i], &pChat); err != nil { + return nil, nil, MaxSeq, MinSeq, err + } + if pChat.RecvSeq >= seqBegin && pChat.RecvSeq <= seqEnd { + temp.SendID = pChat.SendID + temp.RecvID = pChat.RecvID + temp.MsgFrom = pChat.MsgFrom + temp.Seq = pChat.RecvSeq + temp.ServerMsgID = pChat.MsgID + temp.SendTime = pChat.SendTime + temp.Content = pChat.Content + temp.ContentType = pChat.ContentType + temp.SenderPlatformID = pChat.PlatformID + if pChat.RecvSeq > MaxSeq { + MaxSeq = pChat.RecvSeq + } + if i == 0 { + MinSeq = pChat.RecvSeq + } + if pChat.RecvSeq < MinSeq { + MinSeq = pChat.RecvSeq + } + //单聊消息 + if pChat.SessionType == constant.SingleChatType { + SingleMsg = append(SingleMsg, temp) + } else { + GroupMsg = append(GroupMsg, temp) + } + } + } + + //d.DelUserChat(&sChat) + + return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil +} + +func (d *DataBases) SaveUserChat(uid string, m proto.Message) error { + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return errors.New("session == nil") + } + defer session.Close() + session.SetMode(mgo.Monotonic, true) + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + + n, err := c.Find(bson.M{"uid": uid}).Count() + if err != nil { + return err + } + + if n == 0 { + sChat := UserChat{} + sChat.UID = uid + bMsg, _ := proto.Marshal(m) + sChat.Msg = append(sChat.Msg, bMsg) + + err = c.Insert(&sChat) + if err != nil { + return err + } + } else { + bMsg, err := proto.Marshal(m) + err = c.Update(bson.M{"uid": uid}, bson.M{"$addToSet": bson.M{"msg": bMsg}}) + if err != nil { + return err + } + } + + return nil +} + +func (d *DataBases) DelUserChat(uc *UserChat) { + delMaxIndex := 0 + pbData := pbMsg.WSToMsgSvrChatMsg{} + for i := 0; i < len(uc.Msg); i++ { + if err := proto.Unmarshal(uc.Msg[i], &pbData); err != nil { + delMaxIndex = i + } else { + if time.Now().Unix()-pbData.SendTime > 7*24*3600 { + delMaxIndex = i + } else { + break + } + } + } + + if delMaxIndex > 0 { + uc.Msg = uc.Msg[delMaxIndex:] + + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return + } + defer session.Close() + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + if err := c.Update(bson.M{"uid": uc.UID}, bson.M{"msg": uc.Msg}); err != nil { + return + } + } +} + +func (d *DataBases) DelHistoryChat(days int64, ids []string) error { + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return errors.New("mgo session == nil") + } + defer session.Close() + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + + for i := 0; i < len(ids); i++ { + d.delHistoryUserChat(c, days, ids[i]) + //time.Sleep(1 * time.Millisecond) + } + + return nil +} + +func (d *DataBases) delHistoryUserChat(c *mgo.Collection, days int64, id string) error { + sChat := UserChat{} + if err := c.Find(bson.M{"uid": id}).One(&sChat); err != nil { + return err + } + + delMaxIndex := 0 + pbData := pbMsg.WSToMsgSvrChatMsg{} + for i := 0; i < len(sChat.Msg); i++ { + if err := proto.Unmarshal(sChat.Msg[i], &pbData); err != nil { + delMaxIndex = i + } else { + if time.Now().Unix()-pbData.SendTime > int64(days)*24*3600 { + delMaxIndex = i + } else { + break + } + } + } + + if delMaxIndex > 0 { + if delMaxIndex < len(sChat.Msg) { + sChat.Msg = sChat.Msg[delMaxIndex:] + } else { + sChat.Msg = sChat.Msg[0:0] + } + + if err := c.Update(bson.M{"uid": sChat.UID}, bson.M{"msg": sChat.Msg}); err != nil { + return err + } + } + + return nil +} diff --git a/src/common/db/mysql.go b/src/common/db/mysql.go new file mode 100644 index 000000000..fbe7d3fa5 --- /dev/null +++ b/src/common/db/mysql.go @@ -0,0 +1,53 @@ +package db + +import ( + "Open_IM/src/common/config" + "fmt" + "github.com/jinzhu/gorm" + _ "github.com/jinzhu/gorm/dialects/mysql" + "sync" + "time" +) + +type mysqlDB struct { + sync.RWMutex + dbMap map[string]*gorm.DB +} + +func (m *mysqlDB) DefaultGormDB() (*gorm.DB, error) { + return m.GormDB(config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName) +} + +func (m *mysqlDB) GormDB(dbAddress, dbName string) (*gorm.DB, error) { + m.Lock() + defer m.Unlock() + + k := key(dbAddress, dbName) + if _, ok := m.dbMap[k]; !ok { + if err := m.open(dbAddress, dbName); err != nil { + return nil, err + } + } + return m.dbMap[k], nil +} + +func (m *mysqlDB) open(dbAddress, dbName string) error { + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", + config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, dbAddress, dbName) + db, err := gorm.Open("mysql", dsn) + if err != nil { + return err + } + + db.SingularTable(true) + db.DB().SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns) + db.DB().SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns) + db.DB().SetConnMaxLifetime(time.Duration(config.Config.Mysql.DBMaxLifeTime) * time.Second) + + if m.dbMap == nil { + m.dbMap = make(map[string]*gorm.DB) + } + k := key(dbAddress, dbName) + m.dbMap[k] = db + return nil +} diff --git a/src/common/db/redis.go b/src/common/db/redis.go new file mode 100644 index 000000000..31c2e86c4 --- /dev/null +++ b/src/common/db/redis.go @@ -0,0 +1,52 @@ +package db + +import ( + "Open_IM/src/common/config" + log2 "Open_IM/src/common/log" + "github.com/garyburd/redigo/redis" + "time" +) + +type redisDB struct { + pool *redis.Pool +} + +func (r *redisDB) newPool() { + r.pool = &redis.Pool{ + MaxIdle: config.Config.Redis.DBMaxIdle, + MaxActive: config.Config.Redis.DBMaxActive, + + IdleTimeout: time.Duration(config.Config.Redis.DBIdleTimeout) * time.Second, + Dial: func() (redis.Conn, error) { + return redis.Dial( + "tcp", + config.Config.Redis.DBAddress[0], + redis.DialReadTimeout(time.Duration(1000)*time.Millisecond), + redis.DialWriteTimeout(time.Duration(1000)*time.Millisecond), + redis.DialConnectTimeout(time.Duration(1000)*time.Millisecond), + redis.DialDatabase(0), + redis.DialPassword(config.Config.Redis.DBPassWord), + ) + }, + } +} + +func (r *redisDB) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) { + con := r.pool.Get() + if err := con.Err(); err != nil { + log2.Error("", "", "redis cmd = %v, err = %v", cmd, err) + return nil, err + } + defer con.Close() + + params := make([]interface{}, 0) + params = append(params, key) + + if len(args) > 0 { + for _, v := range args { + params = append(params, v) + } + } + + return con.Do(cmd, params...) +} diff --git a/src/common/db/redisModel.go b/src/common/db/redisModel.go new file mode 100644 index 000000000..27a67b96b --- /dev/null +++ b/src/common/db/redisModel.go @@ -0,0 +1,71 @@ +package db + +import ( + "github.com/garyburd/redigo/redis" +) + +const ( + userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq + appleDeviceToken = "DEVICE_TOKEN" + lastGetSeq = "LAST_GET_SEQ" +) + +//执行用户消息的seq自增操作 +func (d *DataBases) IncrUserSeq(uid string) (int64, error) { + key := userIncrSeq + uid + return redis.Int64(d.RedisDB.Exec("INCR", key)) +} + +//获取最新的seq +func (d *DataBases) GetUserSeq(uid string) (int64, error) { + key := userIncrSeq + uid + return redis.Int64(d.RedisDB.Exec("GET", key)) +} + +//存储苹果的设备token到redis +func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) { + key := appleDeviceToken + accountAddress + _, err = d.RedisDB.Exec("SET", key, value) + return err +} + +//删除苹果设备token +func (d *DataBases) DelAppleDeviceToken(accountAddress string) (err error) { + key := appleDeviceToken + accountAddress + _, err = d.RedisDB.Exec("DEL", key) + return err +} + +//记录用户上一次主动拉取Seq的值 +func (d *DataBases) SetLastGetSeq(uid string) (err error) { + key := lastGetSeq + uid + _, err = d.RedisDB.Exec("SET", key) + return err +} + +//获取用户上一次主动拉取Seq的值 +func (d *DataBases) GetLastGetSeq(uid string) (int64, error) { + key := userIncrSeq + uid + return redis.Int64(d.RedisDB.Exec("GET", key)) +} + +//Store userid and platform class to redis +func (d *DataBases) SetUserIDAndPlatform(userID, platformClass, value string, ttl int64) error { + key := userID + platformClass + _, err := d.RedisDB.Exec("SET", key, value, "EX", ttl) + return err +} + +//Check exists userid and platform class from redis +func (d *DataBases) ExistsUserIDAndPlatform(userID, platformClass string) (interface{}, error) { + key := userID + platformClass + exists, err := d.RedisDB.Exec("EXISTS", key) + return exists, err +} + +//Get platform class Token +func (d *DataBases) GetPlatformToken(userID, platformClass string) (interface{}, error) { + key := userID + platformClass + token, err := d.RedisDB.Exec("GET", key) + return token, err +}