Merge pull request #3625 from icey-yu/merge-main

Merge main
pre-release-v3.8.4
icey-yu 1 week ago committed by GitHub
commit 79464bae17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -4,42 +4,80 @@ on:
push: push:
branches: branches:
- release-* - release-*
# tags:
# - 'v*'
release: release:
types: [published] types: [published]
workflow_dispatch: workflow_dispatch:
inputs: inputs:
tag: tag:
description: "Tag version to be used for Docker image" description: "Tag version to be used for Docker image"
required: true required: true
default: "v3.8.0" default: "v3.8.3"
env:
GO_VERSION: "1.22"
IMAGE_NAME: "openim-server"
# IMAGE_NAME: ${{ github.event.repository.name }}
DOCKER_BUILDKIT: 1
jobs: jobs:
build-and-test: publish-docker-images:
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.merged == false) }}
steps: steps:
- uses: actions/checkout@v4 - name: Checkout main repository
uses: actions/checkout@v4
with: with:
path: main-repo path: main-repo
# - name: Set up QEMU - name: Set up QEMU
# uses: docker/setup-qemu-action@v3.3.0 uses: docker/setup-qemu-action@v3.3.0
- name: Set up Docker Buildx - name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3.8.0 id: buildx
uses: docker/setup-buildx-action@v3
with:
driver-opts: network=host
- name: Build Docker image - name: Extract metadata for Docker
id: build id: meta
uses: docker/build-push-action@v5 uses: docker/metadata-action@v5.6.0
with: with:
context: ./main-repo images: |
load: true ${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}
tags: "openim/openim-server:local" ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}
cache-from: type=gha,scope=build registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}
cache-to: type=gha,mode=max,scope=build tags: |
type=ref,event=tag
type=schedule
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern=v{{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
- name: Install skopeo
run: |
sudo apt-get update && sudo apt-get install -y skopeo
- name: Build multi-arch images as OCI
run: |
mkdir -p /tmp/oci-image /tmp/docker-cache
# Build multi-architecture image and save in OCI format
docker buildx build \
--platform linux/amd64,linux/arm64 \
--output type=oci,dest=/tmp/oci-image/multi-arch.tar \
--cache-to type=local,dest=/tmp/docker-cache \
--cache-from type=gha \
./main-repo
# Use skopeo to convert the amd64 image from OCI format to Docker format and load it
skopeo copy --override-arch amd64 oci-archive:/tmp/oci-image/multi-arch.tar docker-daemon:${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local
# check image
docker image ls | grep openim
- name: Checkout compose repository - name: Checkout compose repository
uses: actions/checkout@v4 uses: actions/checkout@v4
@ -52,11 +90,11 @@ jobs:
run: | run: |
IP=$(hostname -I | awk '{print $1}') IP=$(hostname -I | awk '{print $1}')
echo "The IP Address is: $IP" echo "The IP Address is: $IP"
echo "::set-output name=ip::$IP" echo "ip=$IP" >> $GITHUB_OUTPUT
- name: Update .env to use the local image - name: Update .env to use the local image
run: | run: |
sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=openim/openim-server:local|' ${{ github.workspace }}/compose-repo/.env sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local|' ${{ github.workspace }}/compose-repo/.env
sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env
- name: Start services using Docker Compose - name: Start services using Docker Compose
@ -66,23 +104,34 @@ jobs:
docker compose ps docker compose ps
- name: Extract metadata for Docker (tags, labels) # - name: Check openim-server health
id: meta # run: |
uses: docker/metadata-action@v5.6.0 # timeout=300
with: # interval=30
images: | # elapsed=0
openim/openim-server # while [[ $elapsed -le $timeout ]]; do
ghcr.io/openimsdk/openim-server # if ! docker exec openim-server mage check; then
registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server # echo "openim-server is not ready, waiting..."
tags: | # sleep $interval
type=ref,event=tag # elapsed=$(($elapsed + $interval))
type=schedule # else
type=ref,event=branch # echo "Health check successful"
# type=semver,pattern={{version}} # exit 0
type=semver,pattern=v{{version}} # fi
type=semver,pattern=release-{{raw}} # done
type=sha # echo "Health check failed after 5 minutes"
type=raw,value=${{ github.event.inputs.tag }} # exit 1
# - name: Check openim-chat health
# if: success()
# run: |
# if ! docker exec openim-chat mage check; then
# echo "openim-chat check failed"
# exit 1
# else
# echo "Health check successful"
# exit 0
# fi
- name: Log in to Docker Hub - name: Log in to Docker Hub
uses: docker/login-action@v3.3.0 uses: docker/login-action@v3.3.0
@ -104,22 +153,27 @@ jobs:
username: ${{ secrets.ALIREGISTRY_USERNAME }} username: ${{ secrets.ALIREGISTRY_USERNAME }}
password: ${{ secrets.ALIREGISTRY_TOKEN }} password: ${{ secrets.ALIREGISTRY_TOKEN }}
- name: Push Docker images - name: Push multi-architecture images
uses: docker/build-push-action@v5 if: success()
with: run: |
context: ./main-repo docker buildx build \
push: true --platform linux/amd64,linux/arm64 \
platforms: linux/amd64,linux/arm64 $(echo "${{ steps.meta.outputs.tags }}" | sed 's/,/ --tag /g' | sed 's/^/--tag /') \
tags: ${{ steps.meta.outputs.tags }} --cache-from type=local,src=/tmp/docker-cache \
labels: ${{ steps.meta.outputs.labels }} --push \
cache-from: type=gha,scope=build ./main-repo
cache-to: type=gha,mode=max,scope=build
- name: Verify multi-platform support - name: Verify multi-platform support
run: | run: |
images=("openim/openim-server" "ghcr.io/openimsdk/openim-server" "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server") images=(
"${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}"
"ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}"
"registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}"
)
for image in "${images[@]}"; do for image in "${images[@]}"; do
for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n'); do for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n' | cut -d':' -f2); do
echo "Verifying multi-arch support for $image:$tag"
manifest=$(docker manifest inspect "$image:$tag" || echo "error") manifest=$(docker manifest inspect "$image:$tag" || echo "error")
if [[ "$manifest" == "error" ]]; then if [[ "$manifest" == "error" ]]; then
echo "Manifest not found for $image:$tag" echo "Manifest not found for $image:$tag"
@ -135,5 +189,6 @@ jobs:
echo "Multi-platform support check failed for $image:$tag - missing arm64" echo "Multi-platform support check failed for $image:$tag - missing arm64"
exit 1 exit 1
fi fi
echo "✅ $image:$tag supports both amd64 and arm64 architectures"
done done
done done

@ -1,3 +1,10 @@
auth:
topic: DELETE_CACHE_AUTH
slotNum: 100
slotSize: 2000
successExpire: 300
failedExpire: 5
user: user:
topic: DELETE_CACHE_USER topic: DELETE_CACHE_USER
slotNum: 100 slotNum: 100

@ -17,3 +17,13 @@ prometheus:
ports: ports:
# This address can be accessed via a browser # This address can be accessed via a browser
grafanaURL: grafanaURL:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850

@ -26,3 +26,20 @@ longConnSvr:
websocketMaxMsgLen: 4096 websocketMaxMsgLen: 4096
# WebSocket connection handshake timeout in seconds # WebSocket connection handshake timeout in seconds
websocketTimeout: 10 websocketTimeout: 10
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -6,3 +6,20 @@ prometheus:
# List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -10,10 +10,26 @@ rpc:
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached
prometheus: prometheus:
# Enable or disable Prometheus monitoring # Enable or disable Prometheus monitoring
enable: true enable: false
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:

@ -20,3 +20,20 @@ prometheus:
tokenPolicy: tokenPolicy:
# Token validity period, in days # Token validity period, in days
expire: 90 expire: 90
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -16,3 +16,20 @@ prometheus:
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -16,3 +16,20 @@ prometheus:
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -19,3 +19,20 @@ prometheus:
enableHistoryForNewMembers: true enableHistoryForNewMembers: true
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -20,3 +20,20 @@ prometheus:
# Does sending messages require friend verification # Does sending messages require friend verification
friendVerify: false friendVerify: false
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -17,6 +17,22 @@ prometheus:
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached
object: object:
# Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings

@ -16,3 +16,20 @@ prometheus:
# Prometheus listening ports, must be consistent with the number of rpc.ports # Prometheus listening ports, must be consistent with the number of rpc.ports
# It will only take effect when autoSetPorts is set to false. # It will only take effect when autoSetPorts is set to false.
ports: ports:
ratelimiter:
# Whether to enable rate limiting
enable: false
# WindowSize defines time duration per window
window: 20s
# BucketNum defines bucket number for each window
bucket: 500
# CPU threshold; valid range 01000 (1000 = 100%)
cpuThreshold: 850
circuitBreaker:
enable: false
window: 5s # Time window size (seconds)
bucket: 100 # Number of buckets
success: 0.6 # Success rate threshold (0.6 means 60%)
request: 500 # Request threshold; circuit breaker evaluation occurs when reached

@ -41,6 +41,9 @@ afterSendGroupMsg:
attentionIds: [] attentionIds: []
# See beforeSendSingleMsg comment. # See beforeSendSingleMsg comment.
deniedTypes: [] deniedTypes: []
afterMsgSaveDB:
enable: false
timeout: 5
afterUserOnline: afterUserOnline:
enable: false enable: false
timeout: 5 timeout: 5

@ -63,7 +63,12 @@ services:
restart: always restart: always
sysctls: sysctls:
net.core.somaxconn: 1024 net.core.somaxconn: 1024
command: redis-server /usr/local/redis/config/redis.conf --requirepass openIM123 --appendonly yes command: >
redis-server
--requirepass openIM123
--appendonly yes
--aof-use-rdb-preamble yes
--save ""
networks: networks:
- openim - openim
@ -208,7 +213,6 @@ services:
# Defines which listener is used for inter-broker communication within the Kafka cluster # Defines which listener is used for inter-broker communication within the Kafka cluster
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL" KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
# Authentication configuration variables - comment out to disable auth # Authentication configuration variables - comment out to disable auth
# KAFKA_USERNAME: "openIM" # KAFKA_USERNAME: "openIM"
# KAFKA_PASSWORD: "openIM123" # KAFKA_PASSWORD: "openIM123"
@ -262,14 +266,14 @@ services:
networks: networks:
- openim - openim
openim-admin-front: # openim-admin-front:
image: ${OPENIM_ADMIN_FRONT_IMAGE} # image: ${OPENIM_ADMIN_FRONT_IMAGE}
container_name: openim-admin-front # container_name: openim-admin-front
restart: always # restart: always
ports: # ports:
- "11002:80" # - "11002:80"
networks: # networks:
- openim # - openim
prometheus: prometheus:
image: ${PROMETHEUS_IMAGE} image: ${PROMETHEUS_IMAGE}

@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.12 github.com/openimsdk/protocol v0.0.73-alpha.17
github.com/openimsdk/tools v0.0.50-alpha.103 github.com/openimsdk/tools v0.0.50-alpha.105
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
@ -135,6 +135,7 @@ require (
github.com/leodido/go-urn v1.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lithammer/shortuuid v3.0.0+incompatible // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect
github.com/magefile/mage v1.15.0 // indirect github.com/magefile/mage v1.15.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
@ -151,6 +152,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect
@ -160,6 +162,8 @@ require (
github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sercand/kuberesolver/v6 v6.0.1 // indirect github.com/sercand/kuberesolver/v6 v6.0.1 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect github.com/spf13/cast v1.6.0 // indirect

@ -303,6 +303,8 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk
github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4= github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
@ -347,10 +349,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.103 h1:jYvI86cWiVu8a8iw1panw+pwIiStuUHF76h3fxA6ESI= github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs=
github.com/openimsdk/tools v0.0.50-alpha.103/go.mod h1:qCExFBqXpQBMzZck3XGIFwivBayAn2KNqB3WAd++IJw= github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
@ -361,6 +363,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
@ -397,6 +401,12 @@ github.com/sercand/kuberesolver/v6 v6.0.1 h1:XZUTA0gy/lgDYp/UhEwv7Js24F1j8NJ833Q
github.com/sercand/kuberesolver/v6 v6.0.1/go.mod h1:C0tsTuRMONSY+Xf7pv7RMW1/JlewY1+wS8SZE+1lf1s= github.com/sercand/kuberesolver/v6 v6.0.1/go.mod h1:C0tsTuRMONSY+Xf7pv7RMW1/JlewY1+wS8SZE+1lf1s=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
@ -548,6 +558,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
} }
func (o *ConversationApi) DeleteConversations(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client)
}

@ -219,6 +219,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
data = &apistruct.CustomElem{} data = &apistruct.CustomElem{}
case constant.MarkdownText: case constant.MarkdownText:
data = &apistruct.MarkdownTextElem{} data = &apistruct.MarkdownTextElem{}
case constant.Quote:
data = &apistruct.QuoteElem{}
case constant.OANotification: case constant.OANotification:
data = &apistruct.OANotificationElem{} data = &apistruct.OANotificationElem{}
req.SessionType = constant.NotificationChatType req.SessionType = constant.NotificationChatType

@ -0,0 +1,83 @@
package api
import (
"fmt"
"math"
"net/http"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/ratelimit"
"github.com/openimsdk/tools/stability/ratelimit/bbr"
)
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"` // time duration per window
Bucket int `yaml:"bucket"` // bucket number for each window
CPUThreshold int64 `yaml:"cpuThreshold"` // CPU threshold; valid range 01000 (1000 = 100%)
}
func RateLimitMiddleware(config *RateLimiter) gin.HandlerFunc {
if !config.Enable {
return func(c *gin.Context) {
c.Next()
}
}
limiter := bbr.NewBBRLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
return func(c *gin.Context) {
status := limiter.Stat()
c.Header("X-BBR-CPU", strconv.FormatInt(status.CPU, 10))
c.Header("X-BBR-MinRT", strconv.FormatInt(status.MinRt, 10))
c.Header("X-BBR-MaxPass", strconv.FormatInt(status.MaxPass, 10))
c.Header("X-BBR-MaxInFlight", strconv.FormatInt(status.MaxInFlight, 10))
c.Header("X-BBR-InFlight", strconv.FormatInt(status.InFlight, 10))
done, err := limiter.Allow()
if err != nil {
c.Header("X-RateLimit-Policy", "BBR")
c.Header("Retry-After", calculateBBRRetryAfter(status))
c.Header("X-RateLimit-Limit", strconv.FormatInt(status.MaxInFlight, 10))
c.Header("X-RateLimit-Remaining", "0") // There is no concept of remaining quota in BBR.
fmt.Println("rate limited:", err, "path:", c.Request.URL.Path)
log.ZWarn(c, "rate limited", err, "path", c.Request.URL.Path)
c.AbortWithStatus(http.StatusTooManyRequests)
apiresp.GinError(c, errs.NewCodeError(http.StatusTooManyRequests, "too many requests, please try again later"))
return
}
c.Next()
done(ratelimit.DoneInfo{})
}
}
func calculateBBRRetryAfter(status bbr.Stat) string {
loadRatio := float64(status.CPU) / float64(status.CPU)
if loadRatio < 0.8 {
return "1"
}
if loadRatio < 0.95 {
return "2"
}
backoff := 1 + int64(math.Pow(loadRatio-0.95, 2)*50)
if backoff > 5 {
backoff = 5
}
return strconv.FormatInt(backoff, 10)
}

