Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

pull/351/head
skiffer-git 2 years ago
commit 5d78fa9519

@ -7,6 +7,8 @@ serverversion: 2.3.1
etcd: etcd:
etcdSchema: openim #默认即可 etcdSchema: openim #默认即可
etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可 etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可
userName:
password:
k8sMod: false #开启k8s模式 使用pod里面环境变量请求services调用服务 而并非etcd k8sMod: false #开启k8s模式 使用pod里面环境变量请求services调用服务 而并非etcd
@ -46,6 +48,8 @@ redis:
enableCluster: false #如果外部redis以集群方式启动需要打开此开关 enableCluster: false #如果外部redis以集群方式启动需要打开此开关
kafka: kafka:
userName:
password:
ws2mschat: ws2mschat:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可 addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "ws2ms_chat" #用于mongo和mysql保存消息 topic: "ws2ms_chat" #用于mongo和mysql保存消息

@ -164,6 +164,8 @@ services:
depends_on: depends_on:
- prometheus - prometheus
network_mode: "host" network_mode: "host"
privileged: true
user: root
# -rw-r----- # -rw-r-----
node-exporter: node-exporter:

Binary file not shown.

@ -39,6 +39,7 @@ func initPrometheus() {
promePkg.NewGetNewestSeqTotalCounter() promePkg.NewGetNewestSeqTotalCounter()
promePkg.NewPullMsgBySeqListTotalCounter() promePkg.NewPullMsgBySeqListTotalCounter()
promePkg.NewMsgOnlinePushSuccessCounter() promePkg.NewMsgOnlinePushSuccessCounter()
promePkg.NewOnlineUserGauges()
//promePkg.NewSingleChatMsgRecvSuccessCounter() //promePkg.NewSingleChatMsgRecvSuccessCounter()
//promePkg.NewGroupChatMsgRecvSuccessCounter() //promePkg.NewGroupChatMsgRecvSuccessCounter()
//promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() //promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter()

@ -5,6 +5,7 @@ import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db" "Open_IM/pkg/common/db"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
@ -313,6 +314,7 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token
for _, v := range ws.wsUserToConn { for _, v := range ws.wsUserToConn {
count = count + len(v) count = count + len(v)
} }
promePkg.PromeGaugeInc(promePkg.OnlineUserGauge)
log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
} }
@ -352,6 +354,7 @@ func (ws *WServer) delUserConn(conn *UserConn) {
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
} }
promePkg.PromeGaugeDec(promePkg.OnlineUserGauge)
} }
func (ws *WServer) getUserConn(uid string, platform int) *UserConn { func (ws *WServer) getUserConn(uid string, platform int) *UserConn {

@ -68,11 +68,10 @@ func (s *groupServer) Run() {
//grpc server //grpc server
recvSize := 1024 * 1024 * constant.GroupRPCRecvSize recvSize := 1024 * 1024 * constant.GroupRPCRecvSize
sendSize := 1024 * 1024 * constant.GroupRPCSendSize sendSize := 1024 * 1024 * constant.GroupRPCSendSize
var options = []grpc.ServerOption{ var grpcOpts = []grpc.ServerOption{
grpc.MaxRecvMsgSize(recvSize), grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize), grpc.MaxSendMsgSize(sendSize),
} }
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable { if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter() promePkg.NewGrpcRequestFailedCounter()
@ -83,7 +82,7 @@ func (s *groupServer) Run() {
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...) }...)
} }
srv := grpc.NewServer(options...) srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop() defer srv.GracefulStop()
//Service registers with etcd //Service registers with etcd
pbGroup.RegisterGroupServer(srv, s) pbGroup.RegisterGroupServer(srv, s)

