diff --git a/config/config.yaml b/config/config.yaml index 2224e914f..f474fc06a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,8 @@ serverversion: 2.3.1 etcd: etcdSchema: openim #默认即可 etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可 + userName: + password: k8sMod: false #开启k8s模式 使用pod里面环境变量请求services调用服务 而并非etcd @@ -46,6 +48,8 @@ redis: enableCluster: false #如果外部redis以集群方式启动,需要打开此开关 kafka: + userName: + password: ws2mschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat" #用于mongo和mysql保存消息 diff --git a/docker-compose.yaml b/docker-compose.yaml index 8692c7ba0..654a94982 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -164,6 +164,8 @@ services: depends_on: - prometheus network_mode: "host" + privileged: true + user: root # -rw-r----- node-exporter: diff --git a/docker-compose_cfg/grafana.db b/docker-compose_cfg/grafana.db new file mode 100755 index 000000000..362adfe67 Binary files /dev/null and b/docker-compose_cfg/grafana.db differ diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 830a6cb80..bb8958680 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -39,6 +39,7 @@ func initPrometheus() { promePkg.NewGetNewestSeqTotalCounter() promePkg.NewPullMsgBySeqListTotalCounter() promePkg.NewMsgOnlinePushSuccessCounter() + promePkg.NewOnlineUserGauges() //promePkg.NewSingleChatMsgRecvSuccessCounter() //promePkg.NewGroupChatMsgRecvSuccessCounter() //promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 09af4a4ea..3a84c74b3 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbRelay "Open_IM/pkg/proto/relay" @@ -313,6 +314,7 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token for _, v := range ws.wsUserToConn { count = count + len(v) } + promePkg.PromeGaugeInc(promePkg.OnlineUserGauge) log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) } @@ -352,6 +354,7 @@ func (ws *WServer) delUserConn(conn *UserConn) { if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) } + promePkg.PromeGaugeDec(promePkg.OnlineUserGauge) } func (ws *WServer) getUserConn(uid string, platform int) *UserConn { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 740919ee2..27838b4e2 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -68,11 +68,10 @@ func (s *groupServer) Run() { //grpc server recvSize := 1024 * 1024 * constant.GroupRPCRecvSize sendSize := 1024 * 1024 * constant.GroupRPCSendSize - var options = []grpc.ServerOption{ + var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), } - var grpcOpts []grpc.ServerOption if config.Config.Prometheus.Enable { promePkg.NewGrpcRequestCounter() promePkg.NewGrpcRequestFailedCounter() @@ -83,7 +82,7 @@ func (s *groupServer) Run() { grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), }...) } - srv := grpc.NewServer(options...) + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbGroup.RegisterGroupServer(srv, s) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 5714dff2e..9d6e60391 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -158,6 +158,8 @@ type config struct { Etcd struct { EtcdSchema string `yaml:"etcdSchema"` EtcdAddr []string `yaml:"etcdAddr"` + UserName string `yaml:"userName"` + Password string `yaml:"password"` } Log struct { StorageLocation string `yaml:"storageLocation"` @@ -219,6 +221,8 @@ type config struct { } Kafka struct { + UserName string `yaml:"userName"` + Password string `yaml:"password"` Ws2mschat struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` diff --git a/pkg/common/db/mysql.go b/pkg/common/db/mysql.go index 75d3e6887..2da7add65 100644 --- a/pkg/common/db/mysql.go +++ b/pkg/common/db/mysql.go @@ -90,7 +90,7 @@ func initMysqlDB() { &GroupRequest{}, &User{}, &Black{}, &ChatLog{}, &Register{}, &Conversation{}, &AppVersion{}, &Department{}, &BlackList{}, &IpLimit{}, &UserIpLimit{}, &Invitation{}, &RegisterAddFriend{}, - &ClientInitConfig{}) + &ClientInitConfig{}, &UserIpRecord{}) db.Set("gorm:table_options", "CHARSET=utf8") db.Set("gorm:table_options", "collation=utf8_unicode_ci") @@ -181,6 +181,11 @@ func initMysqlDB() { db.Migrator().CreateTable(&ClientInitConfig{}) } + if !db.Migrator().HasTable(&UserIpRecord{}) { + fmt.Println("CreateTable Friend") + db.Migrator().CreateTable(&UserIpRecord{}) + } + DB.MysqlDB.db = db return } diff --git a/pkg/common/db/mysql_model/im_mysql_model/ip_model.go b/pkg/common/db/mysql_model/im_mysql_model/ip_model.go index 4e1ae6f4a..d9ff1bfcb 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/ip_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/ip_model.go @@ -2,7 +2,10 @@ package im_mysql_model import ( "Open_IM/pkg/common/db" + "Open_IM/pkg/utils" "time" + + "gorm.io/gorm" ) func IsLimitRegisterIp(RegisterIp string) (bool, error) { @@ -83,12 +86,12 @@ func InsertIpRecord(userID, createIp string) error { func UpdateIpReocord(userID, ip string) (err error) { record := &db.UserIpRecord{UserID: userID, LastLoginIp: ip, LastLoginTime: time.Now()} - result := db.DB.MysqlDB.DefaultGormDB().Model(&db.UserIpRecord{}).Where("user_id=?", userID).Updates(record).Updates("login_times = login_times + 1") + result := db.DB.MysqlDB.DefaultGormDB().Model(&db.UserIpRecord{}).Where("user_id=?", userID).Updates(record).Update("login_times", gorm.Expr("login_times+?", 1)) if result.Error != nil { - return result.Error + return utils.Wrap(result.Error, "") } if result.RowsAffected == 0 { err = InsertIpRecord(userID, ip) } - return err + return utils.Wrap(err, "") } diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 771145906..dd3182fad 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -1,8 +1,10 @@ package kafka import ( - "github.com/Shopify/sarama" + "Open_IM/pkg/common/config" "sync" + + "github.com/Shopify/sarama" ) type Consumer struct { @@ -17,8 +19,13 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { p := Consumer{} p.Topic = topic p.addr = addr - - consumer, err := sarama.NewConsumer(p.addr, nil) + consumerConfig := sarama.NewConfig() + if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" { + consumerConfig.Net.SASL.Enable = true + consumerConfig.Net.SASL.User = config.Config.Kafka.UserName + consumerConfig.Net.SASL.Password = config.Config.Kafka.Password + } + consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { panic(err.Error()) return nil diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 15c08ac04..23dfc3085 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -1,6 +1,7 @@ package kafka import ( + "Open_IM/pkg/common/config" log "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "errors" @@ -25,7 +26,11 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.config.Producer.Return.Errors = true p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - + if config.Config.Kafka.UserName != "" && config.Config.Kafka.Password != "" { + p.config.Net.SASL.Enable = true + p.config.Net.SASL.User = config.Config.Kafka.UserName + p.config.Net.SASL.Password = config.Config.Kafka.Password + } p.addr = addr p.topic = topic diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index 1d6b71ebb..3dd7d05a2 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -34,6 +34,7 @@ var ( SingleChatMsgRecvSuccessCounter prometheus.Counter GroupChatMsgRecvSuccessCounter prometheus.Counter WorkSuperGroupChatMsgRecvSuccessCounter prometheus.Counter + OnlineUserGauge prometheus.Gauge //msg-msg SingleChatMsgProcessSuccessCounter prometheus.Counter @@ -326,6 +327,16 @@ func NewWorkSuperGroupChatMsgRecvSuccessCounter() { }) } +func NewOnlineUserGauges() { + if OnlineUserGauge != nil { + return + } + OnlineUserGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "online_user_num", + Help: "The number of online user num", + }) +} + func NewSingleChatMsgProcessSuccessCounter() { if SingleChatMsgProcessSuccessCounter != nil { return diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go index e8bc87b47..7c497a78c 100644 --- a/pkg/common/prometheus/prometheus.go +++ b/pkg/common/prometheus/prometheus.go @@ -64,3 +64,19 @@ func PromeAdd(counter prometheus.Counter, add int) { } } } + +func PromeGaugeInc(gauges prometheus.Gauge) { + if config.Config.Prometheus.Enable { + if gauges != nil { + gauges.Inc() + } + } +} + +func PromeGaugeDec(gauges prometheus.Gauge) { + if config.Config.Prometheus.Enable { + if gauges != nil { + gauges.Dec() + } + } +} diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index db280e040..8f6a383d6 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -39,6 +39,8 @@ var ( func NewResolver(schema, etcdAddr, serviceName string, operationID string) (*Resolver, error) { etcdCli, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ","), + Username: config.Config.Etcd.UserName, + Password: config.Config.Etcd.Password, }) if err != nil { log.Error(operationID, "etcd client v3 failed")