@ -97,6 +97,18 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
case BestSpeed: case BestSpeed:
r.Use(gzip.Gzip(gzip.BestSpeed)) r.Use(gzip.Gzip(gzip.BestSpeed))
} }
// Use rate limiter middleware
if cfg.API.RateLimiter.Enable {
rl := &RateLimiter{
Enable: cfg.API.RateLimiter.Enable,
Window: cfg.API.RateLimiter.Window,
Bucket: cfg.API.RateLimiter.Bucket,
CPUThreshold: cfg.API.RateLimiter.CPUThreshold,
}
r.Use(RateLimitMiddleware(rl))
}
if config.Standalone() { if config.Standalone() {
r.Use(func(c *gin.Context) { r.Use(func(c *gin.Context) {
c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs) c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs)
@ -277,6 +289,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser)
} }

@ -70,6 +70,7 @@ type Client struct {
UserID string `json:"userID"` UserID string `json:"userID"`
IsBackground bool `json:"isBackground"` IsBackground bool `json:"isBackground"`
SDKType string `json:"sdkType"` SDKType string `json:"sdkType"`
SDKVersion string `json:"sdkVersion"`
Encoder Encoder Encoder Encoder
ctx *UserConnContext ctx *UserConnContext
longConnServer LongConnServer longConnServer LongConnServer
@ -97,6 +98,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closedErr = nil c.closedErr = nil
c.token = ctx.GetToken() c.token = ctx.GetToken()
c.SDKType = ctx.GetSDKType() c.SDKType = ctx.GetSDKType()
c.SDKVersion = ctx.GetSDKVersion()
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
c.subLock = new(sync.Mutex) c.subLock = new(sync.Mutex)
if c.subUserIDs != nil { if c.subUserIDs != nil {

@ -28,6 +28,7 @@ const (
BackgroundStatus = "isBackground" BackgroundStatus = "isBackground"
SendResponse = "isMsgResp" SendResponse = "isMsgResp"
SDKType = "sdkType" SDKType = "sdkType"
SDKVersion = "sdkVersion"
) )
const ( const (

@ -15,12 +15,13 @@
package msggateway package msggateway
import ( import (
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/utils/encrypt" "github.com/openimsdk/tools/utils/encrypt"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
@ -140,6 +141,10 @@ func (c *UserConnContext) GetToken() string {
return c.Req.URL.Query().Get(Token) return c.Req.URL.Query().Get(Token)
} }
func (c *UserConnContext) GetSDKVersion() string {
return c.Req.URL.Query().Get(SDKVersion)
}
func (c *UserConnContext) GetCompression() bool { func (c *UserConnContext) GetCompression() bool {
compression, exists := c.Query(Compression) compression, exists := c.Query(Compression)
if exists && compression == GzipCompressionProtocol { if exists && compression == GzipCompressionProtocol {

@ -13,6 +13,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/open-im-server/v3/pkg/rpccache"
pbAuth "github.com/openimsdk/protocol/auth" pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
@ -64,6 +65,8 @@ type WsServer struct {
webhookClient *webhook.Client webhookClient *webhook.Client
userClient *rpcli.UserClient userClient *rpcli.UserClient
authClient *rpcli.AuthClient authClient *rpcli.AuthClient
ready atomic.Bool
} }
type kickHandler struct { type kickHandler struct {
@ -93,6 +96,8 @@ func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.C
ws.authClient = rpcli.NewAuthClient(authConn) ws.authClient = rpcli.NewAuthClient(authConn)
ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn)) ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn))
ws.disCov = disCov ws.disCov = disCov
ws.ready.Store(true)
return nil return nil
} }
@ -254,6 +259,10 @@ func (ws *WsServer) registerClient(client *Client) {
oldClients []*Client oldClients []*Client
) )
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID) oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID,
"sdkVersion", client.SDKVersion)
if !userOK { if !userOK {
ws.clients.Set(client.UserID, client) ws.clients.Set(client.UserID, client)
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
@ -453,6 +462,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
// Create a new connection context // Create a new connection context
connContext := newContext(w, r) connContext := newContext(w, r)
if !ws.ready.Load() {
httpError(connContext, errs.New("ws server not ready"))
return
}
// Check if the current number of online user connections exceeds the maximum limit // Check if the current number of online user connections exceeds the maximum limit
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum { if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
// If it exceeds the maximum connection number, return an error via HTTP and stop processing // If it exceeds the maximum connection number, return an error via HTTP and stop processing
@ -469,6 +483,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
if ws.authClient == nil {
httpError(connContext, errs.New("auth client is not initialized"))
return
}
// Call the authentication client to parse the Token obtained from the context // Call the authentication client to parse the Token obtained from the context
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken()) resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
if err != nil { if err != nil {

@ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string {
} }
} }
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
}
if !filterAfterMsg(msg, after) { if !filterAfterMsg(msg, after) {
return return
} }
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ cbReq := &cbapi.CallbackAfterMsgSaveDBReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand),
RecvID: msg.RecvID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
}
if !filterAfterMsg(msg, after) {
return
} }
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ switch msg.SessionType {
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), case constant.SingleChatType, constant.NotificationChatType:
GroupID: msg.GroupID, cbReq.RecvID = msg.RecvID
case constant.ReadGroupChatType:
cbReq.GroupID = msg.GroupID
default:
} }
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg))
} }
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/mq"
"sync" "sync"