@ -158,6 +158,8 @@ type config struct {
Etcd struct { Etcd struct {
EtcdSchema string `yaml:"etcdSchema"` EtcdSchema string `yaml:"etcdSchema"`
EtcdAddr []string `yaml:"etcdAddr"` EtcdAddr []string `yaml:"etcdAddr"`
UserName string `yaml:"userName"`
Password string `yaml:"password"`
} }
Log struct { Log struct {
StorageLocation string `yaml:"storageLocation"` StorageLocation string `yaml:"storageLocation"`
@ -219,6 +221,8 @@ type config struct {
} }
Kafka struct { Kafka struct {
UserName string `yaml:"userName"`
Password string `yaml:"password"`
Ws2mschat struct { Ws2mschat struct {
Addr []string `yaml:"addr"` Addr []string `yaml:"addr"`
Topic string `yaml:"topic"` Topic string `yaml:"topic"`

@ -90,7 +90,7 @@ func initMysqlDB() {
&GroupRequest{}, &GroupRequest{},
&User{}, &User{},
&Black{}, &ChatLog{}, &Register{}, &Conversation{}, &AppVersion{}, &Department{}, &BlackList{}, &IpLimit{}, &UserIpLimit{}, &Invitation{}, &RegisterAddFriend{}, &Black{}, &ChatLog{}, &Register{}, &Conversation{}, &AppVersion{}, &Department{}, &BlackList{}, &IpLimit{}, &UserIpLimit{}, &Invitation{}, &RegisterAddFriend{},
&ClientInitConfig{}) &ClientInitConfig{}, &UserIpRecord{})
db.Set("gorm:table_options", "CHARSET=utf8") db.Set("gorm:table_options", "CHARSET=utf8")
db.Set("gorm:table_options", "collation=utf8_unicode_ci") db.Set("gorm:table_options", "collation=utf8_unicode_ci")
@ -181,6 +181,11 @@ func initMysqlDB() {
db.Migrator().CreateTable(&ClientInitConfig{}) db.Migrator().CreateTable(&ClientInitConfig{})
} }
if !db.Migrator().HasTable(&UserIpRecord{}) {
fmt.Println("CreateTable Friend")
db.Migrator().CreateTable(&UserIpRecord{})
}
DB.MysqlDB.db = db DB.MysqlDB.db = db
return return
} }

@ -2,7 +2,10 @@ package im_mysql_model
import ( import (
"Open_IM/pkg/common/db" "Open_IM/pkg/common/db"
"Open_IM/pkg/utils"
"time" "time"
"gorm.io/gorm"
) )
func IsLimitRegisterIp(RegisterIp string) (bool, error) { func IsLimitRegisterIp(RegisterIp string) (bool, error) {
@ -83,12 +86,12 @@ func InsertIpRecord(userID, createIp string) error {
func UpdateIpReocord(userID, ip string) (err error) { func UpdateIpReocord(userID, ip string) (err error) {
record := &db.UserIpRecord{UserID: userID, LastLoginIp: ip, LastLoginTime: time.Now()} record := &db.UserIpRecord{UserID: userID, LastLoginIp: ip, LastLoginTime: time.Now()}
result := db.DB.MysqlDB.DefaultGormDB().Model(&db.UserIpRecord{}).Where("user_id=?", userID).Updates(record).Updates("login_times = login_times + 1") result := db.DB.MysqlDB.DefaultGormDB().Model(&db.UserIpRecord{}).Where("user_id=?", userID).Updates(record).Update("login_times", gorm.Expr("login_times+?", 1))
if result.Error != nil { if result.Error != nil {
return result.Error return utils.Wrap(result.Error, "")
} }
if result.RowsAffected == 0 { if result.RowsAffected == 0 {
err = InsertIpRecord(userID, ip) err = InsertIpRecord(userID, ip)
} }
return err return utils.Wrap(err, "")
} }

@ -1,8 +1,10 @@
package kafka package kafka
import ( import (
"github.com/Shopify/sarama" "Open_IM/pkg/common/config"
"sync" "sync"
"github.com/Shopify/sarama"
) )
type Consumer struct { type Consumer struct {
@ -17,8 +19,13 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{} p := Consumer{}
p.Topic = topic p.Topic = topic
p.addr = addr p.addr = addr
consumerConfig := sarama.NewConfig()
consumer, err := sarama.NewConsumer(p.addr, nil) if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" {
consumerConfig.Net.SASL.Enable = true
consumerConfig.Net.SASL.User = config.Config.Kafka.UserName
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())
return nil return nil

@ -1,6 +1,7 @@
package kafka package kafka
import ( import (
"Open_IM/pkg/common/config"
log "Open_IM/pkg/common/log" log "Open_IM/pkg/common/log"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"errors" "errors"
@ -25,7 +26,11 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
p.config.Producer.Return.Errors = true p.config.Producer.Return.Errors = true
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" {
p.config.Net.SASL.Enable = true
p.config.Net.SASL.User = config.Config.Kafka.UserName
p.config.Net.SASL.Password = config.Config.Kafka.Password
}
p.addr = addr p.addr = addr
p.topic = topic p.topic = topic

@ -34,6 +34,7 @@ var (
SingleChatMsgRecvSuccessCounter prometheus.Counter SingleChatMsgRecvSuccessCounter prometheus.Counter
GroupChatMsgRecvSuccessCounter prometheus.Counter GroupChatMsgRecvSuccessCounter prometheus.Counter
WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter
OnlineUserGauge prometheus.Gauge
//msg-msg //msg-msg
SingleChatMsgProcessSuccessCounter prometheus.Counter SingleChatMsgProcessSuccessCounter prometheus.Counter
@ -326,6 +327,16 @@ func NewWorkSuperGroupChatMsgRecvSuccessCounter() {
}) })
} }
func NewOnlineUserGauges() {
if OnlineUserGauge != nil {
return
}
OnlineUserGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "online_user_num",
Help: "The number of online user num",
})
}
func NewSingleChatMsgProcessSuccessCounter() { func NewSingleChatMsgProcessSuccessCounter() {
if SingleChatMsgProcessSuccessCounter != nil { if SingleChatMsgProcessSuccessCounter != nil {
return return

@ -64,3 +64,19 @@ func PromeAdd(counter prometheus.Counter, add int) {
} }
} }
} }
func PromeGaugeInc(gauges prometheus.Gauge) {
if config.Config.Prometheus.Enable {
if gauges != nil {
gauges.Inc()
}
}
}
func PromeGaugeDec(gauges prometheus.Gauge) {
if config.Config.Prometheus.Enable {
if gauges != nil {
gauges.Dec()
}
}
}

@ -39,6 +39,8 @@ var (
func NewResolver(schema, etcdAddr, serviceName string, operationID string) (*Resolver, error) { func NewResolver(schema, etcdAddr, serviceName string, operationID string) (*Resolver, error) {
etcdCli, err := clientv3.New(clientv3.Config{ etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ","), Endpoints: strings.Split(etcdAddr, ","),
Username: config.Config.Etcd.UserName,
Password: config.Config.Etcd.Password,
}) })
if err != nil { if err != nil {
log.Error(operationID, "etcd client v3 failed") log.Error(operationID, "etcd client v3 failed")

Loading…
Cancel
Save