diff --git a/config/openim-api.yml b/config/openim-api.yml index 4c38e1005..a23b5fb31 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -10,7 +10,10 @@ api: prometheus: # Whether to enable prometheus enable: true + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # Prometheus listening ports, must match the number of api.ports + # It will only take effect when autoSetPorts is set to false. ports: [ 12002 ] # This address can be accessed via a browser grafanaURL: http://127.0.0.1:13000/ diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 696bc3d06..b7d6d9847 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -12,6 +12,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12140, 12141, 12142, 12143, 12144, 12145, 12146, 12147, 12148, 12149, 12150, 12151, 12152, 12153, 12154, 12155 ] # IP address that the RPC/WebSocket service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 94ed073d8..39b23b222 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -1,6 +1,8 @@ prometheus: # Enable or disable Prometheus monitoring enable: true + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly - # Because four instances have been launched, four ports need to be specified + # It will only take effect when autoSetPorts is set to false. ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027, 12028, 12029, 12030, 12031, 12032, 12033, 12034, 12035 ] diff --git a/config/openim-push.yml b/config/openim-push.yml index a3fe0c765..dcc8f1aec 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -15,6 +15,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ] maxConcurrentWorkers: 3 diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index a386de097..5d6d85b2f 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -14,6 +14,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12200 ] tokenPolicy: diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index c17b5709f..eaedfe21f 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -14,4 +14,5 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12220 ] diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index 5b51981d2..920c4860b 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -14,4 +14,5 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12240 ] diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 06fb489d2..c48065cca 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -14,6 +14,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12260 ] diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 5a4beae5c..d07854622 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -14,6 +14,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12280 ] diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index e24f93883..95a50fac4 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -14,6 +14,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12300 ] diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 02b6d45b1..3a1335895 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -14,4 +14,5 @@ prometheus: # Whether to enable prometheus enable: true # Prometheus listening ports, must be consistent with the number of rpc.ports + # It will only take effect when autoSetPorts is set to false. ports: [ 12320 ] diff --git a/config/prometheus.yml b/config/prometheus.yml index ab427ee82..6fb112824 100644 --- a/config/prometheus.yml +++ b/config/prometheus.yml @@ -26,61 +26,94 @@ scrape_configs: - job_name: node_exporter static_configs: - targets: [ internal_ip:20500 ] + - job_name: openimserver-openim-api - static_configs: - - targets: [ internal_ip:12002 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/api" +# static_configs: +# - targets: [ internal_ip:12002 ] +# labels: +# namespace: default + - job_name: openimserver-openim-msggateway - static_configs: - - targets: [ internal_ip:12140 ] -# - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg_gateway" +# static_configs: +# - targets: [ internal_ip:12140 ] +# # - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ] +# labels: +# namespace: default + - job_name: openimserver-openim-msgtransfer - static_configs: - - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ] -# - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg_transfer" +# static_configs: +# - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ] +# # - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ] +# labels: +# namespace: default + - job_name: openimserver-openim-push - static_configs: - - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] -# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/push" +# static_configs: +# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] +## - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-auth - static_configs: - - targets: [ internal_ip:12200 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/auth" +# static_configs: +# - targets: [ internal_ip:12200 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-conversation - static_configs: - - targets: [ internal_ip:12220 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/conversation" +# static_configs: +# - targets: [ internal_ip:12220 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-friend - static_configs: - - targets: [ internal_ip:12240 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/friend" +# static_configs: +# - targets: [ internal_ip:12240 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-group - static_configs: - - targets: [ internal_ip:12260 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/group" +# static_configs: +# - targets: [ internal_ip:12260 ] +# labels: +# namespace: default. + - job_name: openimserver-openim-rpc-msg - static_configs: - - targets: [ internal_ip:12280 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg" +# static_configs: +# - targets: [ internal_ip:12280 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-third - static_configs: - - targets: [ internal_ip:12300 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/third" +# static_configs: +# - targets: [ internal_ip:12300 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-user - static_configs: - - targets: [ internal_ip:12320 ] - labels: - namespace: default \ No newline at end of file + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/user" +# static_configs: +# - targets: [ internal_ip:12320 ] +# labels: +# namespace: default \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 8d25383bc..57e654208 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -146,49 +146,49 @@ services: networks: - openim -# prometheus: -# image: ${PROMETHEUS_IMAGE} -# container_name: prometheus -# restart: always -# user: root -# volumes: -# - ./config/prometheus.yml:/etc/prometheus/prometheus.yml -# - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml -# - ${DATA_DIR}/components/prometheus/data:/prometheus -# command: -# - '--config.file=/etc/prometheus/prometheus.yml' -# - '--storage.tsdb.path=/prometheus' -# ports: -# - "19091:9090" -# networks: -# - openim -# -# alertmanager: -# image: ${ALERTMANAGER_IMAGE} -# container_name: alertmanager -# restart: always -# volumes: -# - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml -# - ./config/email.tmpl:/etc/alertmanager/email.tmpl -# ports: -# - "19093:9093" -# networks: -# - openim -# -# grafana: -# image: ${GRAFANA_IMAGE} -# container_name: grafana -# user: root -# restart: always -# environment: -# - GF_SECURITY_ALLOW_EMBEDDING=true -# - GF_SESSION_COOKIE_SAMESITE=none -# - GF_SESSION_COOKIE_SECURE=true -# - GF_AUTH_ANONYMOUS_ENABLED=true -# - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin -# ports: -# - "13000:3000" -# volumes: -# - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana -# networks: -# - openim + prometheus: + image: ${PROMETHEUS_IMAGE} + container_name: prometheus + restart: always + user: root + volumes: + - ./config/prometheus.yml:/etc/prometheus/prometheus.yml + - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml + - ${DATA_DIR}/components/prometheus/data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + ports: + - "19091:9090" + networks: + - openim + + alertmanager: + image: ${ALERTMANAGER_IMAGE} + container_name: alertmanager + restart: always + volumes: + - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml + - ./config/email.tmpl:/etc/alertmanager/email.tmpl + ports: + - "19093:9093" + networks: + - openim + + grafana: + image: ${GRAFANA_IMAGE} + container_name: grafana + user: root + restart: always + environment: + - GF_SECURITY_ALLOW_EMBEDDING=true + - GF_SESSION_COOKIE_SAMESITE=none + - GF_SESSION_COOKIE_SECURE=true + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + ports: + - "13000:3000" + volumes: + - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana + networks: + - openim diff --git a/go.mod b/go.mod index 3398349be..123e1ff91 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.61 + github.com/openimsdk/protocol v0.0.72-alpha.63 github.com/openimsdk/tools v0.0.50-alpha.51 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 @@ -42,6 +42,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.18.2 + go.etcd.io/etcd/client/v3 v3.5.13 go.uber.org/automaxprocs v1.5.3 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.8.0 @@ -179,7 +180,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect - go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/go.sum b/go.sum index 2f1dfcfda..dd61defdc 100644 --- a/go.sum +++ b/go.sum @@ -347,10 +347,22 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +<<<<<<< HEAD github.com/openimsdk/protocol v0.0.72-alpha.61 h1:RuZR9/Sg3p6Bpb2CKPjPoA2AUmTvHITmhZ3PT/RbWMs= github.com/openimsdk/protocol v0.0.72-alpha.61/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.51 h1:M3dMUoHjggx5Ry6XSkK0FTSJmRQjjkSBpuzXiFzKtC4= github.com/openimsdk/tools v0.0.50-alpha.51/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= +||||||| 54be83776 +github.com/openimsdk/protocol v0.0.72-alpha.61 h1:RuZR9/Sg3p6Bpb2CKPjPoA2AUmTvHITmhZ3PT/RbWMs= +github.com/openimsdk/protocol v0.0.72-alpha.61/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/tools v0.0.50-alpha.46 h1:j3HxPxhDptVHwr7eChL2rCH8mKfpUEcr4nHi5k4yDME= +github.com/openimsdk/tools v0.0.50-alpha.46/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= +======= +github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0= +github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/tools v0.0.50-alpha.50 h1:+naDlvHcqJDj2NsCGnQd1LLQOET5IRPbrtmWbM/o7JQ= +github.com/openimsdk/tools v0.0.50-alpha.50/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= +>>>>>>> 0435915df1702611864caf224139a577ce4df9d0 github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index ea7b6133f..8fb6baff5 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -16,6 +16,7 @@ package api import ( "context" + "errors" "fmt" "net" "net/http" @@ -26,16 +27,17 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/network" - "github.com/openimsdk/tools/utils/runtimeenv" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/jsonutil" + "github.com/openimsdk/tools/utils/network" + "github.com/openimsdk/tools/utils/runtimeenv" ) type Config struct { @@ -68,16 +70,57 @@ func Start(ctx context.Context, index int, config *Config) error { prometheusPort int ) - router := newGinRouter(client, config) + registerIP, err := network.GetRpcRegisterIP("") + if err != nil { + return err + } + + getAutoPort := func() (net.Listener, int, error) { + registerAddr := net.JoinHostPort(registerIP, "0") + listener, err := net.Listen("tcp", registerAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != kdisc.Etcd { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } + + router := newGinRouter(client, config, client) if config.API.Prometheus.Enable { - go func() { + var ( + listener net.Listener + ) + + if config.API.Prometheus.AutoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) if err != nil { - netErr = err - netDone <- struct{}{} - return + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) } - if err := prommetrics.ApiInit(prometheusPort); err != nil && err != http.ErrServerClosed { + } + + go func() { + if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) netDone <- struct{}{} } @@ -90,7 +133,7 @@ func Start(ctx context.Context, index int, config *Config) error { log.CInfo(ctx, "API server is initializing", "runtimeEnv", config.RuntimeEnv, "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) go func() { err = server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) netDone <- struct{}{} diff --git a/internal/api/msg.go b/internal/api/msg.go index ce94b5f4f..9f79067a8 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -173,6 +173,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM data = apistruct.AtElem{} case constant.Custom: data = apistruct.CustomElem{} + case constant.Quote: + data = apistruct.QuoteElem{} case constant.Stream: data = apistruct.StreamMsgElem{} case constant.OANotification: diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go new file mode 100644 index 000000000..6e33274be --- /dev/null +++ b/internal/api/prometheus_discovery.go @@ -0,0 +1,113 @@ +package api + +import ( + "encoding/json" + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + clientv3 "go.etcd.io/etcd/client/v3" + "net/http" +) + +type PrometheusDiscoveryApi struct { + config *Config + client *clientv3.Client +} + +func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { + api := &PrometheusDiscoveryApi{ + config: config, + } + if config.Discovery.Enable == discoveryregister.Etcd { + api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + return api +} + +func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { + if p.config.Discovery.Enable != discoveryregister.Etcd { + c.JSON(http.StatusOK, []struct{}{}) + c.Abort() + } +} + +func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { + eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + if err != nil { + // Log and respond with an error if preparation fails. + apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + return + } + if len(eResp.Kvs) == 0 { + c.JSON(http.StatusOK, []*prommetrics.Target{}) + } + + var ( + resp = &prommetrics.RespTarget{ + Targets: make([]string, 0, len(eResp.Kvs)), + } + ) + + for i := range eResp.Kvs { + var target prommetrics.Target + err = json.Unmarshal(eResp.Kvs[i].Value, &target) + if err != nil { + log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) + } + resp.Targets = append(resp.Targets, target.Target) + if resp.Labels == nil { + resp.Labels = target.Labels + } + } + + c.JSON(200, []*prommetrics.RespTarget{resp}) +} + +func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { + p.discovery(c, prommetrics.APIKeyName) +} + +func (p *PrometheusDiscoveryApi) User(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.User) +} + +func (p *PrometheusDiscoveryApi) Group(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Group) +} + +func (p *PrometheusDiscoveryApi) Msg(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Msg) +} + +func (p *PrometheusDiscoveryApi) Friend(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Friend) +} + +func (p *PrometheusDiscoveryApi) Conversation(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Conversation) +} + +func (p *PrometheusDiscoveryApi) Third(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Third) +} + +func (p *PrometheusDiscoveryApi) Auth(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Auth) +} + +func (p *PrometheusDiscoveryApi) Push(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Push) +} + +func (p *PrometheusDiscoveryApi) MessageGateway(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.MessageGateway) +} + +func (p *PrometheusDiscoveryApi) MessageTransfer(c *gin.Context) { + p.discovery(c, prommetrics.MessageTransferKeyName) +} diff --git a/internal/api/router.go b/internal/api/router.go index 52d26bdc5..6714d645c 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -48,7 +48,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine { +func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config, client discovery.SvcDiscoveryRegistry) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) gin.SetMode(gin.ReleaseMode) @@ -78,6 +78,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) j := jssdk.NewJSSdkApi(userRpc.Client, friendRpc.Client, groupRpc.Client, messageRpc.Client, conversationRpc.Client) + pd := NewPrometheusDiscoveryApi(config, client) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) @@ -254,6 +255,19 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En jssdk.POST("/get_conversations", j.GetConversations) jssdk.POST("/get_active_conversations", j.GetActiveConversations) + proDiscoveryGroup := r.Group("/prometheus_discovery", pd.Enable) + proDiscoveryGroup.GET("/api", pd.Api) + proDiscoveryGroup.GET("/user", pd.User) + proDiscoveryGroup.GET("/group", pd.Group) + proDiscoveryGroup.GET("/msg", pd.Msg) + proDiscoveryGroup.GET("/friend", pd.Friend) + proDiscoveryGroup.GET("/conversation", pd.Conversation) + proDiscoveryGroup.GET("/third", pd.Third) + proDiscoveryGroup.GET("/auth", pd.Auth) + proDiscoveryGroup.GET("/push", pd.Push) + proDiscoveryGroup.GET("/msg_gateway", pd.MessageGateway) + proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) + return r } diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 0da7d7220..7e1ba6405 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/openimsdk/tools/mw" "runtime/debug" "sync" "sync/atomic" @@ -378,7 +377,7 @@ func (c *Client) activeHeartbeat(ctx context.Context) { go func() { defer func() { if r := recover(); r != nil { - mw.PanicStackToLog(ctx, r) + log.ZPanic(ctx, "activeHeartbeat Panic", r) } }() log.ZDebug(ctx, "server initiative send heartbeat start.") diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 7603147de..6654e6598 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -41,7 +41,7 @@ type Config struct { func Start(ctx context.Context, index int, conf *Config) error { conf.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() - log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.RuntimeEnv, "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts, + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.RuntimeEnv, "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 6131d8c77..5be5e107d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,11 +18,17 @@ import ( "context" "errors" "fmt" + "net" "net/http" "os" "os/signal" + "strconv" "syscall" + "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/utils/jsonutil" + "github.com/openimsdk/tools/utils/network" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -33,6 +39,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/errs" @@ -140,21 +147,67 @@ func (m *MsgTransfer) Start(index int, config *Config) error { return err } + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, m.runTimeEnv) + if err != nil { + return errs.WrapMsg(err, "failed to register discovery service") + } + + registerIP, err := network.GetRpcRegisterIP("") + if err != nil { + return err + } + + getAutoPort := func() (net.Listener, int, error) { + registerAddr := net.JoinHostPort(registerIP, "0") + listener, err := net.Listen("tcp", registerAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != kdisc.Etcd { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } + if config.MsgTransfer.Prometheus.Enable { + var ( + listener net.Listener + prometheusPort int + ) + + if config.MsgTransfer.Prometheus.AutoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) + } + } + go func() { defer func() { if r := recover(); r != nil { - mw.PanicStackToLog(m.ctx, r) + log.ZPanic(m.ctx, "MsgTransfer Start Panic", r) } }() - prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) - if err != nil { - netErr = err - netDone <- struct{}{} - return - } - - if err := prommetrics.TransferInit(prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) netDone <- struct{}{} } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 4a5d5ba89..0104f6633 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,13 +18,13 @@ import ( "context" "encoding/json" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/tools/mw" "strconv" "strings" "sync" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/IBM/sarama" "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -349,7 +349,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) { defer func() { if r := recover(); r != nil { - mw.PanicStackToLog(ctx, r) + log.ZPanic(ctx, "HandleUserHasReadSeqMessages Panic", r) } }() diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 2cbbcd1fc..034d549ec 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -16,7 +16,6 @@ package msg import ( "context" - "github.com/openimsdk/tools/mw" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -89,7 +88,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa defer func() { if r := recover(); r != nil { - mw.PanicStackToLog(nctx, r) + log.ZPanic(nctx, "setConversationAtInfo Panic", r) } }() diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 2f7788167..13e3cfd33 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -92,11 +92,13 @@ func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq NotificationMsgs: make(map[string]*sdkws.PullMsgs), } for _, conv := range req.Conversations { - _, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs) + isEnd, endSeq, msgs, err := m.MsgDatabase.GetMessagesBySeqWithBounds(ctx, req.UserID, conv.ConversationID, conv.Seqs, req.GetOrder()) if err != nil { return nil, err } var pullMsgs *sdkws.PullMsgs + pullMsgs.IsEnd = isEnd + pullMsgs.EndSeq = endSeq if ok := false; conversationutil.IsNotificationConversationID(conv.ConversationID) { pullMsgs, ok = resp.NotificationMsgs[conv.ConversationID] if !ok { diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index cb4678b34..4eeb5d558 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -50,14 +50,14 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) platform := constant.PlatformID2Name[int(req.Platform)] for _, fileURL := range req.FileURLs { log := relationtb.Log{ - Platform: platform, - UserID: userID, - CreateTime: time.Now(), - Url: fileURL.URL, - FileName: fileURL.Filename, - SystemType: req.AppFramework, - Version: req.Version, - Ex: req.Ex, + Platform: platform, + UserID: userID, + CreateTime: time.Now(), + Url: fileURL.URL, + FileName: fileURL.Filename, + AppFramework: req.AppFramework, + Version: req.Version, + Ex: req.Ex, } for i := 0; i < 20; i++ { id := genLogID() diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index dda3ff317..44f157b6d 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -14,6 +14,8 @@ package apistruct +import "github.com/openimsdk/protocol/sdkws" + type PictureBaseInfo struct { UUID string `mapstructure:"uuid"` Type string `mapstructure:"type" validate:"required"` @@ -90,6 +92,11 @@ type RevokeElem struct { RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` } +type QuoteElem struct { + Text string `json:"text,omitempty"` + QuoteMessage *MsgStruct `json:"quoteMessage,omitempty"` +} + type OANotificationElem struct { NotificationName string `mapstructure:"notificationName" json:"notificationName" validate:"required"` NotificationFaceURL string `mapstructure:"notificationFaceURL" json:"notificationFaceURL"` @@ -103,6 +110,7 @@ type OANotificationElem struct { FileElem *FileElem `mapstructure:"fileElem" json:"fileElem"` Ex string `mapstructure:"ex" json:"ex"` } + type MessageRevoked struct { RevokerID string `mapstructure:"revokerID" json:"revokerID" validate:"required"` RevokerRole int32 `mapstructure:"revokerRole" json:"revokerRole" validate:"required"` @@ -111,3 +119,38 @@ type MessageRevoked struct { SessionType int32 `mapstructure:"sessionType" json:"sessionType" validate:"required"` Seq uint32 `mapstructure:"seq" json:"seq" validate:"required"` } + +type MsgStruct struct { + ClientMsgID string `json:"clientMsgID,omitempty"` + ServerMsgID string `json:"serverMsgID,omitempty"` + CreateTime int64 `json:"createTime"` + SendTime int64 `json:"sendTime"` + SessionType int32 `json:"sessionType"` + SendID string `json:"sendID,omitempty"` + RecvID string `json:"recvID,omitempty"` + MsgFrom int32 `json:"msgFrom"` + ContentType int32 `json:"contentType"` + SenderPlatformID int32 `json:"senderPlatformID"` + SenderNickname string `json:"senderNickname,omitempty"` + SenderFaceURL string `json:"senderFaceUrl,omitempty"` + GroupID string `json:"groupID,omitempty"` + Content string `json:"content,omitempty"` + Seq int64 `json:"seq"` + IsRead bool `json:"isRead"` + Status int32 `json:"status"` + IsReact bool `json:"isReact,omitempty"` + IsExternalExtensions bool `json:"isExternalExtensions,omitempty"` + OfflinePush *sdkws.OfflinePushInfo `json:"offlinePush,omitempty"` + AttachedInfo string `json:"attachedInfo,omitempty"` + Ex string `json:"ex,omitempty"` + LocalEx string `json:"localEx,omitempty"` + TextElem *TextElem `json:"textElem,omitempty"` + PictureElem *PictureElem `json:"pictureElem,omitempty"` + SoundElem *SoundElem `json:"soundElem,omitempty"` + VideoElem *VideoElem `json:"videoElem,omitempty"` + FileElem *FileElem `json:"fileElem,omitempty"` + AtTextElem *AtElem `json:"atTextElem,omitempty"` + LocationElem *LocationElem `json:"locationElem,omitempty"` + CustomElem *CustomElem `json:"customElem,omitempty"` + QuoteElem *QuoteElem `json:"quoteElem,omitempty"` +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index b8a60c3df..25748a618 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -18,11 +18,10 @@ import ( "strings" "time" - "github.com/openimsdk/tools/s3/aws" - "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/s3/aws" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/kodo" "github.com/openimsdk/tools/s3/minio" @@ -108,9 +107,10 @@ type API struct { CompressionLevel int `mapstructure:"compressionLevel"` } `mapstructure:"api"` Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - GrafanaURL string `mapstructure:"grafanaURL"` + Enable bool `mapstructure:"enable"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` + GrafanaURL string `mapstructure:"grafanaURL"` } `mapstructure:"prometheus"` } @@ -193,7 +193,11 @@ type MsgGateway struct { } type MsgTransfer struct { - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus struct { + Enable bool `mapstructure:"enable"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` + } `mapstructure:"prometheus"` } type Push struct { diff --git a/pkg/common/config/load_config_test.go b/pkg/common/config/load_config_test.go index ea11b0574..1552c77d2 100644 --- a/pkg/common/config/load_config_test.go +++ b/pkg/common/config/load_config_test.go @@ -83,3 +83,11 @@ func TestLoadOpenIMThirdConfig(t *testing.T) { // Environment: IMENV_OPENIM_RPC_THIRD_OBJECT_ENABLE=enabled;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ENDPOINT=https://oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKET=my_bucket_name;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKETURL=https://my_bucket_name.oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYID=AKID1234567890;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYSECRET=abc123xyz789;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_SESSIONTOKEN=session_token_value;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_PUBLICREAD=true } + +func TestTransferConfig(t *testing.T) { + var tran MsgTransfer + err := LoadConfig("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", &tran) + assert.Nil(t, err) + assert.Equal(t, true, tran.Prometheus.Enable) + assert.Equal(t, true, tran.Prometheus.AutoSetPorts) +} diff --git a/pkg/common/prommetrics/api.go b/pkg/common/prommetrics/api.go index 95b5c06b6..2dc5cb65d 100644 --- a/pkg/common/prommetrics/api.go +++ b/pkg/common/prommetrics/api.go @@ -3,6 +3,7 @@ package prommetrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "strconv" ) @@ -23,14 +24,14 @@ var ( ) ) -func ApiInit(prometheusPort int) error { +func ApiInit(listener net.Listener) error { apiRegistry := prometheus.NewRegistry() cs := append( baseCollector, apiCounter, httpCounter, ) - return Init(apiRegistry, prometheusPort, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...) + return Init(apiRegistry, listener, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...) } func APICall(path string, method string, apiCode int) { diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go new file mode 100644 index 000000000..8f03bc2ae --- /dev/null +++ b/pkg/common/prommetrics/discovery.go @@ -0,0 +1,31 @@ +package prommetrics + +import "fmt" + +const ( + APIKeyName = "api" + MessageTransferKeyName = "message-transfer" +) + +type Target struct { + Target string `json:"target"` + Labels map[string]string `json:"labels"` +} + +type RespTarget struct { + Targets []string `json:"targets"` + Labels map[string]string `json:"labels"` +} + +func BuildDiscoveryKey(name string) string { + return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) +} + +func BuildDefaultTarget(host string, ip int) Target { + return Target{ + Target: fmt.Sprintf("%s:%d", host, ip), + Labels: map[string]string{ + "namespace": "default", + }, + } +} diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 02e408d63..2fc5d76b4 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -15,9 +15,9 @@ package prommetrics import ( - "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + "net" "net/http" ) @@ -30,9 +30,9 @@ var ( } ) -func Init(registry *prometheus.Registry, prometheusPort int, path string, handler http.Handler, cs ...prometheus.Collector) error { +func Init(registry *prometheus.Registry, listener net.Listener, path string, handler http.Handler, cs ...prometheus.Collector) error { registry.MustRegister(cs...) srv := http.NewServeMux() srv.Handle(path, handler) - return http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv) + return http.Serve(listener, srv) } diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index 809d509b2..3f115d30b 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -5,6 +5,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "strconv" ) @@ -21,13 +22,13 @@ var ( ) ) -func RpcInit(cs []prometheus.Collector, prometheusPort int) error { +func RpcInit(cs []prometheus.Collector, listener net.Listener) error { reg := prometheus.NewRegistry() cs = append(append( baseCollector, rpcCounter, ), cs...) - return Init(reg, prometheusPort, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) + return Init(reg, listener, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) } func RPCCall(name string, path string, code int) { diff --git a/pkg/common/prommetrics/transfer.go b/pkg/common/prommetrics/transfer.go index f0abb8285..36fe1d568 100644 --- a/pkg/common/prommetrics/transfer.go +++ b/pkg/common/prommetrics/transfer.go @@ -17,6 +17,7 @@ package prommetrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" ) var ( @@ -42,7 +43,7 @@ var ( }) ) -func TransferInit(prometheusPort int) error { +func TransferInit(listener net.Listener) error { reg := prometheus.NewRegistry() cs := append( baseCollector, @@ -52,5 +53,5 @@ func TransferInit(prometheusPort int) error { MsgInsertMongoFailedCounter, SeqSetFailedCounter, ) - return Init(reg, prometheusPort, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) + return Init(reg, listener, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index ae8344923..f75a50541 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -22,15 +22,17 @@ import ( "net/http" "os" "os/signal" + "strconv" "syscall" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/runtimeenv" + "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/grpc/status" - "strconv" + "github.com/openimsdk/tools/utils/runtimeenv" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -49,11 +51,17 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { var ( - rpcTcpAddr string - netDone = make(chan struct{}, 2) - netErr error + rpcTcpAddr string + netDone = make(chan struct{}, 2) + netErr error + prometheusPort int ) + registerIP, err := network.GetRpcRegisterIP(registerIP) + if err != nil { + return err + } + runTimeEnv := runtimeenv.PrintRuntimeEnvironment() if !autoSetPorts { @@ -66,6 +74,27 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0") } + getAutoPort := func() (net.Listener, int, error) { + listener, err := net.Listen("tcp", rpcTcpAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if autoSetPorts && discovery.Enable != kdisc.Etcd { + return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() + } + client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv) + if err != nil { + return err + } + + defer client.Close() + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + // var reg *prometheus.Registry // var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { @@ -78,17 +107,40 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo prommetricsUnaryInterceptor(rpcRegisterName), prommetricsStreamInterceptor(rpcRegisterName), ) - prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) - if err != nil { - return err + + var ( + listener net.Listener + ) + + if autoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + } } cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) go func() { - if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) netDone <- struct{}{} } - // metric.InitializeMetrics(srv) + //metric.InitializeMetrics(srv) // Create a HTTP server for prometheus. // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -100,30 +152,15 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo options = append(options, mw.GrpcServer()) } - listener, err := net.Listen( - "tcp", - rpcTcpAddr, - ) - if err != nil { - return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) - } - - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - registerIP = network.GetListenIP(registerIP) - port, _ := strconv.Atoi(portStr) - - log.CInfo(ctx, "RPC server is initializing", "runTimeEnv", runTimeEnv, "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, - "prometheusPorts", prometheusConfig.Ports) - - defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv) + listener, port, err := getAutoPort() if err != nil { return err } - defer client.Close() - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", port, + "prometheusPort", prometheusPort) + defer listener.Close() srv := grpc.NewServer(options...) err = rpcFn(ctx, config, client, srv) diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 464ad7604..8d82d8543 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -24,6 +24,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/mongo" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" @@ -35,8 +38,6 @@ import ( "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/timeutil" - "github.com/redis/go-redis/v9" - "go.mongodb.org/mongo-driver/mongo" ) const ( @@ -56,6 +57,7 @@ type CommonMsgDatabase interface { GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) // DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis // cache). + GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error // ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) @@ -517,6 +519,81 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co return minSeq, maxSeq, successMsgs, nil } +func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) { + var endSeq int64 + var isEnd bool + userMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) + if err != nil { + return false, 0, nil, err + } + minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) + if err != nil { + return false, 0, nil, err + } + maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) + if err != nil { + return false, 0, nil, err + } + userMaxSeq, err := db.seqUser.GetUserMaxSeq(ctx, conversationID, userID) + if err != nil { + return false, 0, nil, err + } + if userMinSeq > minSeq { + minSeq = userMinSeq + } + if userMaxSeq > 0 && userMaxSeq < maxSeq { + maxSeq = userMaxSeq + } + newSeqs := make([]int64, 0, len(seqs)) + for _, seq := range seqs { + if seq <= 0 { + continue + } + // The normal range and can fetch messages + if seq >= minSeq && seq <= maxSeq { + newSeqs = append(newSeqs, seq) + continue + } + // If the requested seq is smaller than the minimum seq and the pull order is descending (pulling older messages) + if seq < minSeq && pullOrder == sdkws.PullOrder_PullOrderDesc { + isEnd = true + endSeq = minSeq + } + // If the requested seq is larger than the maximum seq and the pull order is ascending (pulling newer messages) + if seq > maxSeq && pullOrder == sdkws.PullOrder_PullOrderAsc { + isEnd = true + endSeq = maxSeq + } + } + if len(newSeqs) == 0 { + return isEnd, endSeq, nil, nil + } + successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) + if err != nil { + if !errors.Is(err, redis.Nil) { + log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) + } + } + log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", + seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) + + if len(failedSeqs) > 0 { + mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) + if err != nil { + + return false, 0, nil, err + } + + successMsgs = append(successMsgs, mongoMsgs...) + + //_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) + //if err != nil { + // return 0, 0, nil, err + //} + } + return isEnd, endSeq, successMsgs, nil +} + func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error { var delStruct delMsgRecursionStruct var skip int64 diff --git a/pkg/common/storage/model/log.go b/pkg/common/storage/model/log.go index 9db72c695..9dc392179 100644 --- a/pkg/common/storage/model/log.go +++ b/pkg/common/storage/model/log.go @@ -19,13 +19,14 @@ import ( ) type Log struct { - LogID string `bson:"log_id"` - Platform string `bson:"platform"` - UserID string `bson:"user_id"` - CreateTime time.Time `bson:"create_time"` - Url string `bson:"url"` - FileName string `bson:"file_name"` - SystemType string `bson:"system_type"` - Version string `bson:"version"` - Ex string `bson:"ex"` + LogID string `bson:"log_id"` + Platform string `bson:"platform"` + UserID string `bson:"user_id"` + CreateTime time.Time `bson:"create_time"` + Url string `bson:"url"` + FileName string `bson:"file_name"` + SystemType string `bson:"system_type"` + AppFramework string `bson:"app_framework"` + Version string `bson:"version"` + Ex string `bson:"ex"` } diff --git a/pkg/rpccache/subscriber.go b/pkg/rpccache/subscriber.go index 3c73ef449..44e1f5885 100644 --- a/pkg/rpccache/subscriber.go +++ b/pkg/rpccache/subscriber.go @@ -17,7 +17,6 @@ package rpccache import ( "context" "encoding/json" - "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" @@ -26,7 +25,7 @@ import ( func subscriberRedisDeleteCache(ctx context.Context, client redis.UniversalClient, channel string, del func(ctx context.Context, key ...string)) { defer func() { if r := recover(); r != nil { - mw.PanicStackToLog(ctx, r) + log.ZPanic(ctx, "subscriberRedisDeleteCache Panic", r) } }() for message := range client.Subscribe(ctx, channel).Channel() {