@ -15,7 +15,6 @@
package msgtransfer package msgtransfer
import ( import (
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/mq"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil { if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
prommetrics.MsgInsertMongoFailedCounter.Inc() prommetrics.MsgInsertMongoFailedCounter.Inc()
} else { } else {
prommetrics.MsgInsertMongoSuccessCounter.Inc() prommetrics.MsgInsertMongoSuccessCounter.Inc()
@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
} }
for _, msgData := range msgFromMQ.MsgData { for _, msgData := range msgFromMQ.MsgData {
switch msgData.SessionType { mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData)
case constant.SingleChatType:
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
case constant.ReadGroupChatType:
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
}
} }
//var seqs []int64 //var seqs []int64

@ -18,10 +18,13 @@ import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -46,6 +49,7 @@ import (
type authServer struct { type authServer struct {
pbauth.UnimplementedAuthServer pbauth.UnimplementedAuthServer
authDatabase controller.AuthDatabase authDatabase controller.AuthDatabase
AuthLocalCache *rpccache.AuthLocalCache
RegisterCenter discovery.Conn RegisterCenter discovery.Conn
config *Config config *Config
userClient *rpcli.UserClient userClient *rpcli.UserClient
@ -53,11 +57,12 @@ type authServer struct {
} }
type Config struct { type Config struct {
RpcConfig config.Auth RpcConfig config.Auth
RedisConfig config.Redis RedisConfig config.Redis
MongoConfig config.Mongo MongoConfig config.Mongo
Share config.Share Share config.Share
Discovery config.Discovery LocalCacheConfig config.LocalCache
Discovery config.Discovery
} }
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
@ -78,12 +83,19 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
} }
token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire)
} else { } else {
token = redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire) token = redis2.NewTokenCacheModel(rdb, &config.LocalCacheConfig, config.RpcConfig.TokenPolicy.Expire)
} }
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil { if err != nil {
return err return err
} }
authConn, err := client.GetConn(ctx, config.Discovery.RpcService.Auth)
if err != nil {
return err
}
localcache.InitLocalCache(&config.LocalCacheConfig)
pbauth.RegisterAuthServer(server, &authServer{ pbauth.RegisterAuthServer(server, &authServer{
RegisterCenter: client, RegisterCenter: client,
authDatabase: controller.NewAuthDatabase( authDatabase: controller.NewAuthDatabase(
@ -93,9 +105,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
config.Share.MultiLogin, config.Share.MultiLogin,
config.Share.IMAdminUser.UserIDs, config.Share.IMAdminUser.UserIDs,
), ),
config: config, AuthLocalCache: rpccache.NewAuthLocalCache(rpcli.NewAuthClient(authConn), &config.LocalCacheConfig, rdb),
userClient: rpcli.NewUserClient(userConn), config: config,
adminUserIDs: config.Share.IMAdminUser.UserIDs, userClient: rpcli.NewUserClient(userConn),
adminUserIDs: config.Share.IMAdminUser.UserIDs,
}) })
return nil return nil
} }
@ -121,6 +134,7 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke
} }
prommetrics.UserLoginCounter.Inc() prommetrics.UserLoginCounter.Inc()
resp.Token = token resp.Token = token
resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60
return &resp, nil return &resp, nil
@ -151,20 +165,34 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp.Token = token resp.Token = token
resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60
return &resp, nil return &resp, nil
} }
func (s *authServer) GetExistingToken(ctx context.Context, req *pbauth.GetExistingTokenReq) (*pbauth.GetExistingTokenResp, error) {
m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID))
if err != nil {
return nil, err
}
return &pbauth.GetExistingTokenResp{
TokenStates: convert.TokenMapDB2Pb(m),
}, nil
}
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret)) claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret))
if err != nil { if err != nil {
return nil, err return nil, err
} }
m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID)
m, err := s.AuthLocalCache.GetExistingToken(ctx, claims.UserID, claims.PlatformID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(m) == 0 { if len(m) == 0 {
isAdmin := authverify.CheckUserIsAdmin(ctx, claims.UserID) isAdmin := authverify.CheckUserIsAdmin(ctx, claims.UserID)
if isAdmin { if isAdmin {

@ -37,6 +37,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
@ -132,6 +133,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
return resp, nil return resp, nil
} }
// Deprecated: Use `GetConversations` instead.
func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) { func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil { if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err return nil, err
@ -183,9 +185,21 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
conversation_isPinTime := make(map[int64]string) conversation_isPinTime := make(map[int64]string)
conversation_notPinTime := make(map[int64]string) conversation_notPinTime := make(map[int64]string)
for _, v := range conversations { for _, v := range conversations {
conversationID := v.ConversationID conversationID := v.ConversationID
time := conversationMsg[conversationID].MsgInfo.LatestMsgRecvTime var time int64
if _, ok := conversationMsg[conversationID]; ok {
time = conversationMsg[conversationID].MsgInfo.LatestMsgRecvTime
} else {
conversationMsg[conversationID] = &pbconversation.ConversationElem{
ConversationID: conversationID,
IsPinned: v.IsPinned,
MsgInfo: nil,
}
time = v.CreateTime.UnixMilli()
}
conversationMsg[conversationID].RecvMsgOpt = v.RecvMsgOpt conversationMsg[conversationID].RecvMsgOpt = v.RecvMsgOpt
if v.IsPinned { if v.IsPinned {
conversationMsg[conversationID].IsPinned = v.IsPinned conversationMsg[conversationID].IsPinned = v.IsPinned
@ -782,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
} }
latestMsgDestructTime := time.UnixMilli(req.Timestamp) latestMsgDestructTime := time.UnixMilli(req.Timestamp)
for i, conversation := range conversations { for i, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 {
continue continue
} }
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000)) seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
@ -822,3 +836,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil return nil
} }
func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set")
}
if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 {
return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set")
}
var needDeleteConversationIDs []string
if len(req.ConversationIDs) == 0 {
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
UserID: req.OwnerUserID,
ConversationIDs: conversationIDs,
})
if err != nil {
return nil, err
}
for conversationID, msg := range latestMsgs.Msgs {
if msg.SendTime < deleteTimeThreshold {
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
}
}
if len(needDeleteConversationIDs) == 0 {
return &pbconversation.DeleteConversationsResp{}, nil
}
} else {
needDeleteConversationIDs = req.ConversationIDs
}
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
return nil, err
}
// c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
return &pbconversation.DeleteConversationsResp{}, nil
}

