Merge branch 'main' of github.com:openimsdk/open-im-server into refactor/err-handle

pull/2614/head
Monet Lee
commit 44f65b5855

File diff suppressed because it is too large.

@@ -3,11 +3,14 @@ api:
  listenIP: 0.0.0.0
  # Listening ports; if multiple are configured, multiple instances will be launched, must be consistent with the number of prometheus.ports
  ports: [ 10002 ]
+ # API compression level; 0: default compression, 1: best compression, 2: best speed, -1: no compression
+ compressionLevel: 0
prometheus:
  # Whether to enable prometheus
  enable: true
  # Prometheus listening ports, must match the number of api.ports
- ports: [ 20502 ]
+ ports: [ 12002 ]
  # This address can be accessed via a browser
  grafanaURL: http://127.0.0.1:13000/

@@ -2,13 +2,13 @@ rpc:
  # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
  registerIP:
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10140 ]
+ ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20640 ]
+ ports: [ 12140, 12141, 12142, 12143, 12144, 12145, 12146, 12147, 12148, 12149, 12150, 12151, 12152, 12153, 12154, 12155 ]
  # IP address that the RPC/WebSocket service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
@@ -25,6 +25,3 @@ longConnSvr:
  # 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time
  multiLoginPolicy: 1

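The comments above state an invariant rather than a convention: each entry in rpc.ports launches one instance, and each instance needs its own Prometheus listener, so the two lists must be the same length (here, sixteen of each). A minimal sketch of that check, assuming plain []int slices rather than the server's actual config types:

package main

import "fmt"

// validatePorts enforces the rule stated in the config comments:
// one Prometheus port per RPC listen port.
func validatePorts(rpcPorts, promPorts []int) error {
    if len(rpcPorts) != len(promPorts) {
        return fmt.Errorf("got %d rpc ports but %d prometheus ports; counts must match",
            len(rpcPorts), len(promPorts))
    }
    return nil
}

func main() {
    rpc := []int{10140, 10141, 10142}
    prom := []int{12140, 12141}
    fmt.Println(validatePorts(rpc, prom)) // reports the mismatch
}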
@@ -3,4 +3,4 @@ prometheus:
  enable: true
  # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly
  # Because four instances have been launched, four ports need to be specified
- ports: [ 20600, 20601, 20602, 20603 ]
+ ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027 ]

@@ -4,13 +4,13 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10170, 10171, 10172, 10173 ]
+ ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20670, 20671, 20672, 20673 ]
+ ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12181, 12182, 12183, 12184, 12185 ]
maxConcurrentWorkers: 3
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.
@@ -38,9 +38,4 @@ iosPush:
  badgeCount: true
  production: false
+ fullUserCache: true

@@ -4,15 +4,14 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10160 ]
+ ports: [ 10200 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20660 ]
+ ports: [ 12200 ]
tokenPolicy:
  # Token validity period, in days
  expire: 90

@@ -4,10 +4,10 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10180 ]
+ ports: [ 10220 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20680 ]
+ ports: [ 12220 ]

@@ -4,10 +4,10 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10120 ]
+ ports: [ 10240 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20620 ]
+ ports: [ 12240 ]

@@ -4,13 +4,13 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10150 ]
+ ports: [ 10260 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20650 ]
+ ports: [ 12260 ]
enableHistoryForNewMembers: true

@@ -4,17 +4,14 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10130 ]
+ ports: [ 10280 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20630 ]
+ ports: [ 12280 ]
# Does sending messages require friend verification
friendVerify: false

@@ -4,13 +4,13 @@ rpc:
  # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
  listenIP: 0.0.0.0
  # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
- ports: [ 10190 ]
+ ports: [ 10300 ]
prometheus:
  # Enable or disable Prometheus monitoring
  enable: true
  # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
- ports: [ 20690 ]
+ ports: [ 12300 ]
object:

@@ -4,14 +4,10 @@ rpc:
  # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default
  listenIP: 0.0.0.0
  # Listening ports; if multiple are configured, multiple instances will be launched, and must be consistent with the number of prometheus.ports
- ports: [ 10110 ]
+ ports: [ 10320 ]
prometheus:
  # Whether to enable prometheus
  enable: true
  # Prometheus listening ports, must be consistent with the number of rpc.ports
- ports: [ 20610 ]
+ ports: [ 12320 ]

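Taken together, these hunks replace the scattered 206xx monitoring ports with one rule: each service's Prometheus port appears to be its RPC port plus 2000 (10200 pairs with 12200, 10320 with 12320, and so on), which is why the scrape targets below change in lockstep. A one-line illustrative sketch of that pairing; the repo itself still lists both sets of ports by hand:

// promPortFor mirrors the renumbering visible in these hunks:
// the Prometheus port is the RPC port offset by 2000.
func promPortFor(rpcPort int) int {
    return rpcPort + 2000
}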
@@ -28,56 +28,59 @@ scrape_configs:
      - targets: [ internal_ip:20500 ]
  - job_name: openimserver-openim-api
    static_configs:
-     - targets: [ internal_ip:20502 ]
+     - targets: [ internal_ip:12002 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-msggateway
    static_configs:
-     - targets: [ internal_ip:20640 ]
+     - targets: [ internal_ip:12140 ]
+     # - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-msgtransfer
    static_configs:
-     - targets: [ internal_ip:20600, internal_ip:20601, internal_ip:20602, internal_ip:20603 ]
+     - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ]
+     # - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-push
    static_configs:
-     - targets: [ internal_ip:20670, internal_ip:20671, internal_ip:20672, internal_ip:20673 ]
+     - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ]
+     # - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12181, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-auth
    static_configs:
-     - targets: [ internal_ip:20600 ]
+     - targets: [ internal_ip:12200 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-conversation
    static_configs:
-     - targets: [ internal_ip:20680 ]
+     - targets: [ internal_ip:12220 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-friend
    static_configs:
-     - targets: [ internal_ip:20620 ]
+     - targets: [ internal_ip:12240 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-group
    static_configs:
-     - targets: [ internal_ip:20650 ]
+     - targets: [ internal_ip:12260 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-msg
    static_configs:
-     - targets: [ internal_ip:20630 ]
+     - targets: [ internal_ip:12280 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-third
    static_configs:
-     - targets: [ internal_ip:20690 ]
+     - targets: [ internal_ip:12300 ]
        labels:
          namespace: default
  - job_name: openimserver-openim-rpc-user
    static_configs:
-     - targets: [ internal_ip:20610 ]
+     - targets: [ internal_ip:12320 ]
        labels:
          namespace: default

@@ -186,4 +186,3 @@ services:
  # networks:
  #   - openim

@@ -6,21 +6,21 @@ require (
    firebase.google.com/go v3.13.0+incompatible
    github.com/dtm-labs/rockscache v0.1.1
    github.com/gin-gonic/gin v1.9.1
-   github.com/go-playground/validator/v10 v10.18.0
+   github.com/go-playground/validator/v10 v10.20.0
    github.com/gogo/protobuf v1.3.2 // indirect
    github.com/golang-jwt/jwt/v4 v4.5.0
    github.com/gorilla/websocket v1.5.1
    github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
    github.com/mitchellh/mapstructure v1.5.0
    github.com/openimsdk/protocol v0.0.72-alpha.20
-   github.com/openimsdk/tools v0.0.50-alpha.11
+   github.com/openimsdk/tools v0.0.50-alpha.12
    github.com/pkg/errors v0.9.1 // indirect
    github.com/prometheus/client_golang v1.18.0
    github.com/stretchr/testify v1.9.0
    go.mongodb.org/mongo-driver v1.14.0
    google.golang.org/api v0.165.0
    google.golang.org/grpc v1.62.1
-   google.golang.org/protobuf v1.33.0
+   google.golang.org/protobuf v1.34.0
    gopkg.in/yaml.v3 v3.0.1
)
@@ -29,6 +29,7 @@ require github.com/google/uuid v1.6.0
require (
    github.com/IBM/sarama v1.43.0
    github.com/fatih/color v1.14.1
+   github.com/gin-contrib/gzip v1.0.1
    github.com/go-redis/redis v6.15.9+incompatible
    github.com/go-redis/redismock/v9 v9.2.0
    github.com/hashicorp/golang-lru/v2 v2.0.7
@@ -73,10 +74,12 @@ require (
    github.com/aws/aws-sdk-go-v2/service/sts v1.25.4 // indirect
    github.com/aws/smithy-go v1.17.0 // indirect
    github.com/beorn7/perks v1.0.1 // indirect
-   github.com/bytedance/sonic v1.9.1 // indirect
+   github.com/bytedance/sonic v1.11.6 // indirect
+   github.com/bytedance/sonic/loader v0.1.1 // indirect
    github.com/cespare/xxhash/v2 v2.2.0 // indirect
-   github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
    github.com/clbanning/mxj v1.8.4 // indirect
+   github.com/cloudwego/base64x v0.1.4 // indirect
+   github.com/cloudwego/iasm v0.2.0 // indirect
    github.com/coreos/go-semver v0.3.0 // indirect
    github.com/coreos/go-systemd/v22 v22.3.2 // indirect
    github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
@@ -117,7 +120,7 @@ require (
    github.com/json-iterator/go v1.1.12 // indirect
    github.com/kelindar/simd v1.1.2 // indirect
    github.com/klauspost/compress v1.17.7 // indirect
-   github.com/klauspost/cpuid/v2 v2.2.6 // indirect
+   github.com/klauspost/cpuid/v2 v2.2.7 // indirect
    github.com/leodido/go-urn v1.4.0 // indirect
    github.com/lestrrat-go/strftime v1.0.6 // indirect
    github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
@@ -132,7 +135,7 @@ require (
    github.com/modern-go/reflect2 v1.0.2 // indirect
    github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
    github.com/mozillazg/go-httpheader v0.4.0 // indirect
-   github.com/pelletier/go-toml/v2 v2.1.0 // indirect
+   github.com/pelletier/go-toml/v2 v2.2.2 // indirect
    github.com/pierrec/lz4/v4 v4.1.21 // indirect
    github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
    github.com/prometheus/client_model v0.5.0 // indirect
@@ -169,9 +172,9 @@ require (
    go.opentelemetry.io/otel/trace v1.23.0 // indirect
    go.uber.org/atomic v1.9.0 // indirect
    go.uber.org/multierr v1.11.0 // indirect
-   golang.org/x/arch v0.3.0 // indirect
+   golang.org/x/arch v0.7.0 // indirect
    golang.org/x/image v0.15.0 // indirect
-   golang.org/x/net v0.22.0 // indirect
+   golang.org/x/net v0.24.0 // indirect
    golang.org/x/oauth2 v0.17.0 // indirect
    golang.org/x/sys v0.19.0 // indirect
    golang.org/x/text v0.14.0 // indirect
@@ -187,10 +190,10 @@ require (
require (
    github.com/go-playground/locales v0.14.1 // indirect
    github.com/goccy/go-json v0.10.2 // indirect
-   github.com/mattn/go-isatty v0.0.19 // indirect
+   github.com/mattn/go-isatty v0.0.20 // indirect
    github.com/spf13/cobra v1.8.0
-   github.com/ugorji/go/codec v1.2.11 // indirect
+   github.com/ugorji/go/codec v1.2.12 // indirect
    go.uber.org/zap v1.24.0 // indirect
-   golang.org/x/crypto v0.21.0 // indirect
+   golang.org/x/crypto v0.22.0 // indirect
    gopkg.in/ini.v1 v1.67.0 // indirect
)

@@ -65,18 +65,20 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
-github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
-github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
-github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
+github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
+github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
+github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
+github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
-github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
-github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
+github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
+github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
+github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
@@ -121,6 +123,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
+github.com/gin-contrib/gzip v1.0.1 h1:HQ8ENHODeLY7a4g1Au/46Z92bdGFl74OhxcZble9WJE=
+github.com/gin-contrib/gzip v1.0.1/go.mod h1:njt428fdUNRvjuJf16tZMYZ2Yl+WQB53X5wmhDwXvC4=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
@@ -144,8 +148,8 @@ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.8.0/go.mod h1:9JhgTzTaE31GZDpH/HSvHiRJrJ3iKAgqqH0Bl/Ocjdk=
-github.com/go-playground/validator/v10 v10.18.0 h1:BvolUXjp4zuvkZ5YN5t7ebzbhlUtPsPm2S9NAZ5nl9U=
-github.com/go-playground/validator/v10 v10.18.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
+github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
+github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
@@ -257,8 +261,9 @@ github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLA
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
-github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
-github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
+github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
@@ -288,8 +293,8 @@ github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3v
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
-github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
-github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
@@ -321,10 +326,10 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.20 h1:kfSYOnWRp9KKkwGelR9Zo20TdjMq5LLzfYKyVqUaolo=
github.com/openimsdk/protocol v0.0.72-alpha.20/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
-github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
-github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
+github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
+github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
-github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
-github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
+github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
+github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@@ -408,8 +413,8 @@ github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
-github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
-github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
+github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
+github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
@@ -456,8 +461,8 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
-golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
-golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
+golang.org/x/arch v0.7.0 h1:pskyeJh/3AmoQ8CPE95vxHLqp1G1GfGNXTmcl9NEKTc=
+golang.org/x/arch v0.7.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -465,8 +470,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
-golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
-golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
+golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
+golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
@@ -493,8 +498,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
-golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
-golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
+golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
+golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
@@ -586,8 +591,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
-google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+google.golang.org/protobuf v1.34.0 h1:Qo/qEd2RZPCf2nKuorzksSknv0d3ERwp1vFG38gSmH4=
+google.golang.org/protobuf v1.34.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
@@ -608,6 +613,7 @@ gorm.io/gorm v1.25.8 h1:WAGEZ/aEcznN4D03laj8DKnehe1e9gYQAjW8xyPRdeo=
gorm.io/gorm v1.25.8/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=

@@ -2,6 +2,7 @@ package api
import (
    "fmt"
+   "github.com/gin-contrib/gzip"
    "github.com/gin-gonic/gin"
    "github.com/gin-gonic/gin/binding"
@@ -22,6 +23,13 @@ import (
    "github.com/openimsdk/tools/mw"
)

+const (
+   NoCompression      = -1
+   DefaultCompression = 0
+   BestCompression    = 1
+   BestSpeed          = 2
+)
+
func prommetricsGin() gin.HandlerFunc {
    return func(c *gin.Context) {
        c.Next()
@@ -54,7 +62,15 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
    conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation)
    authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
    thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL)
+   switch config.API.Api.CompressionLevel {
+   case NoCompression:
+   case DefaultCompression:
+       r.Use(gzip.Gzip(gzip.DefaultCompression))
+   case BestCompression:
+       r.Use(gzip.Gzip(gzip.BestCompression))
+   case BestSpeed:
+       r.Use(gzip.Gzip(gzip.BestSpeed))
+   }
    r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
    u := NewUserApi(*userRpc)
    m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)

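The switch above wires the new compressionLevel knob from openim-api.yml into gin-contrib/gzip, deliberately registering no middleware for NoCompression (-1). A quick standalone check that the setting took effect, sketched against the default API port 10002 (the request path is a placeholder, not a documented endpoint):

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // Setting Accept-Encoding by hand stops net/http from transparently
    // decompressing, so the Content-Encoding header survives for inspection.
    req, err := http.NewRequest(http.MethodGet, "http://localhost:10002/some/route", nil)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Accept-Encoding", "gzip")
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // Expect "gzip" for compressionLevel 0, 1, or 2; empty for -1.
    fmt.Println("Content-Encoding:", resp.Header.Get("Content-Encoding"))
}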
@@ -22,6 +22,8 @@ import (
    "sync/atomic"
    "time"

+   "google.golang.org/protobuf/proto"
+
    "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
    "github.com/openimsdk/protocol/constant"
    "github.com/openimsdk/protocol/sdkws"
@@ -30,7 +32,6 @@ import (
    "github.com/openimsdk/tools/log"
    "github.com/openimsdk/tools/mcontext"
    "github.com/openimsdk/tools/utils/stringutil"
-   "google.golang.org/protobuf/proto"
)

var (
@@ -222,6 +223,8 @@ func (c *Client) handleMessage(message []byte) error {
        resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq)
    case WSPullMsg:
        resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq)
+   case WSGetConvMaxReadSeq:
+       resp, messageErr = c.longConnServer.GetConversationsHasReadAndMaxSeq(ctx, binaryReq)
    case WsLogoutMsg:
        resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
    case WsSetBackgroundStatus:

@@ -40,6 +40,7 @@ const (
    WSSendMsg           = 1003
    WSSendSignalMsg     = 1004
    WSPullMsg           = 1005
+   WSGetConvMaxReadSeq = 1006
    WSPushMsg           = 2001
    WSKickOnlineMsg     = 2002
    WsLogoutMsg         = 2003

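With 1006 slotted into the client-request range (1xxx, as opposed to the 2xxx server-push codes), a caller proto-marshals the request body that the GrpcHandler hunk further down unmarshals. A hypothetical client-side sketch; the field names come from the github.com/openimsdk/protocol package, and the helper itself is not in this commit:

package main

import (
    "google.golang.org/protobuf/proto"

    "github.com/openimsdk/protocol/msg"
)

// buildGetConvMaxReadSeqPayload marshals the body of a WSGetConvMaxReadSeq
// (1006) frame; the surrounding websocket Req envelope is omitted here.
func buildGetConvMaxReadSeqPayload(userID string, conversationIDs []string) ([]byte, error) {
    return proto.Marshal(&msg.GetConversationsHasReadAndMaxSeqReq{
        UserID:          userID,
        ConversationIDs: conversationIDs,
    })
}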
@@ -58,7 +58,7 @@ func Start(ctx context.Context, index int, conf *Config) error {
    )
    hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error {
-       longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges)
+       longServer.online, _ = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
        return nil
    })

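NewOnlineCache now takes an extra bool and returns an error, which this call discards with a blank identifier. Since the callback already returns an error, a stricter variant could propagate it instead; a sketch using only the signature visible above:

hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error {
    // Surface cache-construction failures instead of dropping them.
    online, err := rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
    if err != nil {
        return err
    }
    longServer.online = online
    return nil
})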
@@ -19,6 +19,8 @@ import (
    "sync"

    "github.com/go-playground/validator/v10"
+   "google.golang.org/protobuf/proto"
+
    "github.com/openimsdk/open-im-server/v3/pkg/common/config"
    "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
    "github.com/openimsdk/protocol/msg"
@@ -27,7 +29,6 @@ import (
    "github.com/openimsdk/tools/discovery"
    "github.com/openimsdk/tools/errs"
    "github.com/openimsdk/tools/utils/jsonutil"
-   "google.golang.org/protobuf/proto"
)

type Req struct {
@@ -94,6 +95,7 @@ type MessageHandler interface {
    SendMessage(context context.Context, data *Req) ([]byte, error)
    SendSignalMessage(context context.Context, data *Req) ([]byte, error)
    PullMessageBySeqList(context context.Context, data *Req) ([]byte, error)
+   GetConversationsHasReadAndMaxSeq(context context.Context, data *Req) ([]byte, error)
    GetSeqMessage(context context.Context, data *Req) ([]byte, error)
    UserLogout(context context.Context, data *Req) ([]byte, error)
    SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error)
@@ -176,7 +178,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
    req := sdkws.PullMessageBySeqsReq{}
    if err := proto.Unmarshal(data.Data, &req); err != nil {
-       return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
+       return nil, errs.WrapMsg(err, "err proto unmarshal", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
    }
    if err := g.validate.Struct(data); err != nil {
        return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq")
@@ -192,6 +194,25 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
    return c, nil
}

+func (g GrpcHandler) GetConversationsHasReadAndMaxSeq(context context.Context, data *Req) ([]byte, error) {
+   req := msg.GetConversationsHasReadAndMaxSeqReq{}
+   if err := proto.Unmarshal(data.Data, &req); err != nil {
+       return nil, errs.WrapMsg(err, "err proto unmarshal", "action", "unmarshal", "dataType", "GetConversationsHasReadAndMaxSeq")
+   }
+   if err := g.validate.Struct(data); err != nil {
+       return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "GetConversationsHasReadAndMaxSeq")
+   }
+   resp, err := g.msgRpcClient.GetConversationsHasReadAndMaxSeq(context, &req)
+   if err != nil {
+       return nil, err
+   }
+   c, err := proto.Marshal(resp)
+   if err != nil {
+       return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "GetConversationsHasReadAndMaxSeq")
+   }
+   return c, nil
+}
+
func (g GrpcHandler) GetSeqMessage(context context.Context, data *Req) ([]byte, error) {
    req := msg.GetSeqMessageReq{}
    if err := proto.Unmarshal(data.Data, &req); err != nil {

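The new GetConversationsHasReadAndMaxSeq repeats the unmarshal, validate, RPC call, marshal shape of PullMessageBySeqList and GetSeqMessage line for line. If that duplication keeps growing, it could be folded into one generic helper; an illustrative sketch, not part of this commit, which validates the outer *Req envelope exactly as the handlers above do:

// handleProto is a sketch of the repeated handler shape: decode the proto
// body, validate the websocket envelope, call the RPC, re-encode the reply.
func handleProto[R proto.Message](ctx context.Context, g GrpcHandler, data *Req, req R,
    call func(context.Context, R) (proto.Message, error)) ([]byte, error) {
    if err := proto.Unmarshal(data.Data, req); err != nil {
        return nil, errs.WrapMsg(err, "err proto unmarshal")
    }
    if err := g.validate.Struct(data); err != nil {
        return nil, errs.WrapMsg(err, "validation failed")
    }
    resp, err := call(ctx, req)
    if err != nil {
        return nil, err
    }
    return proto.Marshal(resp)
}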
@@ -265,7 +265,7 @@ func (ws *WsServer) registerClient(client *Client) {
    if clientOK {
        ws.clients.Set(client.UserID, client)
        // There is already a connection to the platform
-       log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID",
+       log.ZDebug(client.ctx, "repeat login", "userID", client.UserID, "platformID",
            client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
        ws.onlineUserConnNum.Add(1)
    } else {
@@ -293,7 +293,7 @@ func (ws *WsServer) registerClient(client *Client) {
    wg.Wait()

-   log.ZInfo(
+   log.ZDebug(
        client.ctx,
        "user online",
        "online user Num",
@@ -360,7 +360,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
    ws.onlineUserConnNum.Add(-1)
    ws.subscription.DelClient(client)
    //ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
-   log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
+   log.ZDebug(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
        ws.onlineUserNum.Load(), "online user conn Num",
        ws.onlineUserConnNum.Load(),
    )

@@ -238,6 +238,7 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [
}

func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
+   log.ZInfo(ctx, "handle storage msg")
    for _, storageMsg := range storageList {
        log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
    }
@@ -254,16 +255,20 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
        log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
        return
    }
+   log.ZInfo(ctx, "BatchInsertChat2Cache end")
+
    if isNewConversation {
        switch msg.SessionType {
        case constant.ReadGroupChatType:
-           log.ZInfo(ctx, "group chat first create conversation", "conversationID",
+           log.ZDebug(ctx, "group chat first create conversation", "conversationID",
                conversationID)
            userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
            if err != nil {
                log.ZWarn(ctx, "get group member ids error", err, "conversationID",
                    conversationID)
            } else {
+               log.ZInfo(ctx, "GetGroupMemberIDs end")
                if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx,
                    msg.GroupID, userIDs); err != nil {
                    log.ZWarn(ctx, "single chat first create conversation error", err,
@@ -282,13 +287,16 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
            }
        }

-       log.ZDebug(ctx, "success incr to next topic")
+       log.ZInfo(ctx, "success incr to next topic")
        err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
        if err != nil {
            log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
                conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
        }
+       log.ZInfo(ctx, "MsgToMongoMQ end")
        och.toPushTopic(ctx, key, conversationID, storageList)
+       log.ZInfo(ctx, "toPushTopic end")
    }
}
@ -319,7 +327,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
for _, v := range msgs { for _, v := range msgs {
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) _, _, _ = och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
} }
} }
@ -344,7 +352,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSess
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
log.ZInfo(context.Background(), "online new session msg come", "highWaterMarkOffset", log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) { och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) {
session.MarkMessage(lastMessage, "") session.MarkMessage(lastMessage, "")

@@ -57,7 +57,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
 		log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg)
 		return
 	}
-	log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
+	log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
 	err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
 	if err != nil {
 		log.ZError(

@@ -28,6 +28,6 @@ type Dummy struct {
 }

 func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
-	log.ZInfo(ctx, "dummy push")
+	log.ZDebug(ctx, "dummy push")
 	return nil
 }

@@ -23,7 +23,6 @@ import (
 	"time"

 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 	"github.com/openimsdk/tools/errs"
@@ -100,7 +99,6 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
 			if err = g.batchPush(ctx, token, userIDs[i:end], pushReq); err != nil {
 				log.ZError(ctx, "batchPush failed", err, "index", index, "token", token, "req", pushReq)
 			}
 		}
 	if err = g.batchPush(ctx, token, userIDs, pushReq); err != nil {
 		log.ZError(ctx, "batchPush failed", err, "index", index, "token", token, "req", pushReq)

@@ -63,7 +63,7 @@ func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context,
 	}
 }

-func (c *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {
+func (o *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {
 	type AtTextElem struct {
 		Text       string   `json:"text,omitempty"`
 		AtUserList []string `json:"atUserList,omitempty"`
@@ -108,12 +108,12 @@ func (c *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (ti
 	return
 }

-func (c *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
-	title, content, opts, err := c.getOfflinePushInfos(msg)
+func (o *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
+	title, content, opts, err := o.getOfflinePushInfos(msg)
 	if err != nil {
 		return err
 	}
-	err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
+	err = o.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
 	if err != nil {
 		prommetrics.MsgOfflinePushFailedCounter.Inc()
 		return err

@@ -27,12 +27,12 @@ func newEmptyOnlinePusher() *emptyOnlinePusher {
 func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
 	pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
-	log.ZWarn(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil)
+	log.ZInfo(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil)
 	return nil, nil
 }

 func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData,
 	wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
-	log.ZWarn(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil)
+	log.ZInfo(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil)
 	return nil
 }

@ -27,6 +27,9 @@ import (
"github.com/openimsdk/tools/utils/timeutil" "github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"math/rand"
"strconv"
"time"
) )
type ConsumerHandler struct { type ConsumerHandler struct {
@ -55,6 +58,7 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
} }
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
consumerHandler.offlinePusher = offlinePusher consumerHandler.offlinePusher = offlinePusher
consumerHandler.onlinePusher = NewOnlinePusher(client, config) consumerHandler.onlinePusher = NewOnlinePusher(client, config)
consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
@ -65,7 +69,10 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config consumerHandler.config = config
consumerHandler.pushDatabase = database consumerHandler.pushDatabase = database
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil) consumerHandler.onlineCache, err = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil)
if err != nil {
return nil, err
}
return &consumerHandler, nil return &consumerHandler, nil
} }
@ -108,6 +115,14 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.onlineCache.Lock.Lock()
for c.onlineCache.CurrentPhase.Load() < rpccache.DoSubscribeOver {
c.onlineCache.Cond.Wait()
}
c.onlineCache.Lock.Unlock()
ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
log.ZInfo(ctx, "begin consume messages")
for msg := range claim.Messages() { for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg) ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
c.handleMs2PsChat(ctx, msg.Value) c.handleMs2PsChat(ctx, msg.Value)
@ -118,20 +133,27 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s
// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType. // Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType.
func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) { func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
defer func(duration time.Time) {
t := time.Since(duration)
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "msg", msg.String(), "time cost", t)
}(time.Now())
if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
return err return err
} }
log.ZInfo(ctx, "webhookBeforeOnlinePush end")
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs) wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs)
if err != nil { if err != nil {
return err return err
} }
log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs) log.ZInfo(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
if !c.shouldPushOffline(ctx, msg) { if !c.shouldPushOffline(ctx, msg) {
return nil return nil
} }
log.ZInfo(ctx, "shouldPushOffline end")
for _, v := range wsResults { for _, v := range wsResults {
//message sender do not need offline push //message sender do not need offline push
@ -150,7 +172,7 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
offlinePushUserID, msg, nil); err != nil { offlinePushUserID, msg, nil); err != nil {
return err return err
} }
log.ZInfo(ctx, "webhookBeforeOfflinePush end")
err = c.offlinePushMsg(ctx, msg, offlinePushUserID) err = c.offlinePushMsg(ctx, msg, offlinePushUserID)
if err != nil { if err != nil {
log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePushUserID", offlinePushUserID, "msg", msg) log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePushUserID", offlinePushUserID, "msg", msg)
@ -172,21 +194,11 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
} }
func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) {
var ( onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs)
onlineUserIDs []string if err != nil {
offlineUserIDs []string return nil, err
)
for _, userID := range pushToUserIDs {
online, err := c.onlineCache.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
} else {
offlineUserIDs = append(offlineUserIDs, userID)
}
} }
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs) log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 { if len(onlineUserIDs) > 0 {
@ -205,35 +217,42 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
} }
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) log.ZInfo(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
defer func(duration time.Time) {
t := time.Since(duration)
log.ZInfo(ctx, "Get group msg from msg_transfer and push msg end", "msg", msg.String(), "groupID", groupID, "time cost", t)
}(time.Now())
var pushToUserIDs []string var pushToUserIDs []string
if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
&pushToUserIDs); err != nil { &pushToUserIDs); err != nil {
return err return err
} }
log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end")
err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg) err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
if err != nil { if err != nil {
return err return err
} }
log.ZInfo(ctx, "groupMessagesHandler end")
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil { if err != nil {
return err return err
} }
log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg) log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg)
if !c.shouldPushOffline(ctx, msg) { if !c.shouldPushOffline(ctx, msg) {
return nil return nil
} }
needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs) needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end")
//filter some user, like don not disturb or don't need offline push etc. //filter some user, like don not disturb or don't need offline push etc.
needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs) needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
if err != nil { if err != nil {
return err return err
} }
log.ZInfo(ctx, "filterGroupMessageOfflinePush end")
// Use offline push messaging // Use offline push messaging
if len(needOfflinePushUserIDs) > 0 { if len(needOfflinePushUserIDs) > 0 {
@ -295,7 +314,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri
if unmarshalNotificationElem(msg.Content, &tips) != nil { if unmarshalNotificationElem(msg.Content, &tips) != nil {
return err return err
} }
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs) log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs)
if len(c.config.Share.IMAdminUserID) > 0 { if len(c.config.Share.IMAdminUserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0]) ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0])
} }
@ -379,6 +398,7 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten
} }
return return
} }
func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
@ -387,6 +407,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context,
} }
return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq)
} }
func unmarshalNotificationElem(bytes []byte, t any) error { func unmarshalNotificationElem(bytes []byte, t any) error {
var notification sdkws.NotificationElem var notification sdkws.NotificationElem
if err := json.Unmarshal(bytes, &notification); err != nil { if err := json.Unmarshal(bytes, &notification); err != nil {

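The wait loop added at the top of ConsumeClaim above holds Kafka consumption back until the online cache has finished warming up and subscribing. A minimal standalone sketch of that Mutex/Cond/atomic phase gate (the phase constants mirror those added in pkg/rpccache; the sleep is a stand-in for the real initialization work):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const (
	Begin uint32 = iota
	DoOnlineStatusOver
	DoSubscribeOver
)

func main() {
	l := &sync.Mutex{}
	cond := sync.NewCond(l)
	var phase atomic.Uint32

	// Initializer: advance the phase and wake every waiter, as NewOnlineCache
	// and doSubscribe do once their startup step completes.
	go func() {
		time.Sleep(100 * time.Millisecond) // stand-in for loading online status
		l.Lock()
		phase.Store(DoSubscribeOver)
		cond.Broadcast()
		l.Unlock()
	}()

	// Consumer: block until initialization is over, as ConsumeClaim does.
	l.Lock()
	for phase.Load() < DoSubscribeOver {
		cond.Wait()
	}
	l.Unlock()
	fmt.Println("begin consume messages")
}
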
@@ -67,7 +67,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
 		return nil, err
 	}
-	log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
+	log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
 	return &msg.ClearMsgResp{}, nil
 }

@@ -2,6 +2,8 @@ package user

 import (
 	"context"
+
+	"github.com/openimsdk/tools/utils/datautil"

 	"github.com/openimsdk/protocol/constant"
 	pbuser "github.com/openimsdk/protocol/user"
 )
@@ -80,3 +82,22 @@ func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUse
 	}
 	return &pbuser.SetUserOnlineStatusResp{}, nil
 }
+
+func (s *userServer) GetAllOnlineUsers(ctx context.Context, req *pbuser.GetAllOnlineUsersReq) (*pbuser.GetAllOnlineUsersResp, error) {
+	resMap, nextCursor, err := s.online.GetAllOnlineUsers(ctx, req.Cursor)
+	if err != nil {
+		return nil, err
+	}
+	resp := &pbuser.GetAllOnlineUsersResp{
+		StatusList: make([]*pbuser.OnlineStatus, 0, len(resMap)),
+		NextCursor: nextCursor,
+	}
+	for userID, plats := range resMap {
+		resp.StatusList = append(resp.StatusList, &pbuser.OnlineStatus{
+			UserID:      userID,
+			Status:      int32(datautil.If(len(plats) > 0, constant.Online, constant.Offline)),
+			PlatformIDs: plats,
+		})
+	}
+	return resp, nil
+}

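The GetAllOnlineUsers RPC above is cursor-paged: the caller feeds NextCursor back in until the server returns 0. A hedged sketch of a consuming loop built on the client wrapper added later in this diff (the retry logic that the real initUsersOnlineStatus adds is omitted here):

import (
	"context"

	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
	"github.com/openimsdk/protocol/constant"
)

// drainOnlineUsers pages through GetAllOnlineUsers until NextCursor == 0,
// keeping only users that are currently online.
func drainOnlineUsers(ctx context.Context, client *rpcclient.UserRpcClient) (map[string][]int32, error) {
	online := make(map[string][]int32)
	var cursor uint64
	for {
		resp, err := client.GetAllOnlineUsers(ctx, cursor)
		if err != nil {
			return nil, err
		}
		for _, s := range resp.StatusList {
			if s.Status == constant.Online {
				online[s.UserID] = s.PlatformIDs
			}
		}
		if resp.NextCursor == 0 {
			break
		}
		cursor = resp.NextCursor
	}
	return online, nil
}
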
@@ -79,13 +79,13 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 		now := time.Now()
 		deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
 		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
-		log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
+		log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
 		if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
 			log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
 			return
 		}
-		log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
+		log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
 	}
 	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
 		return errs.Wrap(err)
@@ -95,7 +95,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 	msgDestructFunc := func() {
 		now := time.Now()
 		ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
-		log.ZInfo(ctx, "msg destruct cron start", "now", now)
+		log.ZDebug(ctx, "msg destruct cron start", "now", now)
 		conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
 		if err != nil {
@@ -108,7 +108,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 				return
 			}
 		}
-		log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now))
+		log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now))
 	}
 	if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
 		return errs.Wrap(err)
@@ -119,18 +119,18 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 	//	now := time.Now()
 	//	deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
 	//	ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
-	//	log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
+	//	log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
 	//	if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil {
 	//		log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
 	//		return
 	//	}
-	//	log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
+	//	log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
 	//}
 	//if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
 	//	return errs.Wrap(err)
 	//}
-	log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
+	log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
 	crontab.Start()
 	<-ctx.Done()
 	return nil

@@ -100,8 +100,9 @@ type TLSConfig struct {
 type API struct {
 	Api struct {
 		ListenIP string `mapstructure:"listenIP"`
 		Ports    []int  `mapstructure:"ports"`
+		CompressionLevel int `mapstructure:"compressionLevel"`
 	} `mapstructure:"api"`
 	Prometheus struct {
 		Enable bool `mapstructure:"enable"`
@@ -223,6 +224,7 @@ type Push struct {
 		BadgeCount bool `mapstructure:"badgeCount"`
 		Production bool `mapstructure:"production"`
 	} `mapstructure:"iosPush"`
+	FullUserCache bool `mapstructure:"fullUserCache"`
 }

 type Auth struct {

@@ -54,15 +54,11 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo
 	log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort,
 		"prometheusPorts", prometheusConfig.Ports)
 	rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort))
 	listener, err := net.Listen(
 		"tcp",
 		rpcTcpAddr,
 	)
-	if err != nil {
-		return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr)
-	}
-	defer listener.Close()
 	client, err := kdisc.NewDiscoveryRegister(discovery, share)
 	if err != nil {
 		return err

@@ -1,6 +1,9 @@
 package cachekey

-import "time"
+import (
+	"strings"
+	"time"
+)

 const (
 	OnlineKey = "ONLINE:"
@@ -11,3 +14,7 @@ const (
 func GetOnlineKey(userID string) string {
 	return OnlineKey + userID
 }
+
+func GetOnlineKeyUserID(key string) string {
+	return strings.TrimPrefix(key, OnlineKey)
+}

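GetOnlineKeyUserID is simply the inverse of GetOnlineKey; the SCAN-based GetAllOnlineUsers implementation later in this diff needs it to map the keys Redis returns back to user IDs. A two-line illustration (the user ID is made up):

key := cachekey.GetOnlineKey("user123")    // "ONLINE:user123"
userID := cachekey.GetOnlineKeyUserID(key) // "user123"
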
@@ -5,4 +5,5 @@ import "context"
 type OnlineCache interface {
 	GetOnline(ctx context.Context, userID string) ([]int32, error)
 	SetUserOnline(ctx context.Context, userID string, online, offline []int32) error
+	GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error)
 }

@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"github.com/dtm-labs/rockscache"
+	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
 	"github.com/redis/go-redis/v9"
 	"golang.org/x/sync/singleflight"
@@ -65,6 +66,7 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
 			}
 			bs, err := json.Marshal(value)
 			if err != nil {
+				log.ZError(ctx, "marshal failed", err)
 				return nil, err
 			}
 			cacheIndex[index] = string(bs)
@@ -72,7 +74,7 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
 		return cacheIndex, nil
 	})
 	if err != nil {
-		return nil, err
+		return nil, errs.WrapMsg(err, "FetchBatch2 failed")
 	}
 	for index, data := range indexCache {
 		if data == "" {
@@ -80,7 +82,7 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rockscac
 		}
 		var value V
 		if err := json.Unmarshal([]byte(data), &value); err != nil {
-			return nil, err
+			return nil, errs.WrapMsg(err, "Unmarshal failed")
 		}
 		if cb, ok := any(&value).(BatchCacheCallback[K]); ok {
 			cb.BatchCache(keyId[keys[index]])

@@ -28,6 +28,10 @@ import (
 	"time"
 )

+const (
+	rocksCacheTimeout = 11 * time.Second
+)
+
 // BatchDeleterRedis is a concrete implementation of the BatchDeleter interface based on Redis and RocksCache.
 type BatchDeleterRedis struct {
 	redisClient redis.UniversalClient
@@ -106,6 +110,8 @@ func (c *BatchDeleterRedis) AddKeys(keys ...string) {
 // GetRocksCacheOptions returns the default configuration options for RocksCache.
 func GetRocksCacheOptions() *rockscache.Options {
 	opts := rockscache.NewDefaultOptions()
+	opts.LockExpire = rocksCacheTimeout
+	opts.WaitReplicasTimeout = rocksCacheTimeout
 	opts.StrongConsistency = true
 	opts.RandomExpireAdjustment = 0.2

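Pointing both rockscache timeouts at one shared constant keeps the fetch-lock lifetime and the replica wait in step. A hedged sketch of what the resulting options amount to (field names come from the rockscache package; the comments are interpretive):

import (
	"time"

	"github.com/dtm-labs/rockscache"
)

func exampleOptions() rockscache.Options {
	opts := rockscache.NewDefaultOptions()
	opts.LockExpire = 11 * time.Second          // how long a fetch lock is held before another caller may take over
	opts.WaitReplicasTimeout = 11 * time.Second // how long to wait for replica acknowledgement
	opts.StrongConsistency = true
	opts.RandomExpireAdjustment = 0.2
	return opts
}
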
@@ -2,8 +2,10 @@ package redis

 import (
 	"context"
+	"fmt"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
+	"github.com/openimsdk/protocol/constant"
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
 	"github.com/redis/go-redis/v9"
@@ -49,6 +51,36 @@ func (s *userOnline) GetOnline(ctx context.Context, userID string) ([]int32, err
 	return platformIDs, nil
 }

+func (s *userOnline) GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error) {
+	result := make(map[string][]int32)
+
+	keys, nextCursor, err := s.rdb.Scan(ctx, cursor, fmt.Sprintf("%s*", cachekey.OnlineKey), constant.ParamMaxLength).Result()
+	if err != nil {
+		return nil, 0, err
+	}
+
+	for _, key := range keys {
+		userID := cachekey.GetOnlineKeyUserID(key)
+		strValues, err := s.rdb.ZRange(ctx, key, 0, -1).Result()
+		if err != nil {
+			return nil, 0, err
+		}
+
+		values := make([]int32, 0, len(strValues))
+		for _, value := range strValues {
+			intValue, err := strconv.Atoi(value)
+			if err != nil {
+				return nil, 0, errs.Wrap(err)
+			}
+			values = append(values, int32(intValue))
+		}
+
+		result[userID] = values
+	}
+
+	return result, nextCursor, nil
+}
+
 func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error {
 	script := `
 local key = KEYS[1]

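The implementation above follows the Redis SCAN contract: each call returns one batch of matching keys plus the cursor for the next call, and a returned cursor of 0 means the full iteration is complete, so callers must loop. A minimal sketch of that loop with go-redis (the pattern and count are illustrative; the real code scans the ONLINE: prefix with constant.ParamMaxLength):

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// scanAll collects every key matching pattern by driving the SCAN cursor to 0.
func scanAll(ctx context.Context, rdb redis.UniversalClient, pattern string) ([]string, error) {
	var (
		all    []string
		cursor uint64
	)
	for {
		keys, next, err := rdb.Scan(ctx, cursor, pattern, 500).Result()
		if err != nil {
			return nil, err
		}
		all = append(all, keys...)
		if next == 0 { // a zero cursor marks the end of the iteration
			return all, nil
		}
		cursor = next
	}
}
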
@@ -20,7 +20,9 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V]
 type LRU[K comparable, V any] interface {
 	Get(key K, fetch func() (V, error)) (V, error)
+	Set(key K, value V)
 	SetHas(key K, value V) bool
+	GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error)
 	Del(key K) bool
 	Stop()
 }

@@ -51,6 +51,11 @@ type ExpirationLRU[K comparable, V any] struct {
 	target Target
 }

+func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
+	//TODO implement me
+	panic("implement me")
+}
+
 func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 	x.lock.Lock()
 	v, ok := x.core.Get(key)
@@ -99,5 +104,11 @@ func (x *ExpirationLRU[K, V]) SetHas(key K, value V) bool {
 	return false
 }

+func (x *ExpirationLRU[K, V]) Set(key K, value V) {
+	x.lock.Lock()
+	defer x.lock.Unlock()
+	x.core.Add(key, &expirationLruItem[V]{value: value})
+}
+
 func (x *ExpirationLRU[K, V]) Stop() {
 }

@@ -88,18 +88,75 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 	return v.value, v.err
 }

-//func (x *LayLRU[K, V]) Set(key K, value V) {
-//	x.lock.Lock()
-//	x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
-//	x.lock.Unlock()
-//}
-//
+func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
+	var (
+		err  error
+		once sync.Once
+	)
+
+	x.lock.Lock()
+	res := make(map[K]V)
+	queries := make([]K, 0)
+	setVs := make(map[K]*layLruItem[V])
+	for _, key := range keys {
+		v, ok := x.core.Get(key)
+		x.lock.Unlock()
+		if ok {
+			v.lock.Lock()
+			expires, value, err1 := v.expires, v.value, v.err
+			v.lock.Unlock()
+			if expires != 0 && expires > time.Now().UnixMilli() {
+				x.target.IncrGetHit()
+				res[key] = value
+				if err1 != nil {
+					once.Do(func() {
+						err = err1
+					})
+				}
+				continue
+			}
+		}
+		queries = append(queries, key)
+	}
+	values, err1 := fetch(queries)
+	if err1 != nil {
+		once.Do(func() {
+			err = err1
+		})
+	}
+	for key, val := range values {
+		v := &layLruItem[V]{}
+		v.value = val
+
+		if err == nil {
+			v.expires = time.Now().Add(x.successTTL).UnixMilli()
+			x.target.IncrGetSuccess()
+		} else {
+			v.expires = time.Now().Add(x.failedTTL).UnixMilli()
+			x.target.IncrGetFailed()
+		}
+		setVs[key] = v
+		x.lock.Lock()
+		x.core.Add(key, v)
+		x.lock.Unlock()
+		res[key] = val
+	}
+
+	return res, err
+}

 //func (x *LayLRU[K, V]) Has(key K) bool {
 //	x.lock.Lock()
 //	defer x.lock.Unlock()
 //	return x.core.Contains(key)
 //}

+func (x *LayLRU[K, V]) Set(key K, value V) {
+	x.lock.Lock()
+	defer x.lock.Unlock()
+	x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
+}
+
 func (x *LayLRU[K, V]) SetHas(key K, value V) bool {
 	x.lock.Lock()
 	defer x.lock.Unlock()

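GetBatch is a cache-aside batch read: hits are served from the LRU, expired or missing keys are collected and handed to the fetch callback in a single call, and the fetched values are written back with the success or failure TTL before being merged into the result. A hedged usage sketch against the constructor shown elsewhere in this diff (TTLs and the fetch body are stand-ins for real values and a real RPC):

cache := lru.NewLayLRU[string, []int32](2048, time.Minute, 3*time.Second, localcache.EmptyTarget{}, func(string, []int32) {})

statuses, err := cache.GetBatch([]string{"u1", "u2", "u3"}, func(missing []string) (map[string][]int32, error) {
	// Only keys that were absent or expired in the LRU arrive here,
	// so one round trip can serve the whole batch.
	out := make(map[string][]int32, len(missing))
	for _, id := range missing {
		out[id] = []int32{1} // stand-in for a real status lookup
	}
	return out, nil
})
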
@@ -32,6 +32,29 @@ type slotLRU[K comparable, V any] struct {
 	hash func(k K) uint64
 }

+func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
+	var (
+		slotKeys = make(map[uint64][]K)
+		vs       = make(map[K]V)
+	)
+
+	for _, k := range keys {
+		index := x.getIndex(k)
+		slotKeys[index] = append(slotKeys[index], k)
+	}
+
+	for k, v := range slotKeys {
+		batches, err := x.slots[k].GetBatch(v, fetch)
+		if err != nil {
+			return nil, err
+		}
+		for key, value := range batches {
+			vs[key] = value
+		}
+	}
+	return vs, nil
+}
+
 func (x *slotLRU[K, V]) getIndex(k K) uint64 {
 	return x.hash(k) % x.n
 }
@@ -40,6 +63,10 @@ func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 	return x.slots[x.getIndex(key)].Get(key, fetch)
 }

+func (x *slotLRU[K, V]) Set(key K, value V) {
+	x.slots[x.getIndex(key)].Set(key, value)
+}
+
 func (x *slotLRU[K, V]) SetHas(key K, value V) bool {
 	return x.slots[x.getIndex(key)].SetHas(key, value)
 }

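slotLRU.GetBatch shards a batch the same way single-key calls are routed: keys are grouped by hash slot, so each underlying LRU sees one GetBatch call for its share of the keys. A small standalone illustration of the grouping step under an assumed hash (FNV here; the real code uses whatever hash function was injected at construction):

package main

import (
	"fmt"
	"hash/fnv"
)

func slotOf(key string, n uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64() % n
}

func main() {
	keys := []string{"u1", "u2", "u3", "u4"}
	slots := make(map[uint64][]string)
	for _, k := range keys {
		idx := slotOf(k, 4)
		slots[idx] = append(slots[idx], k)
	}
	// Each slot's keys would go to that slot's LRU in one GetBatch call.
	fmt.Println(slots)
}
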
@@ -2,60 +2,197 @@ package rpccache

 import (
 	"context"
+	"fmt"
+	"math/rand"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/openimsdk/protocol/constant"
+	"github.com/openimsdk/protocol/user"

 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
 	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 	"github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
+	"github.com/openimsdk/tools/db/cacheutil"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/mcontext"
 	"github.com/redis/go-redis/v9"
-	"math/rand"
-	"strconv"
-	"time"
 )

-func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache {
-	x := &OnlineCache{
-		user:  user,
-		group: group,
-		local: lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
-			return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
-		}),
-	}
-	go func() {
-		ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
-		for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
-			userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
-			if err != nil {
-				log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
-				continue
-			}
-			storageCache := x.setUserOnline(userID, platformIDs)
-			log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
-			if fn != nil {
-				fn(ctx, userID, platformIDs)
-			}
-		}
-	}()
-	return x
-}
-
-type OnlineCache struct {
-	user  rpcclient.UserRpcClient
-	group *GroupLocalCache
-	local lru.LRU[string, []int32]
-}
+func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) {
+	l := &sync.Mutex{}
+	x := &OnlineCache{
+		user:          user,
+		group:         group,
+		fullUserCache: fullUserCache,
+		Lock:          l,
+		Cond:          sync.NewCond(l),
+	}
+
+	ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
+
+	switch x.fullUserCache {
+	case true:
+		log.ZDebug(ctx, "fullUserCache is true")
+		x.mapCache = cacheutil.NewCache[string, []int32]()
+		go func() {
+			if err := x.initUsersOnlineStatus(ctx); err != nil {
+				log.ZError(ctx, "initUsersOnlineStatus failed", err)
+			}
+		}()
+	case false:
+		log.ZDebug(ctx, "fullUserCache is false")
+		x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
+			return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
+		})
+		x.CurrentPhase.Store(DoSubscribeOver)
+		x.Cond.Broadcast()
+	}
+
+	go func() {
+		x.doSubscribe(ctx, rdb, fn)
+	}()
+	return x, nil
+}
+
+const (
+	Begin uint32 = iota
+	DoOnlineStatusOver
+	DoSubscribeOver
+)
+
+type OnlineCache struct {
+	user  rpcclient.UserRpcClient
+	group *GroupLocalCache
+
+	// fullUserCache if enabled, caches the online status of all users using mapCache;
+	// otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache.
+	fullUserCache bool
+
+	lruCache lru.LRU[string, []int32]
+	mapCache *cacheutil.Cache[string, []int32]
+
+	Lock         *sync.Mutex
+	Cond         *sync.Cond
+	CurrentPhase atomic.Uint32
+}
+
+func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) {
+	log.ZDebug(ctx, "init users online status begin")
+
+	var (
+		totalSet      atomic.Int64
+		maxTries      = 5
+		retryInterval = time.Second * 5
+
+		resp *user.GetAllOnlineUsersResp
+	)
+
+	defer func(t time.Time) {
+		log.ZInfo(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet.Load())
+		o.CurrentPhase.Store(DoOnlineStatusOver)
+		o.Cond.Broadcast()
+	}(time.Now())
+
+	retryOperation := func(operation func() error, operationName string) error {
+		for i := 0; i < maxTries; i++ {
+			if err = operation(); err != nil {
+				log.ZWarn(ctx, fmt.Sprintf("initUsersOnlineStatus: %s failed", operationName), err)
+				time.Sleep(retryInterval)
+			} else {
+				return nil
+			}
+		}
+		return err
+	}
+
+	cursor := uint64(0)
+	for resp == nil || resp.NextCursor != 0 {
+		if err = retryOperation(func() error {
+			resp, err = o.user.GetAllOnlineUsers(ctx, cursor)
+			if err != nil {
+				return err
+			}
+
+			for _, u := range resp.StatusList {
+				if u.Status == constant.Online {
+					o.setUserOnline(u.UserID, u.PlatformIDs)
+				}
+				totalSet.Add(1)
+			}
+			cursor = resp.NextCursor
+			return nil
+		}, "getAllOnlineUsers"); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) {
+	o.Lock.Lock()
+	ch := rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel()
+	for o.CurrentPhase.Load() < DoOnlineStatusOver {
+		o.Cond.Wait()
+	}
+	o.Lock.Unlock()
+	log.ZInfo(ctx, "begin doSubscribe")
+
+	doMessage := func(message *redis.Message) {
+		userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
+		if err != nil {
+			log.ZError(ctx, "OnlineCache setHasUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
+			return
+		}
+		log.ZDebug(ctx, fmt.Sprintf("get subscribe %s message", cachekey.OnlineChannel), "useID", userID, "platformIDs", platformIDs)
+		switch o.fullUserCache {
+		case true:
+			if len(platformIDs) == 0 {
+				// offline
+				o.mapCache.Delete(userID)
+			} else {
+				o.mapCache.Store(userID, platformIDs)
+			}
+		case false:
+			storageCache := o.setHasUserOnline(userID, platformIDs)
+			log.ZDebug(ctx, "OnlineCache setHasUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
+			if fn != nil {
+				fn(ctx, userID, platformIDs)
+			}
+		}
+	}
+
+	if o.CurrentPhase.Load() == DoOnlineStatusOver {
+		for done := false; !done; {
+			select {
+			case message := <-ch:
+				doMessage(message)
+			default:
+				o.CurrentPhase.Store(DoSubscribeOver)
+				o.Cond.Broadcast()
+				done = true
+			}
+		}
+	}
+
+	for message := range ch {
+		doMessage(message)
+	}
+}

 func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
-	platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
+	platformIDs, err := o.lruCache.Get(userID, func() ([]int32, error) {
 		return o.user.GetUserOnlinePlatform(ctx, userID)
 	})
 	if err != nil {
 		log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
 		return nil, err
 	}
-	log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
+	//log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
 	return platformIDs, nil
 }
@@ -69,6 +206,16 @@ func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string)
 	return platformIDs, nil
 }

+// func (o *OnlineCache) GetUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string]int32, error) {
+// 	platformIDs, err := o.getUserOnlinePlatform(ctx, userIDs)
+// 	if err != nil {
+// 		return nil, err
+// 	}
+// 	tmp := make([]int32, len(platformIDs))
+// 	copy(tmp, platformIDs)
+// 	return platformIDs, nil
+// }
+
 func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
 	platformIDs, err := o.getUserOnlinePlatform(ctx, userID)
 	if err != nil {
@@ -77,10 +224,68 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
 	return len(platformIDs) > 0, nil
 }

+func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) {
+	platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) {
+		platformIDsMap := make(map[string][]int32)
+
+		usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers)
+		if err != nil {
+			return nil, err
+		}
+
+		for _, u := range usersStatus {
+			platformIDsMap[u.UserID] = u.PlatformIDs
+		}
+
+		return platformIDsMap, nil
+	})
+	if err != nil {
+		log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs)
+		return nil, err
+	}
+	return platformIDsMap, nil
+}
+
+func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error) {
+	t := time.Now()
+
+	var (
+		onlineUserIDs  = make([]string, 0, len(userIDs))
+		offlineUserIDs = make([]string, 0, len(userIDs))
+	)
+
+	switch o.fullUserCache {
+	case true:
+		for _, userID := range userIDs {
+			if _, ok := o.mapCache.Load(userID); ok {
+				onlineUserIDs = append(onlineUserIDs, userID)
+			} else {
+				offlineUserIDs = append(offlineUserIDs, userID)
+			}
+		}
+	case false:
+		userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		for key, value := range userOnlineMap {
+			if len(value) > 0 {
+				onlineUserIDs = append(onlineUserIDs, key)
+			} else {
+				offlineUserIDs = append(offlineUserIDs, key)
+			}
+		}
+	}
+
+	log.ZInfo(ctx, "get users online", "online users length", len(userIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t))
+	return userIDs, offlineUserIDs, nil
+}
+
 //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
 //	onlineUserIDs := make([]string, 0, len(userIDs))
 //	for _, userID := range userIDs {
 //		online, err := o.GetUserOnline(ctx, userID)
 //		if err != nil {
 //			return nil, err
 //		}
@@ -111,6 +316,15 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
 //	return onlineUserIDs, nil
 //}

-func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
-	return o.local.SetHas(userID, platformIDs)
+func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) {
+	switch o.fullUserCache {
+	case true:
+		o.mapCache.Store(userID, platformIDs)
+	case false:
+		o.lruCache.Set(userID, platformIDs)
+	}
+}
+
+func (o *OnlineCache) setHasUserOnline(userID string, platformIDs []int32) bool {
+	return o.lruCache.SetHas(userID, platformIDs)
 }

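With the new constructor signature, callers pick the cache mode once at startup and must handle the returned error; the push consumer then partitions recipients with a single GetUsersOnline call instead of per-user lookups. A wiring sketch condensed from the call sites elsewhere in this diff (the surrounding variables are assumed to exist in the caller):

onlineCache, err := rpccache.NewOnlineCache(userRpcClient, groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil)
if err != nil {
	return err
}
onlineUserIDs, offlineUserIDs, err := onlineCache.GetUsersOnline(ctx, pushToUserIDs)
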
@@ -17,6 +17,11 @@ package rpcclient
 import (
 	"context"
 	"encoding/json"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
+
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/protocol/constant"
 	"github.com/openimsdk/protocol/msg"
@@ -28,9 +33,6 @@ import (
 	"github.com/openimsdk/tools/utils/idutil"
 	"github.com/openimsdk/tools/utils/jsonutil"
 	"github.com/openimsdk/tools/utils/timeutil"
-	"google.golang.org/grpc"
-	"google.golang.org/protobuf/proto"
-	"time"
 )

 func newContentTypeConf(conf *config.Notification) map[int32]config.NotificationConfig {
@@ -221,6 +223,15 @@ func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.
 	return resp, nil
 }

+func (m *MessageRpcClient) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (*msg.GetConversationsHasReadAndMaxSeqResp, error) {
+	resp, err := m.Client.GetConversationsHasReadAndMaxSeq(ctx, req)
+	if err != nil {
+		// Wrap the error to provide more context if the gRPC call fails.
+		return nil, err
+	}
+	return resp, nil
+}
+
 func (m *MessageRpcClient) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) {
 	return m.Client.GetSeqMessage(ctx, req)
 }

@@ -169,6 +169,15 @@ func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error {
 	return authverify.CheckAccessV3(ctx, ownerUserID, u.imAdminUserID)
 }

+// GetAllUserID retrieves all user IDs with pagination options.
+func (u *UserRpcClient) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (*user.GetAllUserIDResp, error) {
+	resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
 // GetAllUserIDs retrieves all user IDs with pagination options.
 func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) {
 	resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})
@@ -215,3 +224,7 @@ func (u *UserRpcClient) GetUserOnlinePlatform(ctx context.Context, userID string
 	}
 	return resp[0].PlatformIDs, nil
 }
+
+func (u *UserRpcClient) GetAllOnlineUsers(ctx context.Context, cursor uint64) (*user.GetAllOnlineUsersResp, error) {
+	return u.Client.GetAllOnlineUsers(ctx, &user.GetAllOnlineUsersReq{Cursor: cursor})
+}

@@ -3,8 +3,8 @@ serviceBinaries:
   openim-crontask: 1
   openim-rpc-user: 1
   openim-msggateway: 1
-  openim-push: 4
-  openim-msgtransfer: 4
+  openim-push: 8
+  openim-msgtransfer: 8
   openim-rpc-conversation: 1
   openim-rpc-auth: 1
   openim-rpc-group: 1