@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
} }
func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) {
tips := &sdkws.ConversationDeleteTips{
UserID: userID,
ConversationIDs: conversationIDs,
}
c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips)
}

@ -2,6 +2,7 @@ package group
import ( import (
"context" "context"
"errors"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -11,6 +12,7 @@ import (
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group" pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
) )
const versionSyncLimit = 500 const versionSyncLimit = 500
@ -170,19 +172,26 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.
func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) {
var num int var num int
resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp)
for _, memberReq := range req.ReqList { for _, memberReq := range req.ReqList {
if _, ok := resp[memberReq.GroupID]; ok { if _, ok := resp[memberReq.GroupID]; ok {
continue continue
} }
memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq) memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq)
if err != nil { if err != nil {
if errors.Is(err, servererrs.ErrDismissedAlready) {
log.ZWarn(ctx, "Failed to get incremental group member", err, "groupID", memberReq.GroupID, "request", memberReq)
continue
}
return nil, err return nil, err
} }
resp[memberReq.GroupID] = memberResp resp[memberReq.GroupID] = memberResp
num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete) num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete)
if num >= versionSyncLimit { if num >= versionSyncLimit {
break break
} }
} }
return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil
} }

@ -16,8 +16,10 @@ package msg
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
@ -28,6 +30,7 @@ import (
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf
} }
// Move to msgtransfer // Move to msgtransfer
// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
// if msg.MsgData.ContentType == constant.Typing { if msg.MsgData.ContentType == constant.Typing {
// return return
// } }
// if !filterAfterMsg(msg, after) { if !filterAfterMsg(msg, after) {
// return return
// } }
// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
// RecvID: msg.MsgData.RecvID, RecvID: msg.MsgData.RecvID,
// } }
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
// } }
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
@ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi
}) })
} }
// Move to msgtransfer func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { if msg.MsgData.ContentType == constant.Typing {
// if msg.MsgData.ContentType == constant.Typing { return
// return }
// } if !filterAfterMsg(msg, after) {
// if !filterAfterMsg(msg, after) { return
// return }
// } cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID,
// GroupID: msg.MsgData.GroupID, }
// }
m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData))
// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) }
// }
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
@ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
} }
// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
// keyMsgData := apistruct.KeyMsgData{ keyMsgData := apistruct.KeyMsgData{
// SendID: msg.SendID, SendID: msg.SendID,
// RecvID: msg.RecvID, RecvID: msg.RecvID,
// GroupID: msg.GroupID, GroupID: msg.GroupID,
// } }
// return map[string]string{ return map[string]string{
// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
// } }
// } }

@ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
go m.setConversationAtInfo(ctx, req.MsgData) go m.setConversationAtInfo(ctx, req.MsgData)
} }
// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
prommetrics.GroupChatMsgProcessSuccessCounter.Inc() prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
resp = &pbmsg.SendMsgResp{} resp = &pbmsg.SendMsgResp{}
@ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
return nil, err return nil, err
} }
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
prommetrics.SingleChatMsgProcessSuccessCounter.Inc() prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
return &pbmsg.SendMsgResp{ return &pbmsg.SendMsgResp{
ServerMsgID: req.MsgData.ServerMsgID, ServerMsgID: req.MsgData.ServerMsgID,

@ -66,4 +66,5 @@ const (
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand" CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand" CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand" CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
CallbackAfterMsgSaveDBCommand = "callbackAfterMsgSaveDBCommand"
) )

@ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct {
type CallbackSingleMsgReadResp struct { type CallbackSingleMsgReadResp struct {
CommonCallbackResp CommonCallbackResp
} }
type CallbackAfterMsgSaveDBReq struct {
CommonCallbackReq
RecvID string `json:"recvID"`
GroupID string `json:"groupID"`
}
type CallbackAfterMsgSaveDBResp struct {
CommonCallbackResp
}

@ -81,6 +81,9 @@ func (a *ApiCmd) runE() error {
} }
return startrpc.Start( return startrpc.Start(
a.ctx, &a.apiConfig.Discovery, a.ctx, &a.apiConfig.Discovery,
nil,
nil,
// &a.apiConfig.API.RateLimiter,
&prometheus, &prometheus,
a.apiConfig.API.Api.ListenIP, "", a.apiConfig.API.Api.ListenIP, "",
a.apiConfig.API.Prometheus.AutoSetPorts, a.apiConfig.API.Prometheus.AutoSetPorts,

@ -40,6 +40,7 @@ func NewAuthRpcCmd() *AuthRpcCmd {
config.RedisConfigFileName: &authConfig.RedisConfig, config.RedisConfigFileName: &authConfig.RedisConfig,
config.MongodbConfigFileName: &authConfig.MongoConfig, config.MongodbConfigFileName: &authConfig.MongoConfig,
config.ShareFileName: &authConfig.Share, config.ShareFileName: &authConfig.Share,
config.LocalCacheConfigFileName: &authConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &authConfig.Discovery, config.DiscoveryConfigFilename: &authConfig.Discovery,
} }
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
@ -56,7 +57,7 @@ func (a *AuthRpcCmd) Exec() error {
} }
func (a *AuthRpcCmd) runE() error { func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.CircuitBreaker, &a.authConfig.RpcConfig.RateLimiter, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig,
[]string{ []string{

@ -58,7 +58,7 @@ func (a *ConversationRpcCmd) Exec() error {
} }
func (a *ConversationRpcCmd) runE() error { func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.CircuitBreaker, &a.conversationConfig.RpcConfig.RateLimiter, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig,
[]string{ []string{

@ -56,6 +56,8 @@ func (a *CronTaskCmd) runE() error {
var prometheus config.Prometheus var prometheus config.Prometheus
return startrpc.Start( return startrpc.Start(
a.ctx, &a.cronTaskConfig.Discovery, a.ctx, &a.cronTaskConfig.Discovery,
nil,
nil,
&prometheus, &prometheus,
"", "", "", "",
true, true,

@ -58,7 +58,7 @@ func (a *FriendRpcCmd) Exec() error {
} }
func (a *FriendRpcCmd) runE() error { func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.CircuitBreaker, &a.relationConfig.RpcConfig.RateLimiter, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig,
[]string{ []string{

@ -59,7 +59,7 @@ func (a *GroupRpcCmd) Exec() error {
} }
func (a *GroupRpcCmd) runE() error { func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.CircuitBreaker, &a.groupConfig.RpcConfig.RateLimiter, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig,
[]string{ []string{

@ -59,7 +59,7 @@ func (a *MsgRpcCmd) Exec() error {
} }
func (a *MsgRpcCmd) runE() error { func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.CircuitBreaker, &a.msgConfig.RpcConfig.RateLimiter, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig,
[]string{ []string{

@ -61,6 +61,8 @@ func (m *MsgGatewayCmd) runE() error {
var prometheus config.Prometheus var prometheus config.Prometheus
return startrpc.Start( return startrpc.Start(
m.ctx, &m.msgGatewayConfig.Discovery, m.ctx, &m.msgGatewayConfig.Discovery,
&m.msgGatewayConfig.MsgGateway.CircuitBreaker,
&m.msgGatewayConfig.MsgGateway.RateLimiter,
&prometheus, &prometheus,
rpc.ListenIP, rpc.RegisterIP, rpc.ListenIP, rpc.RegisterIP,
rpc.AutoSetPorts, rpc.AutoSetPorts,

@ -62,6 +62,8 @@ func (m *MsgTransferCmd) runE() error {
var prometheus config.Prometheus var prometheus config.Prometheus
return startrpc.Start( return startrpc.Start(
m.ctx, &m.msgTransferConfig.Discovery, m.ctx, &m.msgTransferConfig.Discovery,
&m.msgTransferConfig.MsgTransfer.CircuitBreaker,
&m.msgTransferConfig.MsgTransfer.RateLimiter,
&prometheus, &prometheus,
"", "", "", "",
true, true,

@ -60,7 +60,7 @@ func (a *PushRpcCmd) Exec() error {
} }
func (a *PushRpcCmd) runE() error { func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.CircuitBreaker, &a.pushConfig.RpcConfig.RateLimiter, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig,
[]string{ []string{

@ -58,7 +58,7 @@ func (a *ThirdRpcCmd) Exec() error {
} }
func (a *ThirdRpcCmd) runE() error { func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.CircuitBreaker, &a.thirdConfig.RpcConfig.RateLimiter, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig,
[]string{ []string{

@ -59,7 +59,7 @@ func (a *UserRpcCmd) Exec() error {
} }
func (a *UserRpcCmd) runE() error { func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.CircuitBreaker, &a.userConfig.RpcConfig.RateLimiter, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig,
[]string{ []string{

@ -43,6 +43,7 @@ type CacheConfig struct {
} }
type LocalCache struct { type LocalCache struct {
Auth CacheConfig `yaml:"auth"`
User CacheConfig `yaml:"user"` User CacheConfig `yaml:"user"`
Group CacheConfig `yaml:"group"` Group CacheConfig `yaml:"group"`
Friend CacheConfig `yaml:"friend"` Friend CacheConfig `yaml:"friend"`
@ -142,6 +143,23 @@ type API struct {
Ports []int `yaml:"ports"` Ports []int `yaml:"ports"`
GrafanaURL string `yaml:"grafanaURL"` GrafanaURL string `yaml:"grafanaURL"`
} `yaml:"prometheus"` } `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
}
type RateLimiter struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
CPUThreshold int64 `yaml:"cpuThreshold"`
}
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Window time.Duration `yaml:"window"`
Bucket int `yaml:"bucket"`
Success float64 `yaml:"success"`
Request int64 `yaml:"request"`
} }
type CronTask struct { type CronTask struct {
@ -216,6 +234,8 @@ type MsgGateway struct {
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"` WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"` } `yaml:"longConnSvr"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type MsgTransfer struct { type MsgTransfer struct {
@ -224,6 +244,8 @@ type MsgTransfer struct {
AutoSetPorts bool `yaml:"autoSetPorts"` AutoSetPorts bool `yaml:"autoSetPorts"`
Ports []int `yaml:"ports"` Ports []int `yaml:"ports"`
} `yaml:"prometheus"` } `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Push struct { type Push struct {
@ -254,7 +276,9 @@ type Push struct {
BadgeCount bool `yaml:"badgeCount"` BadgeCount bool `yaml:"badgeCount"`
Production bool `yaml:"production"` Production bool `yaml:"production"`
} `yaml:"iosPush"` } `yaml:"iosPush"`
FullUserCache bool `yaml:"fullUserCache"` FullUserCache bool `yaml:"fullUserCache"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Auth struct { type Auth struct {
@ -263,28 +287,38 @@ type Auth struct {
TokenPolicy struct { TokenPolicy struct {
Expire int64 `yaml:"expire"` Expire int64 `yaml:"expire"`
} `yaml:"tokenPolicy"` } `yaml:"tokenPolicy"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Conversation struct { type Conversation struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Friend struct { type Friend struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Group struct { type Group struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Msg struct { type Msg struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
FriendVerify bool `yaml:"friendVerify"` FriendVerify bool `yaml:"friendVerify"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Third struct { type Third struct {
@ -297,6 +331,8 @@ type Third struct {
Kodo Kodo `yaml:"kodo"` Kodo Kodo `yaml:"kodo"`
Aws Aws `yaml:"aws"` Aws Aws `yaml:"aws"`
} `yaml:"object"` } `yaml:"object"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type Cos struct { type Cos struct {
BucketURL string `yaml:"bucketURL"` BucketURL string `yaml:"bucketURL"`
@ -335,8 +371,10 @@ type Aws struct {
} }
type User struct { type User struct {
RPC RPC `yaml:"rpc"` RPC RPC `yaml:"rpc"`
Prometheus Prometheus `yaml:"prometheus"` Prometheus Prometheus `yaml:"prometheus"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
CircuitBreaker CircuitBreaker `yaml:"circuitBreaker"`
} }
type RPC struct { type RPC struct {
@ -435,6 +473,7 @@ type Webhooks struct {
BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"`
BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"`
AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"`
AfterMsgSaveDB AfterConfig `yaml:"afterMsgSaveDB"`
AfterUserOnline AfterConfig `yaml:"afterUserOnline"` AfterUserOnline AfterConfig `yaml:"afterUserOnline"`
AfterUserOffline AfterConfig `yaml:"afterUserOffline"` AfterUserOffline AfterConfig `yaml:"afterUserOffline"`
AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"`

@ -0,0 +1,25 @@
package convert
func TokenMapDB2Pb(tokenMapDB map[string]int) map[string]int32 {
if tokenMapDB == nil {
return nil
}
tokenMapPB := make(map[string]int32, len(tokenMapDB))
for k, v := range tokenMapDB {
tokenMapPB[k] = int32(v)
}
return tokenMapPB
}
func TokenMapPb2DB(tokenMapPB map[string]int32) map[string]int {
if tokenMapPB == nil {
return nil
}
tokenMapDB := make(map[string]int, len(tokenMapPB))
for k, v := range tokenMapPB {
tokenMapDB[k] = int(v)
}
return tokenMapDB
}

@ -0,0 +1,107 @@
package startrpc
import (
"context"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/circuitbreaker"
"github.com/openimsdk/tools/stability/circuitbreaker/sre"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type CircuitBreaker struct {
Enable bool `yaml:"enable"`
Success float64 `yaml:"success"` // success rate threshold (0.0-1.0)
Request int64 `yaml:"request"` // request threshold
Bucket int `yaml:"bucket"` // number of buckets
Window time.Duration `yaml:"window"` // time window for statistics
}
func NewCircuitBreaker(config *CircuitBreaker) circuitbreaker.CircuitBreaker {
if !config.Enable {
return nil
}
return sre.NewSREBraker(
sre.WithWindow(config.Window),
sre.WithBucket(config.Bucket),
sre.WithSuccess(config.Success),
sre.WithRequest(config.Request),
)
}
func UnaryCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if err := breaker.Allow(); err != nil {
log.ZWarn(ctx, "rpc circuit breaker open", err, "method", info.FullMethod)
return nil, status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
resp, err = handler(ctx, req)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.OK:
breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
}
} else {
breaker.MarkSuccess()
}
return resp, err
})
}
func StreamCircuitBreakerInterceptor(breaker circuitbreaker.CircuitBreaker) grpc.ServerOption {
if breaker == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := breaker.Allow(); err != nil {
log.ZWarn(ss.Context(), "rpc circuit breaker open", err, "method", info.FullMethod)
return status.Error(codes.Unavailable, "service unavailable due to circuit breaker")
}
err := handler(srv, ss)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.OK:
breaker.MarkSuccess()
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied:
breaker.MarkSuccess()
default:
breaker.MarkFailed()
}
} else {
breaker.MarkFailed()
}
} else {
breaker.MarkSuccess()
}
return err
})
}

@ -0,0 +1,70 @@
package startrpc
import (
"context"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/stability/ratelimit"
"github.com/openimsdk/tools/stability/ratelimit/bbr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type RateLimiter struct {
Enable bool
Window time.Duration
Bucket int
CPUThreshold int64
}
func NewRateLimiter(config *RateLimiter) ratelimit.Limiter {
if !config.Enable {
return nil
}
return bbr.NewBBRLimiter(
bbr.WithWindow(config.Window),
bbr.WithBucket(config.Bucket),
bbr.WithCPUThreshold(config.CPUThreshold),
)
}
func UnaryRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(ctx, req)
})
}
return grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ctx, "rpc rate limited", err, "method", info.FullMethod)
return nil, status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(ctx, req)
})
}
func StreamRateLimitInterceptor(limiter ratelimit.Limiter) grpc.ServerOption {
if limiter == nil {
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, ss)
})
}
return grpc.ChainStreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
done, err := limiter.Allow()
if err != nil {
log.ZWarn(ss.Context(), "rpc rate limited", err, "method", info.FullMethod)
return status.Errorf(codes.ResourceExhausted, "rpc request rate limit exceeded: %v, please try again later", err)
}
defer done(ratelimit.DoneInfo{})
return handler(srv, ss)
})
}

@ -47,7 +47,7 @@ func init() {
prommetrics.RegistryAll() prommetrics.RegistryAll()
} }
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, func Start[T any](ctx context.Context, disc *conf.Discovery, circuitBreakerConfig *conf.CircuitBreaker, rateLimiterConfig *conf.RateLimiter, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string, watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
@ -84,6 +84,45 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
} }
} }
if circuitBreakerConfig != nil && circuitBreakerConfig.Enable {
cb := &CircuitBreaker{
Enable: circuitBreakerConfig.Enable,
Success: circuitBreakerConfig.Success,
Request: circuitBreakerConfig.Request,
Bucket: circuitBreakerConfig.Bucket,
Window: circuitBreakerConfig.Window,
}
breaker := NewCircuitBreaker(cb)
options = append(options,
UnaryCircuitBreakerInterceptor(breaker),
StreamCircuitBreakerInterceptor(breaker),
)
log.ZInfo(ctx, "RPC circuit breaker enabled",
"service", rpcRegisterName,
"window", circuitBreakerConfig.Window,
"bucket", circuitBreakerConfig.Bucket,
"success", circuitBreakerConfig.Success,
"requestThreshold", circuitBreakerConfig.Request)
}
if rateLimiterConfig != nil && rateLimiterConfig.Enable {
limiter := NewRateLimiter((*RateLimiter)(rateLimiterConfig))
options = append(options,
UnaryRateLimitInterceptor(limiter),
StreamRateLimitInterceptor(limiter),
)
log.ZInfo(ctx, "RPC rate limiter enabled",
"service", rpcRegisterName,
"window", rateLimiterConfig.Window,
"bucket", rateLimiterConfig.Bucket,
"cpuThreshold", rateLimiterConfig.CPUThreshold)
}
registerIP, err := network.GetRpcRegisterIP(registerIP) registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil { if err != nil {
return err return err
@ -123,7 +162,7 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
go func() { go func() {
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return

@ -16,6 +16,7 @@ package cache
import ( import (
"context" "context"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
) )
@ -57,7 +58,7 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache DelUserPinnedConversations(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)

@ -24,7 +24,7 @@ var (
func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache { func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache {
initMemMsgCache.Do(func() { initMemMsgCache.Do(func() {
memMsgCache = lru.NewLayLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil) memMsgCache = lru.NewLazyLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
}) })
return &msgCache{ return &msgCache{
cache: cache, cache: cache,

@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
return cache return cache
} }
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache() cache := c.CloneConversationCache()
for _, userID := range userIDs { for _, userID := range userIDs {
cache.AddKeys(c.getPinnedConversationIDsKey(userID)) cache.AddKeys(c.getPinnedConversationIDsKey(userID))

@ -2,13 +2,16 @@ package redis
import ( import (
"context" "context"
"encoding/json"
"strconv" "strconv"
"sync" "sync"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
@ -16,16 +19,26 @@ import (
type tokenCache struct { type tokenCache struct {
rdb redis.UniversalClient rdb redis.UniversalClient
accessExpire time.Duration accessExpire time.Duration
localCache *config.LocalCache
} }
func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { func NewTokenCacheModel(rdb redis.UniversalClient, localCache *config.LocalCache, accessExpire int64) cache.TokenModel {
c := &tokenCache{rdb: rdb} c := &tokenCache{rdb: rdb, localCache: localCache}
c.accessExpire = c.getExpireTime(accessExpire) c.accessExpire = c.getExpireTime(accessExpire)
return c return c
} }
func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil
} }
// SetTokenFlagEx set token and flag with expire time // SetTokenFlagEx set token and flag with expire time
@ -37,6 +50,11 @@ func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platform
if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil return nil
} }
@ -106,7 +124,17 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla
for k, v := range m { for k, v := range m {
mm[k] = v mm[k] = v
} }
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err())
err := c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()
if err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, cachekey.GetTokenKey(userID, platformID))
}
return nil
} }
func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error {
@ -124,11 +152,23 @@ func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[st
}); err != nil { }); err != nil {
return err return err
} }
if c.localCache != nil {
c.removeLocalTokenCache(ctx, keys...)
}
return nil return nil
} }
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil
} }
func (c *tokenCache) getExpireTime(t int64) time.Duration { func (c *tokenCache) getExpireTime(t int64) time.Duration {
@ -161,6 +201,11 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t
return err return err
} }
// Remove local cache for the token
if c.localCache != nil {
c.removeLocalTokenCache(ctx, keys...)
}
return nil return nil
} }
@ -175,5 +220,28 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
return nil return nil
} }
func (c *tokenCache) removeLocalTokenCache(ctx context.Context, keys ...string) {
if len(keys) == 0 {
return
}
topic := c.localCache.Auth.Topic
if topic == "" {
return
}
data, err := json.Marshal(keys)
if err != nil {
log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys)
} else {
if err := c.rdb.Publish(ctx, topic, string(data)).Err(); err != nil {
log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys)
}
}
}

@ -78,6 +78,8 @@ type ConversationDatabase interface {
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
// FindRandConversation finds random conversations based on the specified timestamp and limit. // FindRandConversation finds random conversations based on the specified timestamp and limit.
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
} }
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
} }
if _, ok := fieldMap["is_pinned"]; ok { if _, ok := fieldMap["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) cache = cache.DelUserPinnedConversations(userIDs...)
} }
cache = cache.DelConversationVersionUserIDs(haveUserIDs...) cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
} }
@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
} }
if _, ok := args["is_pinned"]; ok { if _, ok := args["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) cache = cache.DelUserPinnedConversations(userIDs...)
} }
return cache.ChainExecDel(ctx) return cache.ChainExecDel(ctx)
} }
@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
DelUserConversationIDsHash(userIDs...). DelUserConversationIDsHash(userIDs...).
DelConversationVersionUserIDs(userIDs...). DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
DelConversationPinnedMessageUserIDs(pinnedUserIDs...). DelUserPinnedConversations(pinnedUserIDs...).
ChainExecDel(ctx) ChainExecDel(ctx)
} }
@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
cache := c.cache.CloneConversationCache() cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID). cache = cache.DelConversationVersionUserIDs(ownerUserID).
DelConversationNotNotifyMessageUserIDs(ownerUserID). DelConversationNotNotifyMessageUserIDs(ownerUserID).
DelConversationPinnedMessageUserIDs(ownerUserID) DelUserPinnedConversations(ownerUserID)
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != "" return e.GroupID, e.GroupID != ""
@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
return c.conversationDB.FindRandConversation(ctx, ts, limit) return c.conversationDB.FindRandConversation(ctx, ts, limit)
} }
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
cache = cache.DelConversations(userID, conversationIDs...).
DelConversationVersionUserIDs(userID).
DelConversationIDs(userID).
DelUserConversationIDsHash(userID).
DelConversationNotNotifyMessageUserIDs(userID).
DelUserPinnedConversations(userID)
return cache.ChainExecDel(ctx)
})
}

@ -194,7 +194,6 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group,
} }
for _, group := range groups { for _, group := range groups {
c = c.DelGroupsInfo(group.GroupID). c = c.DelGroupsInfo(group.GroupID).
DelGroupMembersHash(group.GroupID).
DelGroupMembersHash(group.GroupID). DelGroupMembersHash(group.GroupID).
DelGroupsMemberNum(group.GroupID). DelGroupsMemberNum(group.GroupID).
DelGroupMemberIDs(group.GroupID). DelGroupMemberIDs(group.GroupID).

@ -44,4 +44,5 @@ type Conversation interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
} }

@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
} }
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
} }
func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
if len(conversationIDs) == 0 {
return nil
}
return mongoutil.IncrVersion(func() error {
err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}})
return err
}, func() error {
for _, conversationID := range conversationIDs {
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil {
return err
}
}
return nil
})
}

@ -5,6 +5,11 @@ import (
"fmt" "fmt"
"time" "time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
@ -14,10 +19,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/jsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
func NewMsgMongo(db *mongo.Database) (database.Msg, error) { func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
@ -1154,7 +1155,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
for i := len(res) - 1; i > 0; i-- { for i := len(res) - 1; i >= 0; i-- {
v := res[i] v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 { if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
@ -1169,7 +1170,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
limit := int64(-1) limit := int64(-1)
if first { if first {
first = false first = false
limit = m.model.GetMsgIndex(seq) limit = m.model.GetLimitForSingleDoc(seq)
} }
docID := m.model.BuildDocIDByIndex(conversationID, i) docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit) msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)

@ -73,7 +73,7 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo
} }
func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) { func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) {
return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}}) return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": bson.M{"$gte": level}})
} }
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) { func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) {

@ -132,6 +132,10 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
return (seq - 1) % singleGocMsgNum return (seq - 1) % singleGocMsgNum
} }
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
return seq % singleGocMsgNum
}
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
} }

@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] {
if opt.localSlotNum > 0 && opt.localSlotSize > 0 { if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
createSimpleLRU := func() lru.LRU[string, V] { createSimpleLRU := func() lru.LRU[string, V] {
if opt.expirationEvict { if opt.expirationEvict {
return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} else { } else {
return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} }
} }
if opt.localSlotNum == 1 { if opt.localSlotNum == 1 {
c.local = createSimpleLRU() c.local = createSimpleLRU()
} else { } else {
c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU) c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU)
} }
if opt.linkSlotNum > 0 { if opt.linkSlotNum > 0 {
c.link = link.New(opt.linkSlotNum) c.link = link.New(opt.linkSlotNum)
@ -71,6 +71,8 @@ type cache[V any] struct {
} }
func (c *cache[V]) onEvict(key string, value V) { func (c *cache[V]) onEvict(key string, value V) {
_ = value
if c.link != nil { if c.link != nil {
lks := c.link.Del(key) lks := c.link.Del(key)
for k := range lks { for k := range lks {

@ -15,10 +15,11 @@
package localcache package localcache
import ( import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"strings" "strings"
"sync" "sync"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
) )
var ( var (
@ -32,6 +33,10 @@ func InitLocalCache(localCache *config.LocalCache) {
Local config.CacheConfig Local config.CacheConfig
Keys []string Keys []string
}{ }{
{
Local: localCache.Auth,
Keys: []string{cachekey.UidPidToken},
},
{ {
Local: localCache.User, Local: localCache.User,
Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},

@ -21,25 +21,25 @@ import (
"github.com/hashicorp/golang-lru/v2/simplelru" "github.com/hashicorp/golang-lru/v2/simplelru"
) )
type layLruItem[V any] struct { type lazyLruItem[V any] struct {
lock sync.Mutex lock sync.Mutex
expires int64 expires int64
err error err error
value V value V
} }
func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] { func NewLazyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LazyLRU[K, V] {
var cb simplelru.EvictCallback[K, *layLruItem[V]] var cb simplelru.EvictCallback[K, *lazyLruItem[V]]
if onEvict != nil { if onEvict != nil {
cb = func(key K, value *layLruItem[V]) { cb = func(key K, value *lazyLruItem[V]) {
onEvict(key, value.value) onEvict(key, value.value)
} }
} }
core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb) core, err := simplelru.NewLRU[K, *lazyLruItem[V]](size, cb)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &LayLRU[K, V]{ return &LazyLRU[K, V]{
core: core, core: core,
successTTL: successTTL, successTTL: successTTL,
failedTTL: failedTTL, failedTTL: failedTTL,
@ -47,15 +47,15 @@ func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duratio
} }
} }
type LayLRU[K comparable, V any] struct { type LazyLRU[K comparable, V any] struct {
lock sync.Mutex lock sync.Mutex
core *simplelru.LRU[K, *layLruItem[V]] core *simplelru.LRU[K, *lazyLruItem[V]]
successTTL time.Duration successTTL time.Duration
failedTTL time.Duration failedTTL time.Duration
target Target target Target
} }
func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { func (x *LazyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock() x.lock.Lock()
v, ok := x.core.Get(key) v, ok := x.core.Get(key)
if ok { if ok {
@ -68,7 +68,7 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return value, err return value, err
} }
} else { } else {
v = &layLruItem[V]{} v = &lazyLruItem[V]{}
x.core.Add(key, v) x.core.Add(key, v)
v.lock.Lock() v.lock.Lock()
x.lock.Unlock() x.lock.Unlock()
@ -88,15 +88,15 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return v.value, v.err return v.value, v.err
} }
func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
var ( var (
err error err error
once sync.Once once sync.Once
) )
res := make(map[K]V) res := make(map[K]V)
queries := make([]K, 0) queries := make([]K, 0, len(keys))
setVs := make(map[K]*layLruItem[V])
for _, key := range keys { for _, key := range keys {
x.lock.Lock() x.lock.Lock()
v, ok := x.core.Get(key) v, ok := x.core.Get(key)
@ -118,14 +118,20 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error))
} }
queries = append(queries, key) queries = append(queries, key)
} }
values, err1 := fetch(queries)
if err1 != nil { if len(queries) == 0 {
return res, err
}
values, fetchErr := fetch(queries)
if fetchErr != nil {
once.Do(func() { once.Do(func() {
err = err1 err = fetchErr
}) })
} }
for key, val := range values { for key, val := range values {
v := &layLruItem[V]{} v := &lazyLruItem[V]{}
v.value = val v.value = val
if err == nil { if err == nil {
@ -135,7 +141,7 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error))
v.expires = time.Now().Add(x.failedTTL).UnixMilli() v.expires = time.Now().Add(x.failedTTL).UnixMilli()
x.target.IncrGetFailed() x.target.IncrGetFailed()
} }
setVs[key] = v
x.lock.Lock() x.lock.Lock()
x.core.Add(key, v) x.core.Add(key, v)
x.lock.Unlock() x.lock.Unlock()
@ -145,29 +151,29 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error))
return res, err return res, err
} }
//func (x *LayLRU[K, V]) Has(key K) bool { //func (x *LazyLRU[K, V]) Has(key K) bool {
// x.lock.Lock() // x.lock.Lock()
// defer x.lock.Unlock() // defer x.lock.Unlock()
// return x.core.Contains(key) // return x.core.Contains(key)
//} //}
func (x *LayLRU[K, V]) Set(key K, value V) { func (x *LazyLRU[K, V]) Set(key K, value V) {
x.lock.Lock() x.lock.Lock()
defer x.lock.Unlock() defer x.lock.Unlock()
x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
} }
func (x *LayLRU[K, V]) SetHas(key K, value V) bool { func (x *LazyLRU[K, V]) SetHas(key K, value V) bool {
x.lock.Lock() x.lock.Lock()
defer x.lock.Unlock() defer x.lock.Unlock()
if x.core.Contains(key) { if x.core.Contains(key) {
x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
return true return true
} }
return false return false
} }
func (x *LayLRU[K, V]) Del(key K) bool { func (x *LazyLRU[K, V]) Del(key K) bool {
x.lock.Lock() x.lock.Lock()
ok := x.core.Remove(key) ok := x.core.Remove(key)
x.lock.Unlock() x.lock.Unlock()
@ -179,6 +185,6 @@ func (x *LayLRU[K, V]) Del(key K) bool {
return ok return ok
} }
func (x *LayLRU[K, V]) Stop() { func (x *LazyLRU[K, V]) Stop() {
} }

@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
var ( var (
slotKeys = make(map[uint64][]K) slotKeys = make(map[uint64][]K)
vs = make(map[K]V) kVs = make(map[K]V)
) )
for _, k := range keys { for _, k := range keys {
@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
return nil, err return nil, err
} }
for key, value := range batches { for key, value := range batches {
vs[key] = value kVs[key] = value
} }
} }
return vs, nil return kVs, nil
} }
func (x *slotLRU[K, V]) getIndex(k K) uint64 { func (x *slotLRU[K, V]) getIndex(k K) uint64 {

@ -109,7 +109,7 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string {
case constant.ReadGroupChatType: case constant.ReadGroupChatType:
return "sg_" + ids[0] // super group chat return "sg_" + ids[0] // super group chat
case constant.NotificationChatType: case constant.NotificationChatType:
return "sn_" + ids[0] // server notification chat return "sn_" + strings.Join(ids, "_") // server notification chat
} }
return "" return ""
} }

@ -0,0 +1,69 @@
package rpccache
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
)
func NewAuthLocalCache(client *rpcli.AuthClient, localCache *config.LocalCache, cli redis.UniversalClient) *AuthLocalCache {
lc := localCache.Auth
log.ZDebug(context.Background(), "AuthLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &AuthLocalCache{
client: client,
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
localcache.WithLocalSuccessTTL(lc.Success()),
localcache.WithLocalFailedTTL(lc.Failed()),
),
}
if lc.Enable() {
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
}
return x
}
type AuthLocalCache struct {
client *rpcli.AuthClient
local localcache.Cache[[]byte]
}
func (a *AuthLocalCache) GetExistingToken(ctx context.Context, userID string, platformID int) (val map[string]int, err error) {
resp, err := a.getExistingToken(ctx, userID, platformID)
if err != nil {
return nil, err
}
res := convert.TokenMapPb2DB(resp.TokenStates)
return res, nil
}
func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, platformID int) (val *auth.GetExistingTokenResp, err error) {
start := time.Now()
log.ZDebug(ctx, "AuthLocalCache GetExistingToken req", "userID", userID, "platformID", platformID)
defer func() {
if err != nil {
log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "cost", time.Since(start), "userID", userID, "platformID", platformID)
} else {
log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "cost", time.Since(start), "userID", userID, "platformID", platformID, "val", val)
}
}()
var cache cacheProto[auth.GetExistingTokenResp]
return cache.Unmarshal(a.local.Get(ctx, cachekey.GetTokenKey(userID, platformID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "AuthLocalCache GetExistingToken call rpc", "userID", userID, "platformID", platformID)
return cache.Marshal(a.client.AuthClient.GetExistingToken(ctx, &auth.GetExistingTokenReq{UserID: userID, PlatformID: int32(platformID)}))
}))
}

@ -64,7 +64,7 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.
case false: case false:
log.ZDebug(ctx, "fullUserCache is false") log.ZDebug(ctx, "fullUserCache is false")
x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) return lru.NewLazyLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
}) })
x.CurrentPhase.Store(DoSubscribeOver) x.CurrentPhase.Store(DoSubscribeOver)
x.Cond.Broadcast() x.Cond.Broadcast()

Loading…
Cancel
Save