diff --git a/.github/workflows/api-test.yml b/.github/workflows/api-test.yml deleted file mode 100644 index 0bbc86619..000000000 --- a/.github/workflows/api-test.yml +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright © 2023 OpenIM. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: OpenIM API TEST - -on: - push: - branches: - - main - paths-ignore: - - "docs/**" - - "README.md" - - "README_zh-CN.md" - - "CONTRIBUTING.md" - pull_request: - branches: - - main - paths-ignore: - - "README.md" - - "README_zh-CN.md" - - "CONTRIBUTING.md" - - "docs/**" - -env: - GO_VERSION: "1.19" - GOLANGCI_VERSION: "v1.50.1" - -jobs: - execute-linux-systemd-scripts: - name: Execute OpenIM script on ${{ matrix.os }} - runs-on: ${{ matrix.os }} - environment: - name: openim - strategy: - matrix: - go_version: ["1.20"] - os: ["ubuntu-latest"] - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Set up Go ${{ matrix.go_version }} - uses: actions/setup-go@v4 - with: - go-version: ${{ matrix.go_version }} - id: go - - - name: Install Task - uses: arduino/setup-task@v1 - with: - version: '3.x' # If available, use the latest major version that's compatible - repo-token: ${{ secrets.GITHUB_TOKEN }} - - - name: Docker Operations - run: | - curl -o docker-compose.yml https://raw.githubusercontent.com/OpenIMSDK/openim-docker/main/example/basic-openim-server-dependency.yml - sudo docker compose up -d - sudo sleep 60 - - - name: Module Operations - run: | - sudo make tidy - sudo make tools.verify.go-gitlint - - - name: Build, Start, Check Services and Print Logs - run: | - sudo ./scripts/install/install.sh -i && \ - sudo ./scripts/install/install.sh -s && \ - (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null) - - - name: Run Test - run: | - sudo make test-api && \ - (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null) - - - name: Stop Services - run: | - sudo ./scripts/install/install.sh -u && \ - (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null) \ No newline at end of file diff --git a/.github/workflows/build-openim-web-image.yml b/.github/workflows/build-openim-web-image.yml index e040d5270..999c2e1e3 100644 --- a/.github/workflows/build-openim-web-image.yml +++ b/.github/workflows/build-openim-web-image.yml @@ -15,11 +15,11 @@ name: Build OpenIM Web Docker image on: - schedule: - - cron: '30 3 * * *' +# schedule: +# - cron: '30 3 * * *' push: branches: - - main + # - main - release-* tags: - v* diff --git a/.github/workflows/create_branch_on_tag.yml b/.github/workflows/create-branch-on-tag.yml similarity index 100% rename from .github/workflows/create_branch_on_tag.yml rename to .github/workflows/create-branch-on-tag.yml diff --git a/chat b/chat deleted file mode 100755 index b79e06dd7..000000000 Binary files a/chat and /dev/null differ diff --git a/deployments/README.md b/deployments/README.md index e952830ea..0f73a553e 100644 --- a/deployments/README.md +++ b/deployments/README.md @@ -123,6 +123,30 @@ Explore our Helm-Charts repository and read through: [Helm-Charts Repository](ht Using the helm charts repository, you can ignore the following configuration, but if you want to just use the server and scale on top of it, you can go ahead: +**Use the Helm template to generate the deployment yaml file: `openim-charts.yaml`** + +**Gen Image:** + +```bash +../scripts/genconfig.sh ../scripts/install/environment.sh ./templates/helm-image.yaml > ./charts/generated-configs/helm-image.yaml +``` + +**Gen Charts:** + +```bash +for chart in ./charts/*/; do + if [[ "$chart" == *"generated-configs"* || "$chart" == *"helmfile.yaml"* ]]; then + continue + fi + + if [ -f "${chart}values.yaml" ]; then + helm template "$chart" -f "./charts/generated-configs/helm-image.yaml" -f "./charts/generated-configs/config.yaml" -f "./charts/generated-configs/notification.yaml" >> openim-charts.yaml + else + helm template "$chart" >> openim-charts.yaml + fi +done +``` + **Use Helmfile:** ```bash diff --git a/deployments/openim-charts.yaml b/deployments/openim-charts.yaml new file mode 100644 index 000000000..59815dbd5 --- /dev/null +++ b/deployments/openim-charts.yaml @@ -0,0 +1,1262 @@ +--- +# Source: openim-api/templates/app-cm.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: openim-cm +data: + config.yaml: |+ + notification.yaml: |+ +--- +# Source: openim-api/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-api + labels: + helm.sh/chart: openim-api-0.1.0 + app.kubernetes.io/name: openim-api + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-api + app.kubernetes.io/instance: release-name +--- +# Source: openim-api/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-api + labels: + helm.sh/chart: openim-api-0.1.0 + app.kubernetes.io/name: openim-api + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-api + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-api + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-api + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-api/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-api + labels: + helm.sh/chart: openim-api-0.1.0 + app.kubernetes.io/name: openim-api + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-api + port: + number: 80 +--- +# Source: openim-msggateway/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-msggateway + labels: + helm.sh/chart: openim-msggateway-0.1.0 + app.kubernetes.io/name: openim-msggateway + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + - port: 88 + targetPort: rpc + protocol: TCP + name: rpc + selector: + app.kubernetes.io/name: openim-msggateway + app.kubernetes.io/instance: release-name +--- +# Source: openim-msggateway/templates/deployment.yaml +# Copyright © 2023 OpenIM. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-msggateway + labels: + helm.sh/chart: openim-msggateway-0.1.0 + app.kubernetes.io/name: openim-msggateway + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-msggateway + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-msggateway + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-msggateway + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + - name: rpc + containerPort: 88 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-msggateway/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-msggateway + labels: + helm.sh/chart: openim-msggateway-0.1.0 + app.kubernetes.io/name: openim-msggateway + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-msggateway + port: + number: 80 +--- +# Source: openim-msgtransfer/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-msgtransfer + labels: + helm.sh/chart: openim-msgtransfer-0.1.0 + app.kubernetes.io/name: openim-msgtransfer + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-msgtransfer + app.kubernetes.io/instance: release-name +--- +# Source: openim-msgtransfer/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-msgtransfer + labels: + helm.sh/chart: openim-msgtransfer-0.1.0 + app.kubernetes.io/name: openim-msgtransfer + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-msgtransfer + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-msgtransfer + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-msgtransfer + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-msgtransfer/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-msgtransfer + labels: + helm.sh/chart: openim-msgtransfer-0.1.0 + app.kubernetes.io/name: openim-msgtransfer + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-msgtransfer + port: + number: 80 +--- +# Source: openim-push/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-push + labels: + helm.sh/chart: openim-push-0.1.0 + app.kubernetes.io/name: openim-push + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-push + app.kubernetes.io/instance: release-name +--- +# Source: openim-push/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-push + labels: + helm.sh/chart: openim-push-0.1.0 + app.kubernetes.io/name: openim-push + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-push + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-push + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-push + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-push/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-push + labels: + helm.sh/chart: openim-push-0.1.0 + app.kubernetes.io/name: openim-push + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-push + port: + number: 80 +--- +# Source: openim-rpc-auth/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-auth + labels: + helm.sh/chart: openim-rpc-auth-0.1.0 + app.kubernetes.io/name: openim-rpc-auth + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-auth + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-auth/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-auth + labels: + helm.sh/chart: openim-rpc-auth-0.1.0 + app.kubernetes.io/name: openim-rpc-auth + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-auth + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-auth + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-auth + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-auth/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-auth + labels: + helm.sh/chart: openim-rpc-auth-0.1.0 + app.kubernetes.io/name: openim-rpc-auth + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-auth + port: + number: 80 +--- +# Source: openim-rpc-conversation/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-conversation + labels: + helm.sh/chart: openim-rpc-conversation-0.1.0 + app.kubernetes.io/name: openim-rpc-conversation + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-conversation + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-conversation/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-conversation + labels: + helm.sh/chart: openim-rpc-conversation-0.1.0 + app.kubernetes.io/name: openim-rpc-conversation + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-conversation + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-conversation + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-conversation + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-conversation/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-conversation + labels: + helm.sh/chart: openim-rpc-conversation-0.1.0 + app.kubernetes.io/name: openim-rpc-conversation + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-conversation + port: + number: 80 +--- +# Source: openim-rpc-friend/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-friend + labels: + helm.sh/chart: openim-rpc-friend-0.1.0 + app.kubernetes.io/name: openim-rpc-friend + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-friend + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-friend/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-friend + labels: + helm.sh/chart: openim-rpc-friend-0.1.0 + app.kubernetes.io/name: openim-rpc-friend + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-friend + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-friend + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-friend + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-friend/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-friend + labels: + helm.sh/chart: openim-rpc-friend-0.1.0 + app.kubernetes.io/name: openim-rpc-friend + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-friend + port: + number: 80 +--- +# Source: openim-rpc-group/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-group + labels: + helm.sh/chart: openim-rpc-group-0.1.0 + app.kubernetes.io/name: openim-rpc-group + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-group + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-group/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-group + labels: + helm.sh/chart: openim-rpc-group-0.1.0 + app.kubernetes.io/name: openim-rpc-group + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-group + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-group + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-group + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-group/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-group + labels: + helm.sh/chart: openim-rpc-group-0.1.0 + app.kubernetes.io/name: openim-rpc-group + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-group + port: + number: 80 +--- +# Source: openim-rpc-msg/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-msg + labels: + helm.sh/chart: openim-rpc-msg-0.1.0 + app.kubernetes.io/name: openim-rpc-msg + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-msg + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-msg/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-msg + labels: + helm.sh/chart: openim-rpc-msg-0.1.0 + app.kubernetes.io/name: openim-rpc-msg + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-msg + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-msg + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-msg + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-msg/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-msg + labels: + helm.sh/chart: openim-rpc-msg-0.1.0 + app.kubernetes.io/name: openim-rpc-msg + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-msg + port: + number: 80 +--- +# Source: openim-rpc-third/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-third + labels: + helm.sh/chart: openim-rpc-third-0.1.0 + app.kubernetes.io/name: openim-rpc-third + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-third + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-third/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-third + labels: + helm.sh/chart: openim-rpc-third-0.1.0 + app.kubernetes.io/name: openim-rpc-third + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-third + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-third + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-third + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-third/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-third + labels: + helm.sh/chart: openim-rpc-third-0.1.0 + app.kubernetes.io/name: openim-rpc-third + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-third + port: + number: 80 +--- +# Source: openim-rpc-user/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: release-name-openim-rpc-user + labels: + helm.sh/chart: openim-rpc-user-0.1.0 + app.kubernetes.io/name: openim-rpc-user + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: http + protocol: TCP + name: http + selector: + app.kubernetes.io/name: openim-rpc-user + app.kubernetes.io/instance: release-name +--- +# Source: openim-rpc-user/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: release-name-openim-rpc-user + labels: + helm.sh/chart: openim-rpc-user-0.1.0 + app.kubernetes.io/name: openim-rpc-user + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: openim-rpc-user + app.kubernetes.io/instance: release-name + template: + metadata: + labels: + app.kubernetes.io/name: openim-rpc-user + app.kubernetes.io/instance: release-name + spec: + serviceAccountName: default + securityContext: + {} + containers: + - name: openim-rpc-user + securityContext: + {} + image: "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-api:latest" + imagePullPolicy: Always + ports: + - name: http + containerPort: 80 + protocol: TCP + #livenessProbe: + # httpGet: + # path: / + # port: http + #readinessProbe: + # httpGet: + # path: / + # port: http + resources: + {} + volumeMounts: + - mountPath: /openim/openim-server/config/config.yaml + name: config + subPath: config.yaml + - mountPath: /openim/openim-server/config/ + name: config + subPath: notification.yaml + volumes: + - name: config + configMap: + name: openim-cm +--- +# Source: openim-rpc-user/templates/ingress.yaml +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: release-name-openim-rpc-user + labels: + helm.sh/chart: openim-rpc-user-0.1.0 + app.kubernetes.io/name: openim-rpc-user + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: "1.16.0" + app.kubernetes.io/managed-by: Helm + annotations: + nginx.ingress.kubernetes.io/rewrite-target: /./templates/helm-image.yaml + nginx.ingress.kubernetes.io/use-regex: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - "openim.server.com" + secretName: webapitls + rules: + - host: "openim.server.com" + http: + paths: + - path: /api(/|$)(.*) + pathType: ImplementationSpecific + backend: + service: + name: release-name-openim-rpc-user + port: + number: 80 diff --git a/deployments/templates/chat.yaml b/deployments/templates/chat.yaml index 4f4936b13..3c3862bce 100644 --- a/deployments/templates/chat.yaml +++ b/deployments/templates/chat.yaml @@ -96,7 +96,14 @@ verifyCode: accessKeySecret: "" signName: "" verificationCodeTemplateCode: "" - + mail: # 根据对应的发件邮箱更改 sendMail、senderAuthorizationCode、smtpAddr、smtpPort 即可 + title: "" + senderMail: "" # 发送者 + senderAuthorizationCode: "" # 授权码 + smtpAddr: "smtp.qq.com" # smtp 服务器地址 + smtpPort: 25 # smtp 服务器邮件发送端口 + testDepartMentID: 001 + imAPIURL: http://127.0.0.1:10002 ###################### Proxy Header ###################### # 获取ip的header,没有配置直接获取远程地址 diff --git a/go.mod b/go.mod index bbbfaec41..e7d9097d2 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/redis/go-redis/v9 v9.2.1 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 go.uber.org/automaxprocs v1.5.3 + golang.org/x/sync v0.4.0 gopkg.in/src-d/go-git.v4 v4.13.1 gotest.tools v2.2.0+incompatible ) @@ -132,7 +133,6 @@ require ( golang.org/x/arch v0.3.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect - golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 670757850..807c4af3b 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -17,22 +17,19 @@ package msggateway import ( "context" - "github.com/OpenIMSDK/tools/mcontext" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - - "github.com/OpenIMSDK/tools/errs" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" ) @@ -41,6 +38,7 @@ func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, serve if err != nil { return err } + msgModel := cache.NewMsgCacheModel(rdb) s.LongConnServer.SetDiscoveryRegistry(disCov) s.LongConnServer.SetCacheHandler(msgModel) @@ -97,22 +95,25 @@ func (s *Server) GetUsersOnlineStatus( if !ok { continue } - temp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) - temp.UserID = userID + + uresp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) + uresp.UserID = userID for _, client := range clients { - if client != nil { - ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail) - ps.Platform = constant.PlatformIDToName(client.PlatformID) - ps.Status = constant.OnlineStatus - ps.ConnID = client.ctx.GetConnID() - ps.Token = client.token - ps.IsBackground = client.IsBackground - temp.Status = constant.OnlineStatus - temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps) + if client == nil { + continue } + + ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail) + ps.Platform = constant.PlatformIDToName(client.PlatformID) + ps.Status = constant.OnlineStatus + ps.ConnID = client.ctx.GetConnID() + ps.Token = client.token + ps.IsBackground = client.IsBackground + uresp.Status = constant.OnlineStatus + uresp.DetailPlatformStatus = append(uresp.DetailPlatformStatus, ps) } - if temp.Status == constant.OnlineStatus { - resp.SuccessResult = append(resp.SuccessResult, temp) + if uresp.Status == constant.OnlineStatus { + resp.SuccessResult = append(resp.SuccessResult, uresp) } } return &resp, nil @@ -129,50 +130,55 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg( ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq, ) (*msggateway.OnlineBatchPushOneMsgResp, error) { - var singleUserResult []*msggateway.SingleMsgToUserResults + + var singleUserResults []*msggateway.SingleMsgToUserResults + for _, v := range req.PushToUserIDs { var resp []*msggateway.SingleMsgToUserPlatform - tempT := &msggateway.SingleMsgToUserResults{ + results := &msggateway.SingleMsgToUserResults{ UserID: v, } clients, ok := s.LongConnServer.GetUserAllCons(v) if !ok { log.ZDebug(ctx, "push user not online", "userID", v) - tempT.Resp = resp - singleUserResult = append(singleUserResult, tempT) + results.Resp = resp + singleUserResults = append(singleUserResults, results) continue } + log.ZDebug(ctx, "push user online", "clients", clients, "userID", v) for _, client := range clients { - if client != nil { - temp := &msggateway.SingleMsgToUserPlatform{ - RecvID: v, - RecvPlatFormID: int32(client.PlatformID), - } - if !client.IsBackground || - (client.IsBackground == true && client.PlatformID != constant.IOSPlatformID) { - err := client.PushMessage(ctx, req.MsgData) - if err != nil { - temp.ResultCode = -2 - resp = append(resp, temp) - } else { - if utils.IsContainInt(client.PlatformID, s.pushTerminal) { - tempT.OnlinePush = true - resp = append(resp, temp) - } - } + if client == nil { + continue + } + + userPlatform := &msggateway.SingleMsgToUserPlatform{ + RecvID: v, + RecvPlatFormID: int32(client.PlatformID), + } + if !client.IsBackground || + (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { + err := client.PushMessage(ctx, req.MsgData) + if err != nil { + userPlatform.ResultCode = -2 + resp = append(resp, userPlatform) } else { - temp.ResultCode = -3 - resp = append(resp, temp) + if utils.IsContainInt(client.PlatformID, s.pushTerminal) { + results.OnlinePush = true + resp = append(resp, userPlatform) + } } + } else { + userPlatform.ResultCode = -3 + resp = append(resp, userPlatform) } } - tempT.Resp = resp - singleUserResult = append(singleUserResult, tempT) + results.Resp = resp + singleUserResults = append(singleUserResults, results) } return &msggateway.OnlineBatchPushOneMsgResp{ - SinglePushResult: singleUserResult, + SinglePushResult: singleUserResults, }, nil } @@ -181,17 +187,21 @@ func (s *Server) KickUserOffline( req *msggateway.KickUserOfflineReq, ) (*msggateway.KickUserOfflineResp, error) { for _, v := range req.KickUserIDList { - if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok { - for _, client := range clients { - log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client) - if err := client.longConnServer.KickUserConn(client); err != nil { - log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID) - } - } - } else { + clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) + if !ok { log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID) + continue + } + + for _, client := range clients { + log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client) + if err := client.longConnServer.KickUserConn(client); err != nil { + log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID) + } } + continue } + return &msggateway.KickUserOfflineResp{}, nil } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 2466da2eb..10dd988d1 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -23,26 +23,22 @@ import ( "sync/atomic" "time" - "github.com/OpenIMSDK/protocol/msggateway" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - - "github.com/OpenIMSDK/protocol/constant" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - + "github.com/go-playground/validator/v10" "github.com/redis/go-redis/v9" + "golang.org/x/sync/errgroup" + "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/tools/discoveryregistry" - - "github.com/go-playground/validator/v10" - "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/utils" + + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) type LongConnServer interface { @@ -61,12 +57,6 @@ type LongConnServer interface { MessageHandler } -var bufferPool = sync.Pool{ - New: func() interface{} { - return make([]byte, 1024) - }, -} - type WsServer struct { port int wsMaxConnNum int64 @@ -78,7 +68,6 @@ type WsServer struct { onlineUserNum atomic.Int64 onlineUserConnNum atomic.Int64 handshakeTimeout time.Duration - hubServer *Server validate *validator.Validate cache cache.MsgModel userClient *rpcclient.UserRpcClient @@ -183,27 +172,39 @@ func (ws *WsServer) Run() error { return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening } +var concurrentRequest = 3 + func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) if err != nil { return err } + + wg := errgroup.Group{} + wg.SetLimit(concurrentRequest) + // Online push user online message to other node for _, v := range conns { + v := v // safe closure var if v.Target() == ws.disCov.GetSelfConnTarget() { log.ZDebug(ctx, "Filter out this node", "node", v.Target()) continue } - msgClient := msggateway.NewMsgGatewayClient(v) - _, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{ - UserID: client.UserID, - PlatformID: int32(client.PlatformID), Token: client.token, + + wg.Go(func() error { + msgClient := msggateway.NewMsgGatewayClient(v) + _, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{ + UserID: client.UserID, + PlatformID: int32(client.PlatformID), Token: client.token, + }) + if err != nil { + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + } + return nil }) - if err != nil { - log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) - continue - } } + + _ = wg.Wait() return nil } @@ -289,70 +290,72 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien } fallthrough case constant.AllLoginButSameTermKick: - if clientOK { - isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients) - if isDeleteUser { - ws.onlineUserNum.Add(-1) - } - for _, c := range oldClients { - err := c.KickOnlineMessage() - if err != nil { - log.ZWarn(c.ctx, "KickOnlineMessage", err) - } + if !clientOK { + return + } + + isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients) + if isDeleteUser { + ws.onlineUserNum.Add(-1) + } + for _, c := range oldClients { + err := c.KickOnlineMessage() + if err != nil { + log.ZWarn(c.ctx, "KickOnlineMessage", err) } - m, err := ws.cache.GetTokensWithoutError( + } + m, err := ws.cache.GetTokensWithoutError( + newClient.ctx, + newClient.UserID, + newClient.PlatformID, + ) + if err != nil && err != redis.Nil { + log.ZWarn( newClient.ctx, + "get token from redis err", + err, + "userID", newClient.UserID, + "platformID", newClient.PlatformID, ) - if err != nil && err != redis.Nil { - log.ZWarn( - newClient.ctx, - "get token from redis err", - err, - "userID", - newClient.UserID, - "platformID", - newClient.PlatformID, - ) - return - } - if m == nil { - log.ZWarn( - newClient.ctx, - "m is nil", - errors.New("m is nil"), - "userID", - newClient.UserID, - "platformID", - newClient.PlatformID, - ) - return - } - log.ZDebug( + return + } + if m == nil { + log.ZWarn( newClient.ctx, - "get token from redis", + "m is nil", + errors.New("m is nil"), "userID", newClient.UserID, "platformID", newClient.PlatformID, - "tokenMap", - m, ) - - for k := range m { - if k != newClient.ctx.GetToken() { - m[k] = constant.KickedToken - } - } - log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", - newClient.UserID, "token", newClient.ctx.GetToken()) - err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m) - if err != nil { - log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) - return + return + } + log.ZDebug( + newClient.ctx, + "get token from redis", + "userID", + newClient.UserID, + "platformID", + newClient.PlatformID, + "tokenMap", + m, + ) + + for k := range m { + if k != newClient.ctx.GetToken() { + m[k] = constant.KickedToken } } + log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", + newClient.UserID, "token", newClient.ctx.GetToken()) + err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m) + if err != nil { + log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) + return + } } } @@ -365,7 +368,7 @@ func (ws *WsServer) unregisterClient(client *Client) { } ws.onlineUserConnNum.Add(-1) ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) - log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num", + log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load(), ) } @@ -404,7 +407,7 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { httpError(connContext, errs.ErrConnArgsErr) return } - if err := authverify.WsVerifyToken(token, userID, platformID); err != nil { + if err = authverify.WsVerifyToken(token, userID, platformID); err != nil { httpError(connContext, err) return } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bebf6819a..4ce015543 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -21,14 +21,13 @@ import ( "net/http" "sync" + "github.com/OpenIMSDK/tools/mw" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/OpenIMSDK/tools/mw" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b4556634c..b019b0120 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -427,49 +427,62 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( break } } - rwLock := new(sync.RWMutex) log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 0, 1000) - t := time.NewTicker(time.Millisecond * 100) + + split := 1000 + rwLock := new(sync.RWMutex) + messages := make([]*sarama.ConsumerMessage, 0, 1000) + ticker := time.NewTicker(time.Millisecond * 100) + go func() { for { select { - case <-t.C: - if len(cMsg) > 0 { - rwLock.Lock() - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - rwLock.Unlock() - split := 1000 - ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) - log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) - for i := 0; i < len(ccMsg)/split; i++ { - // log.Debug() - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split], - }} - } - if (len(ccMsg) % split) > 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):], - }} - } - log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg)) + case <-ticker.C: + if len(messages) == 0 { + continue + } + + rwLock.Lock() + buffer := make([]*sarama.ConsumerMessage, 0, len(messages)) + buffer = append(buffer, messages...) + + // reuse slice, set cap to 0 + messages = messages[:0] + rwLock.Unlock() + + start := time.Now() + ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) + log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer)) + for i := 0; i < len(buffer)/split; i++ { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + ctx: ctx, cMsgList: buffer[i*split : (i+1)*split], + }} + } + if (len(buffer) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):], + }} } + + log.ZDebug(ctx, "timer trigger msg consumer end", + "length", len(buffer), "time_cost", time.Since(start), + ) } } }() + for msg := range claim.Messages() { - rwLock.Lock() - if len(msg.Value) != 0 { - cMsg = append(cMsg, msg) + if len(msg.Value) == 0 { + continue } + + rwLock.Lock() + messages = append(messages, msg) rwLock.Unlock() + sess.MarkMessage(msg, "") } + return nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 6f4803628..e22504bbb 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -17,13 +17,18 @@ package tools import ( "context" "fmt" - "sync" + "os" + "os/signal" + "syscall" + "time" + "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" ) func StartTask() error { @@ -32,23 +37,75 @@ func StartTask() error { if err != nil { return err } - msgTool.ConvertTools() - c := cron.New() - var wg sync.WaitGroup - wg.Add(1) + + msgTool.convertTools() + + rdb, err := cache.NewRedis() + if err != nil { + return err + } + + // register cron tasks + var crontab = cron.New() log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime) - _, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) + _, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq)) if err != nil { log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err) panic(err) } + log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) - _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) + _, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs)) if err != nil { log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err) panic(err) } - c.Start() - wg.Wait() + + // start crontab + crontab.Start() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-sigs + + // stop crontab, Wait for the running task to exit. + ctx := crontab.Stop() + + select { + case <-ctx.Done(): + // graceful exit + + case <-time.After(15 * time.Second): + // forced exit on timeout + } + return nil } + +// netlock redis lock. +func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool { + value := "used" + ok, err := rdb.SetNX(context.Background(), key, value, ttl).Result() // nolint + if err != nil { + // when err is about redis server, return true. + return false + } + + return ok +} + +func cronWrapFunc(rdb redis.UniversalClient, key string, fn func()) func() { + enableCronLocker := config.Config.EnableCronLocker + return func() { + // if don't enable cron-locker, call fn directly. + if !enableCronLocker { + fn() + return + } + + // when acquire redis lock, call fn(). + if netlock(rdb, key, 5*time.Second) { + fn() + } + } +} diff --git a/internal/tools/cron_task_test.go b/internal/tools/cron_task_test.go new file mode 100644 index 000000000..1f4f1f5c1 --- /dev/null +++ b/internal/tools/cron_task_test.go @@ -0,0 +1,82 @@ +package tools + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" +) + +func TestDisLock(t *testing.T) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + assert.Equal(t, true, netlock(rdb, "cron-1", 1*time.Second)) + + // if exists, get false + assert.Equal(t, false, netlock(rdb, "cron-1", 1*time.Second)) + + time.Sleep(2 * time.Second) + + // wait for key on timeout, get true + assert.Equal(t, true, netlock(rdb, "cron-1", 2*time.Second)) + + // set different key + assert.Equal(t, true, netlock(rdb, "cron-2", 2*time.Second)) +} + +func TestCronWrapFunc(t *testing.T) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + once := sync.Once{} + done := make(chan struct{}, 1) + cb := func() { + once.Do(func() { + close(done) + }) + } + + start := time.Now() + key := fmt.Sprintf("cron-%v", rand.Int31()) + crontab := cron.New(cron.WithSeconds()) + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, cb)) + crontab.Start() + <-done + + dur := time.Since(start) + assert.LessOrEqual(t, dur.Seconds(), float64(2*time.Second)) + crontab.Stop() +} + +func TestCronWrapFuncWithNetlock(t *testing.T) { + config.Config.EnableCronLocker = true + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + done := make(chan string, 10) + + crontab := cron.New(cron.WithSeconds()) + + key := fmt.Sprintf("cron-%v", rand.Int31()) + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, func() { + done <- "host1" + })) + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, func() { + done <- "host2" + })) + crontab.Start() + + time.Sleep(12 * time.Second) + // the ttl of netlock is 5s, so expected value is 2. + assert.Equal(t, len(done), 2) + + crontab.Stop() +} diff --git a/internal/tools/msg_doc_convert.go b/internal/tools/msg_doc_convert.go index aa24d385f..758625be1 100644 --- a/internal/tools/msg_doc_convert.go +++ b/internal/tools/msg_doc_convert.go @@ -22,7 +22,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" ) -func (c *MsgTool) ConvertTools() { +func (c *MsgTool) convertTools() { ctx := mcontext.NewCtx("convert") conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) if err != nil { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 261e3d8c0..94688b0fb 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -78,10 +78,11 @@ type configStruct struct { } `yaml:"mongo"` Redis struct { - ClusterMode bool `yaml:"clusterMode"` - Address []string `yaml:"address"` - Username string `yaml:"username"` - Password string `yaml:"password"` + ClusterMode bool `yaml:"clusterMode"` + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + EnablePipeline bool `yaml:"enablePipeline"` } `yaml:"redis"` Kafka struct { @@ -231,6 +232,7 @@ type configStruct struct { ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` MsgDestructTime string `yaml:"msgDestructTime"` Secret string `yaml:"secret"` + EnableCronLocker bool `yaml:"enableCronLocker"` TokenPolicy struct { Expire int64 `yaml:"expire"` } `yaml:"tokenPolicy"` diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 1a5507f89..77b38d9b7 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -28,12 +28,21 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) +var ( + // singleton pattern. + redisClient redis.UniversalClient +) + const ( maxRetry = 10 // number of retries ) // NewRedis Initialize redis connection. func NewRedis() (redis.UniversalClient, error) { + if redisClient != nil { + return redisClient, nil + } + if len(config.Config.Redis.Address) == 0 { return nil, errors.New("redis address is empty") } @@ -66,5 +75,6 @@ func NewRedis() (redis.UniversalClient, error) { return nil, fmt.Errorf("redis ping %w", err) } + redisClient = rdb return rdb, err } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 50fb617aa..c8346a1d4 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -21,6 +21,7 @@ import ( "time" "github.com/dtm-labs/rockscache" + "golang.org/x/sync/errgroup" unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" @@ -62,6 +63,8 @@ const ( uidPidToken = "UID_PID_TOKEN_STATUS:" ) +var concurrentLimit = 3 + type SeqCache interface { SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) @@ -345,85 +348,165 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string { } func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + if config.Config.Redis.EnablePipeline { + return c.PipeGetMessagesBySeq(ctx, conversationID, seqs) + } + + return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs) +} + +func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + pipe := c.rdb.Pipeline() + + results := []*redis.StringCmd{} for _, seq := range seqs { - res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result() - if err != nil { - log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq) + results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq))) + } + + _, err = pipe.Exec(ctx) + if err != nil && err != redis.Nil { + return seqMsgs, failedSeqs, errs.Wrap(err, "pipe.get") + } + + for idx, res := range results { + seq := seqs[idx] + if res.Err() != nil { + log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err()) failedSeqs = append(failedSeqs, seq) continue } + msg := sdkws.MsgData{} - if err = msgprocessor.String2Pb(res, &msg); err != nil { + if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil { log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) failedSeqs = append(failedSeqs, seq) continue } + if msg.Status == constant.MsgDeleted { failedSeqs = append(failedSeqs, seq) continue } + seqMsgs = append(seqMsgs, &msg) } return - //pipe := c.rdb.Pipeline() - //for _, v := range seqs { - // // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - // key := c.getMessageCacheKey(conversationID, v) - // if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { - // return nil, nil, err - // } - //} - //result, err := pipe.Exec(ctx) - //for i, v := range result { - // cmd := v.(*redis.StringCmd) - // if cmd.Err() != nil { - // failedSeqs = append(failedSeqs, seqs[i]) - // } else { - // msg := sdkws.MsgData{} - // err = msgprocessor.String2Pb(cmd.Val(), &msg) - // if err == nil { - // if msg.Status != constant.MsgDeleted { - // seqMsgs = append(seqMsgs, &msg) - // continue - // } - // } else { - // log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) - // } - // failedSeqs = append(failedSeqs, seqs[i]) - // } - //} - //return seqMsgs, failedSeqs, err +} + +func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + type entry struct { + err error + msg *sdkws.MsgData + } + + wg := errgroup.Group{} + wg.SetLimit(concurrentLimit) + + results := make([]entry, len(seqs)) // set slice len/cap to length of seqs. + for idx, seq := range seqs { + // closure safe var + idx := idx + seq := seq + + wg.Go(func() error { + res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result() + if err != nil { + log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq) + results[idx] = entry{err: err} + return nil + } + + msg := sdkws.MsgData{} + if err = msgprocessor.String2Pb(res, &msg); err != nil { + log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) + results[idx] = entry{err: err} + return nil + } + + if msg.Status == constant.MsgDeleted { + results[idx] = entry{err: err} + return nil + } + + results[idx] = entry{msg: &msg} + return nil + }) + } + + _ = wg.Wait() + + for idx, res := range results { + if res.err != nil { + failedSeqs = append(failedSeqs, seqs[idx]) + continue + } + + seqMsgs = append(seqMsgs, res.msg) + } + + return } func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { + if config.Config.Redis.EnablePipeline { + return c.PipeSetMessageToCache(ctx, conversationID, msgs) + } + return c.ParallelSetMessageToCache(ctx, conversationID, msgs) +} + +func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { + pipe := c.rdb.Pipeline() for _, msg := range msgs { s, err := msgprocessor.Pb2String(msg) if err != nil { - return 0, errs.Wrap(err) + return 0, errs.Wrap(err, "pb.marshal") } + key := c.getMessageCacheKey(conversationID, msg.Seq) - if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + _ = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second) + } + + results, err := pipe.Exec(ctx) + if err != nil { + return 0, errs.Wrap(err, "pipe.set") + } + + for _, res := range results { + if res.Err() != nil { return 0, errs.Wrap(err) } } + + return len(msgs), nil +} + +func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { + wg := errgroup.Group{} + wg.SetLimit(concurrentLimit) + + for _, msg := range msgs { + msg := msg // closure safe var + wg.Go(func() error { + s, err := msgprocessor.Pb2String(msg) + if err != nil { + return errs.Wrap(err) + } + + key := c.getMessageCacheKey(conversationID, msg.Seq) + if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return errs.Wrap(err) + } + return nil + }) + } + + err := wg.Wait() + if err != nil { + return 0, err + } + return len(msgs), nil - //pipe := c.rdb.Pipeline() - //var failedMsgs []*sdkws.MsgData - //for _, msg := range msgs { - // key := c.getMessageCacheKey(conversationID, msg.Seq) - // s, err := msgprocessor.Pb2String(msg) - // if err != nil { - // return 0, errs.Wrap(err) - // } - // err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - // if err != nil { - // failedMsgs = append(failedMsgs, msg) - // log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs) - // } - //} - //_, err := pipe.Exec(ctx) - //return len(failedMsgs), err } func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { diff --git a/pkg/common/db/cache/msg_test.go b/pkg/common/db/cache/msg_test.go new file mode 100644 index 000000000..c5a4fb870 --- /dev/null +++ b/pkg/common/db/cache/msg_test.go @@ -0,0 +1,251 @@ +package cache + +import ( + "context" + "fmt" + "math/rand" + "testing" + + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +func TestParallelSetMessageToCache(t *testing.T) { + var ( + cid = fmt.Sprintf("cid-%v", rand.Int63()) + seqFirst = rand.Int63() + msgs = []*sdkws.MsgData{} + ) + + for i := 0; i < 100; i++ { + msgs = append(msgs, &sdkws.MsgData{ + Seq: seqFirst + int64(i), + }) + } + + testParallelSetMessageToCache(t, cid, msgs) +} + +func testParallelSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + ret, err := cacher.ParallelSetMessageToCache(context.Background(), cid, msgs) + assert.Nil(t, err) + assert.Equal(t, len(msgs), ret) + + // validate + for _, msg := range msgs { + key := cacher.getMessageCacheKey(cid, msg.Seq) + val, err := rdb.Exists(context.Background(), key).Result() + assert.Nil(t, err) + assert.EqualValues(t, 1, val) + } +} + +func TestPipeSetMessageToCache(t *testing.T) { + var ( + cid = fmt.Sprintf("cid-%v", rand.Int63()) + seqFirst = rand.Int63() + msgs = []*sdkws.MsgData{} + ) + + for i := 0; i < 100; i++ { + msgs = append(msgs, &sdkws.MsgData{ + Seq: seqFirst + int64(i), + }) + } + + testPipeSetMessageToCache(t, cid, msgs) +} + +func testPipeSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + ret, err := cacher.PipeSetMessageToCache(context.Background(), cid, msgs) + assert.Nil(t, err) + assert.Equal(t, len(msgs), ret) + + // validate + for _, msg := range msgs { + key := cacher.getMessageCacheKey(cid, msg.Seq) + val, err := rdb.Exists(context.Background(), key).Result() + assert.Nil(t, err) + assert.EqualValues(t, 1, val) + } +} + +func TestGetMessagesBySeq(t *testing.T) { + var ( + cid = fmt.Sprintf("cid-%v", rand.Int63()) + seqFirst = rand.Int63() + msgs = []*sdkws.MsgData{} + ) + + seqs := []int64{} + for i := 0; i < 100; i++ { + msgs = append(msgs, &sdkws.MsgData{ + Seq: seqFirst + int64(i), + SendID: fmt.Sprintf("fake-sendid-%v", i), + }) + seqs = append(seqs, seqFirst+int64(i)) + } + + // set data to cache + testPipeSetMessageToCache(t, cid, msgs) + + // get data from cache with parallet mode + testParallelGetMessagesBySeq(t, cid, seqs, msgs) + + // get data from cache with pipeline mode + testPipeGetMessagesBySeq(t, cid, seqs, msgs) +} + +func testParallelGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs) + assert.Nil(t, err) + assert.Equal(t, 0, len(failedSeqs)) + assert.Equal(t, len(respMsgs), len(seqs)) + + // validate + for idx, msg := range respMsgs { + assert.Equal(t, msg.Seq, inputMsgs[idx].Seq) + assert.Equal(t, msg.SendID, inputMsgs[idx].SendID) + } +} + +func testPipeGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs) + assert.Nil(t, err) + assert.Equal(t, 0, len(failedSeqs)) + assert.Equal(t, len(respMsgs), len(seqs)) + + // validate + for idx, msg := range respMsgs { + assert.Equal(t, msg.Seq, inputMsgs[idx].Seq) + assert.Equal(t, msg.SendID, inputMsgs[idx].SendID) + } +} + +func TestGetMessagesBySeqWithEmptySeqs(t *testing.T) { + var ( + cid = fmt.Sprintf("cid-%v", rand.Int63()) + seqFirst int64 = 0 + msgs = []*sdkws.MsgData{} + ) + + seqs := []int64{} + for i := 0; i < 100; i++ { + msgs = append(msgs, &sdkws.MsgData{ + Seq: seqFirst + int64(i), + SendID: fmt.Sprintf("fake-sendid-%v", i), + }) + seqs = append(seqs, seqFirst+int64(i)) + } + + // don't set cache, only get data from cache. + + // get data from cache with parallet mode + testParallelGetMessagesBySeqWithEmptry(t, cid, seqs, msgs) + + // get data from cache with pipeline mode + testPipeGetMessagesBySeqWithEmptry(t, cid, seqs, msgs) +} + +func testParallelGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs) + assert.Nil(t, err) + assert.Equal(t, len(seqs), len(failedSeqs)) + assert.Equal(t, 0, len(respMsgs)) +} + +func testPipeGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs) + assert.Equal(t, err, redis.Nil) + assert.Equal(t, len(seqs), len(failedSeqs)) + assert.Equal(t, 0, len(respMsgs)) +} + +func TestGetMessagesBySeqWithLostHalfSeqs(t *testing.T) { + var ( + cid = fmt.Sprintf("cid-%v", rand.Int63()) + seqFirst int64 = 0 + msgs = []*sdkws.MsgData{} + ) + + seqs := []int64{} + for i := 0; i < 100; i++ { + msgs = append(msgs, &sdkws.MsgData{ + Seq: seqFirst + int64(i), + SendID: fmt.Sprintf("fake-sendid-%v", i), + }) + seqs = append(seqs, seqFirst+int64(i)) + } + + // Only set half the number of messages. + testParallelSetMessageToCache(t, cid, msgs[:50]) + + // get data from cache with parallet mode + testParallelGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs) + + // get data from cache with pipeline mode + testPipeGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs) +} + +func testParallelGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs) + assert.Nil(t, err) + assert.Equal(t, len(seqs)/2, len(failedSeqs)) + assert.Equal(t, len(seqs)/2, len(respMsgs)) + + for idx, msg := range respMsgs { + assert.Equal(t, msg.Seq, seqs[idx]) + } +} + +func testPipeGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + + respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs) + assert.Nil(t, err) + assert.Equal(t, len(seqs)/2, len(failedSeqs)) + assert.Equal(t, len(seqs)/2, len(respMsgs)) + + for idx, msg := range respMsgs { + assert.Equal(t, msg.Seq, seqs[idx]) + } +} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 183874657..fb0a9c702 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -16,9 +16,14 @@ package controller import ( "context" + "encoding/json" "errors" "time" + "github.com/OpenIMSDK/protocol/constant" + + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/redis/go-redis/v9" "github.com/OpenIMSDK/tools/errs" @@ -32,7 +37,6 @@ import ( unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" pbmsg "github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/sdkws" @@ -398,7 +402,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) + msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { return nil, err } @@ -409,12 +413,70 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat return totalMsgs, nil } -func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) { +func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*unrelationtb.MsgInfoModel, userID, conversationID string, msg *unrelationtb.MsgInfoModel) { + if msg.IsRead { + msg.Msg.IsRead = true + } + if msg.Msg.ContentType != constant.Quote { + return + } + if msg.Msg.Content == "" { + return + } + var quoteMsg struct { + Text string `json:"text,omitempty"` + QuoteMessage *sdkws.MsgData `json:"quoteMessage,omitempty"` + MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"` + } + if err := json.Unmarshal([]byte(msg.Msg.Content), "eMsg); err != nil { + log.ZError(ctx, "json.Unmarshal", err) + return + } + if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification { + return + } + var msgs []*unrelationtb.MsgInfoModel + if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok { + msgs = v + } else { + if quoteMsg.QuoteMessage.Seq > 0 { + ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msg.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) + if err != nil { + log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq) + return + } + msgs = ms + cache[quoteMsg.QuoteMessage.Seq] = ms + } + } + if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification { + return + } + quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification + if len(msgs) > 0 { + quoteMsg.QuoteMessage.Content = []byte(msgs[0].Msg.Content) + } else { + quoteMsg.QuoteMessage.Content = []byte("{}") + } + data, err := json.Marshal("eMsg) + if err != nil { + log.ZError(ctx, "json.Marshal", err) + return + } + msg.Msg.Content = string(data) + if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msg.GetDocID(conversationID, msg.Msg.Seq), db.msg.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { + log.ZError(ctx, "UpdateMsgContent", err) + } +} + +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) + if err != nil { + return nil, err + } + tempCache := make(map[int64][]*unrelationtb.MsgInfoModel) for _, msg := range msgs { - if msg.IsRead { - msg.Msg.IsRead = true - } + db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg) } return msgs, err } @@ -423,7 +485,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) + msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { return nil, err } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 80e2db122..ba5aecd25 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -144,9 +144,9 @@ func Test_BatchInsertChat2DB(t *testing.T) { } func GetDB() *commonMsgDatabase { - config.Config.Mongo.Address = []string{"192.168.44.128:37017"} + config.Config.Mongo.Address = []string{"203.56.175.233:37017"} // config.Config.Mongo.Timeout = 60 - config.Config.Mongo.Database = "openIM" + config.Config.Mongo.Database = "openIM_v3" // config.Config.Mongo.Source = "admin" config.Config.Mongo.Username = "root" config.Config.Mongo.Password = "openIM123" @@ -232,37 +232,17 @@ func Test_FindBySeq(t *testing.T) { // } //} -//func Test_Delete1(t *testing.T) { -// config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} -// config.Config.Mongo.DBTimeout = 60 -// config.Config.Mongo.DBDatabase = "openIM" -// config.Config.Mongo.DBSource = "admin" -// config.Config.Mongo.DBUserName = "root" -// config.Config.Mongo.DBPassword = "openIM123" -// config.Config.Mongo.DBMaxPoolSize = 100 -// config.Config.Mongo.DBRetainChatRecords = 3650 -// config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" -// -// mongo, err := unrelation.NewMongo() -// if err != nil { -// panic(err) -// } -// err = mongo.GetDatabase().Client().Ping(context.Background(), nil) -// if err != nil { -// panic(err) -// } -// -// c := mongo.GetClient().Database("openIM").Collection("msg") -// -// var o unrelationtb.MsgDocModel -// -// err = c.FindOne(context.Background(), bson.M{"doc_id": "test:0"}).Decode(&o) -// if err != nil { -// panic(err) -// } -// -// for i, model := range o.Msg { -// fmt.Println(i, model == nil) -// } -// -//} +func TestName(t *testing.T) { + db := GetDB() + var seqs []int64 + for i := int64(1); i <= 4; i++ { + seqs = append(seqs, i) + } + msgs, err := db.getMsgBySeqsRange(context.Background(), "4931176757", "si_3866692501_4931176757", seqs, seqs[0], seqs[len(seqs)-1]) + if err != nil { + t.Fatal(err) + } + + t.Log(msgs) + +} diff --git a/pkg/common/db/s3/cont/controller.go b/pkg/common/db/s3/cont/controller.go index 09025e130..7040c7306 100644 --- a/pkg/common/db/s3/cont/controller.go +++ b/pkg/common/db/s3/cont/controller.go @@ -175,7 +175,6 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa return nil, err } if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash { - fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash) return nil, errors.New("md5 mismatching") } if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil { diff --git a/pkg/common/ginprometheus/ginprometheus.go b/pkg/common/ginprometheus/ginprometheus.go index 50a6c3a2c..a325595d6 100644 --- a/pkg/common/ginprometheus/ginprometheus.go +++ b/pkg/common/ginprometheus/ginprometheus.go @@ -3,7 +3,7 @@ package ginprometheus import ( "bytes" "fmt" - "io/ioutil" + "io" "net/http" "os" "strconv" @@ -17,40 +17,42 @@ import ( var defaultMetricPath = "/metrics" // counter, counter_vec, gauge, gauge_vec, -// histogram, histogram_vec, summary, summary_vec -var reqCnt = &Metric{ - ID: "reqCnt", - Name: "requests_total", - Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", - Type: "counter_vec", - Args: []string{"code", "method", "handler", "host", "url"}} - -var reqDur = &Metric{ - ID: "reqDur", - Name: "request_duration_seconds", - Description: "The HTTP request latencies in seconds.", - Type: "histogram_vec", - Args: []string{"code", "method", "url"}, -} +// histogram, histogram_vec, summary, summary_vec. +var ( + reqCounter = &Metric{ + ID: "reqCnt", + Name: "requests_total", + Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", + Type: "counter_vec", + Args: []string{"code", "method", "handler", "host", "url"}} + + reqDuration = &Metric{ + ID: "reqDur", + Name: "request_duration_seconds", + Description: "The HTTP request latencies in seconds.", + Type: "histogram_vec", + Args: []string{"code", "method", "url"}, + } -var resSz = &Metric{ - ID: "resSz", - Name: "response_size_bytes", - Description: "The HTTP response sizes in bytes.", - Type: "summary"} - -var reqSz = &Metric{ - ID: "reqSz", - Name: "request_size_bytes", - Description: "The HTTP request sizes in bytes.", - Type: "summary"} - -var standardMetrics = []*Metric{ - reqCnt, - reqDur, - resSz, - reqSz, -} + resSize = &Metric{ + ID: "resSz", + Name: "response_size_bytes", + Description: "The HTTP response sizes in bytes.", + Type: "summary"} + + reqSize = &Metric{ + ID: "reqSz", + Name: "request_size_bytes", + Description: "The HTTP request sizes in bytes.", + Type: "summary"} + + standardMetrics = []*Metric{ + reqCounter, + reqDuration, + resSize, + reqSize, + } +) /* RequestCounterURLLabelMappingFn is a function which can be supplied to the middleware to control @@ -74,7 +76,7 @@ which would map "/customer/alice" and "/customer/bob" to their template "/custom type RequestCounterURLLabelMappingFn func(c *gin.Context) string // Metric is a definition for the name, description, type, ID, and -// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric +// prometheus.Collector type (i.e. CounterVec, Summary, etc) of each metric. type Metric struct { MetricCollector prometheus.Collector ID string @@ -84,7 +86,7 @@ type Metric struct { Args []string } -// Prometheus contains the metrics gathered by the instance and its path +// Prometheus contains the metrics gathered by the instance and its path. type Prometheus struct { reqCnt *prometheus.CounterVec reqDur *prometheus.HistogramVec @@ -102,7 +104,7 @@ type Prometheus struct { URLLabelFromContext string } -// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional) +// PrometheusPushGateway contains the configuration for pushing to a Prometheus pushgateway (optional). type PrometheusPushGateway struct { // Push interval in seconds @@ -112,7 +114,7 @@ type PrometheusPushGateway struct { // where JOBNAME can be any string of your choice PushGatewayURL string - // Local metrics URL where metrics are fetched from, this could be ommited in the future + // Local metrics URL where metrics are fetched from, this could be omitted in the future // if implemented using prometheus common/expfmt instead MetricsURL string @@ -120,9 +122,11 @@ type PrometheusPushGateway struct { Job string } -// NewPrometheus generates a new set of metrics with a certain subsystem name +// NewPrometheus generates a new set of metrics with a certain subsystem name. func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus { - subsystem = "app" + if subsystem == "" { + subsystem = "app" + } var metricsList []*Metric @@ -131,16 +135,13 @@ func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus } else if len(customMetricsList) == 1 { metricsList = customMetricsList[0] } - - for _, metric := range standardMetrics { - metricsList = append(metricsList, metric) - } + metricsList = append(metricsList, standardMetrics...) p := &Prometheus{ MetricsList: metricsList, MetricsPath: defaultMetricPath, ReqCntURLLabelMappingFn: func(c *gin.Context) string { - return c.Request.URL.Path + return c.FullPath() // e.g. /user/:id , /user/:id/info }, } @@ -150,7 +151,7 @@ func NewPrometheus(subsystem string, customMetricsList ...[]*Metric) *Prometheus } // SetPushGateway sends metrics to a remote pushgateway exposed on pushGatewayURL -// every pushIntervalSeconds. Metrics are fetched from metricsURL +// every pushIntervalSeconds. Metrics are fetched from metricsURL. func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushIntervalSeconds time.Duration) { p.Ppg.PushGatewayURL = pushGatewayURL p.Ppg.MetricsURL = metricsURL @@ -158,13 +159,13 @@ func (p *Prometheus) SetPushGateway(pushGatewayURL, metricsURL string, pushInter p.startPushTicker() } -// SetPushGatewayJob job name, defaults to "gin" +// SetPushGatewayJob job name, defaults to "gin". func (p *Prometheus) SetPushGatewayJob(j string) { p.Ppg.Job = j } // SetListenAddress for exposing metrics on address. If not set, it will be exposed at the -// same address of the gin engine that is being used +// same address of the gin engine that is being used. func (p *Prometheus) SetListenAddress(address string) { p.listenAddress = address if p.listenAddress != "" { @@ -181,7 +182,7 @@ func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Eng } } -// SetMetricsPath set metrics paths +// SetMetricsPath set metrics paths. func (p *Prometheus) SetMetricsPath(e *gin.Engine) { if p.listenAddress != "" { @@ -192,7 +193,7 @@ func (p *Prometheus) SetMetricsPath(e *gin.Engine) { } } -// SetMetricsPathWithAuth set metrics paths with authentication +// SetMetricsPathWithAuth set metrics paths with authentication. func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) { if p.listenAddress != "" { @@ -205,34 +206,43 @@ func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts } func (p *Prometheus) runServer() { - if p.listenAddress != "" { - go p.router.Run(p.listenAddress) - } + go p.router.Run(p.listenAddress) } func (p *Prometheus) getMetrics() []byte { - response, _ := http.Get(p.Ppg.MetricsURL) + response, err := http.Get(p.Ppg.MetricsURL) + if err != nil { + return nil + } defer response.Body.Close() - body, _ := ioutil.ReadAll(response.Body) + body, _ := io.ReadAll(response.Body) return body } +var hostname, _ = os.Hostname() + func (p *Prometheus) getPushGatewayURL() string { - h, _ := os.Hostname() if p.Ppg.Job == "" { p.Ppg.Job = "gin" } - return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + h + return p.Ppg.PushGatewayURL + "/metrics/job/" + p.Ppg.Job + "/instance/" + hostname } func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { req, err := http.NewRequest("POST", p.getPushGatewayURL(), bytes.NewBuffer(metrics)) + if err != nil { + return + } + client := &http.Client{} - if _, err = client.Do(req); err != nil { + resp, err := client.Do(req) + if err != nil { fmt.Println("Error sending to push gateway error:", err.Error()) } + + resp.Body.Close() } func (p *Prometheus) startPushTicker() { @@ -244,7 +254,7 @@ func (p *Prometheus) startPushTicker() { }() } -// NewMetric associates prometheus.Collector based on Metric.Type +// NewMetric associates prometheus.Collector based on Metric.Type. func NewMetric(m *Metric, subsystem string) prometheus.Collector { var metric prometheus.Collector switch m.Type { @@ -321,20 +331,20 @@ func NewMetric(m *Metric, subsystem string) prometheus.Collector { } func (p *Prometheus) registerMetrics(subsystem string) { - for _, metricDef := range p.MetricsList { metric := NewMetric(metricDef, subsystem) if err := prometheus.Register(metric); err != nil { fmt.Println("could not be registered in Prometheus,metricDef.Name:", metricDef.Name, " error:", err.Error()) } + switch metricDef { - case reqCnt: + case reqCounter: p.reqCnt = metric.(*prometheus.CounterVec) - case reqDur: + case reqDuration: p.reqDur = metric.(*prometheus.HistogramVec) - case resSz: + case resSize: p.resSz = metric.(prometheus.Summary) - case reqSz: + case reqSize: p.reqSz = metric.(prometheus.Summary) } metricDef.MetricCollector = metric @@ -353,7 +363,7 @@ func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) { p.SetMetricsPathWithAuth(e, accounts) } -// HandlerFunc defines handler function for middleware +// HandlerFunc defines handler function for middleware. func (p *Prometheus) HandlerFunc() gin.HandlerFunc { return func(c *gin.Context) { if c.Request.URL.Path == p.MetricsPath { @@ -393,7 +403,7 @@ func prometheusHandler() gin.HandlerFunc { } func computeApproximateRequestSize(r *http.Request) int { - s := 0 + var s int if r.URL != nil { s = len(r.URL.Path) } diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index b72abb1da..56255f37c 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -118,8 +118,9 @@ func GetNotificationConversationIDByConversationID(conversationID string) string l := strings.Split(conversationID, "_") if len(l) > 1 { l[0] = "n" - return conversationID + return strings.Join(l, "_") } + return "" } diff --git a/scripts/README.md b/scripts/README.md index 242e76017..b55049a7b 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -39,7 +39,7 @@ scripts/ ├── demo.sh # Demonstration or example script. ├── docker-check-service.sh # Docker script to check services' status. ├── docker-start-all.sh # Docker script to start all containers/services. -├── ensure_tag.sh # Ensure correct tags or labeling. +├── ensure-tag.sh # Ensure correct tags or labeling. ├── env_check.sh # Environment verification and checking. ├── gen-swagger-docs.sh # Script to generate Swagger documentation. ├── genconfig.sh # Generate configuration files. diff --git a/scripts/ensure_tag.sh b/scripts/ensure-tag.sh similarity index 100% rename from scripts/ensure_tag.sh rename to scripts/ensure-tag.sh diff --git a/scripts/env_check.sh b/scripts/env_check.sh deleted file mode 100755 index 08d6a84b1..000000000 --- a/scripts/env_check.sh +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env bash - -# Copyright © 2023 OpenIM. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#Include shell font styles and some basic information -SCRIPTS_ROOT=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -OPENIM_ROOT=$(dirname "${SCRIPTS_ROOT}")/.. - -#Include shell font styles and some basic information -source $SCRIPTS_ROOT/path_info.sh -source $SCRIPTS_ROOT/lib/init.sh - -cd $SCRIPTS_ROOT - -echo -e "check time synchronize.................................." -t=`curl http://time.akamai.com/?iso -s` -t1=`date -d $t +%s` -t2=`date +%s` -let between=t2-t1 -if [[ $between -gt 10 ]] || [[ $between -lt -10 ]]; then - echo -e ${RED_PREFIX}"Warning: The difference between the iso time and the server's time is too large: "$between"s" ${COLOR_SUFFIX} -else - echo -e ${GREEN_PREFIX} "ok: Server time is synchronized " ${COLOR_SUFFIX} -fi - - -echo -e "check login user........................................" -user=`whoami` -if [ $user == "root" ] ; then - echo -e ${GREEN_PREFIX} "ok: login user is root" ${COLOR_SUFFIX} -else - echo -e ${RED_PREFIX}"Warning: The current user is not root "${COLOR_SUFFIX} -fi - -echo -e "check docker............................................" -docker_running=`systemctl status docker | grep running | grep active | wc -l` - -docker_version=`docker-compose -v; docker -v` - -if [ $docker_running -gt 0 ]; then - echo -e ${GREEN_PREFIX} "ok: docker is running" ${COLOR_SUFFIX} - echo -e ${GREEN_PREFIX} $docker_version ${COLOR_SUFFIX} - -else - echo -e ${RED_PREFIX}"docker not running"${COLOR_SUFFIX} -fi - - -echo -e "check environment......................................." -SYSTEM=`uname -s` -if [ $SYSTEM != "Linux" ] ; then - echo -e ${RED_PREFIX}"Warning: Currently only Linux is supported"${COLOR_SUFFIX} -else - echo -e ${GREEN_PREFIX} "ok: system is linux"${COLOR_SUFFIX} -fi - -echo -e "check memory............................................" -available=`free -m | grep Mem | awk '{print $NF}'` -if [ $available -lt 2000 ] ; then - echo -e ${RED_PREFIX}"Warning: Your memory not enough, available is: " "$available"m${COLOR_SUFFIX}"\c" - echo -e ${RED_PREFIX}", must be greater than 2000m"${COLOR_SUFFIX} -else - echo -e ${GREEN_PREFIX} "ok: available memory is: "$available"m${COLOR_SUFFIX}" -fi diff --git a/scripts/install/test.sh b/scripts/install/test.sh index 2e4d8cbb9..7bcea3b82 100755 --- a/scripts/install/test.sh +++ b/scripts/install/test.sh @@ -38,9 +38,8 @@ IAM_ROOT=$(dirname "${BASH_SOURCE[0]}")/../.. [[ -z ${COMMON_SOURCED} ]] && source ${IAM_ROOT}/scripts/install/common.sh # API Server API Address:Port -INSECURE_OPENIMAPI=${IAM_APISERVER_HOST}:${API_OPENIM_PORT} +INSECURE_OPENIMAPI="http://${OPENIM_API_HOST}:${API_OPENIM_PORT}" INSECURE_OPENIMAUTO=${OPENIM_RPC_AUTH_HOST}:${OPENIM_AUTH_PORT} - CCURL="curl -f -s -XPOST" # Create UCURL="curl -f -s -XPUT" # Update RCURL="curl -f -s -XGET" # Retrieve @@ -73,7 +72,7 @@ function openim::test::auth() { # Define a function to get a token (Admin Token) openim::test::get_token() { - token_response=$(${CCURL} "${OperationID}" "${Header}" http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/auth/user_token \ + token_response=$(${CCURL} "${OperationID}" "${Header}" ${INSECURE_OPENIMAPI}/auth/user_token \ -d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "openIM123456"}') token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+') echo "$token" @@ -94,7 +93,7 @@ EOF ) echo "Requesting force logout for user: $request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/auth/force_logout" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/auth/force_logout" -d "${request_body}") openim::test::check_error "$response" } @@ -127,7 +126,7 @@ EOF echo "Request body for user registration: $request_body" # Send the registration request - local user_register_response=$(${CCURL} "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/user_register" -d "${request_body}") + local user_register_response=$(${CCURL} "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/user_register" -d "${request_body}") # Check for errors in the response openim::test::check_error "$user_register_response" @@ -148,7 +147,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/account_check" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/account_check" -d "${request_body}") openim::test::check_error "$response" } @@ -166,7 +165,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_users" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_users" -d "${request_body}") openim::test::check_error "$response" } @@ -184,7 +183,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_users_info" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_users_info" -d "${request_body}") openim::test::check_error "$response" } @@ -204,7 +203,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_users_online_status" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_users_online_status" -d "${request_body}") openim::test::check_error "$response" } @@ -223,7 +222,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/update_user_info" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/update_user_info" -d "${request_body}") openim::test::check_error "$response" } @@ -238,7 +237,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/get_subscribe_users_status" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/get_subscribe_users_status" -d "${request_body}") openim::test::check_error "$response" } @@ -259,7 +258,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/subscribe_users_status" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/subscribe_users_status" -d "${request_body}") openim::test::check_error "$response" } @@ -275,7 +274,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/set_global_msg_recv_opt" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/user/set_global_msg_recv_opt" -d "${request_body}") openim::test::check_error "$response" } @@ -330,7 +329,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/is_friend" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/is_friend" -d "${request_body}") openim::test::check_error "$response" } @@ -346,7 +345,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/delete_friend" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/delete_friend" -d "${request_body}") openim::test::check_error "$response" } @@ -365,7 +364,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_friend_apply_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_friend_apply_list" -d "${request_body}") openim::test::check_error "$response" } @@ -384,7 +383,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_friend_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_friend_list" -d "${request_body}") openim::test::check_error "$response" } @@ -401,7 +400,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/set_friend_remark" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/set_friend_remark" -d "${request_body}") openim::test::check_error "$response" } @@ -419,7 +418,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/add_friend" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/add_friend" -d "${request_body}") openim::test::check_error "$response" } @@ -437,7 +436,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/import_friend" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/import_friend" -d "${request_body}") openim::test::check_error "$response" } @@ -456,7 +455,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/add_friend_response" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/add_friend_response" -d "${request_body}") openim::test::check_error "$response" } @@ -475,7 +474,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_self_friend_apply_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_self_friend_apply_list" -d "${request_body}") openim::test::check_error "$response" } @@ -491,7 +490,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/add_black" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/add_black" -d "${request_body}") openim::test::check_error "$response" } @@ -507,7 +506,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/remove_black" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/remove_black" -d "${request_body}") openim::test::check_error "$response" } @@ -526,7 +525,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/friend/get_black_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/get_black_list" -d "${request_body}") openim::test::check_error "$response" } @@ -619,7 +618,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/create_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/create_group" -d "${request_body}") openim::test::check_error "$response" } @@ -639,7 +638,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/invite_user_to_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/invite_user_to_group" -d "${request_body}") openim::test::check_error "$response" } @@ -656,7 +655,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/transfer_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/transfer_group" -d "${request_body}") openim::test::check_error "$response" } @@ -671,7 +670,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_groups_info" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_groups_info" -d "${request_body}") openim::test::check_error "$response" } @@ -690,7 +689,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/kick_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/kick_group" -d "${request_body}") openim::test::check_error "$response" } @@ -706,7 +705,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_group_members_info" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_group_members_info" -d "${request_body}") openim::test::check_error "$response" } @@ -725,7 +724,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_group_member_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_group_member_list" -d "${request_body}") openim::test::check_error "$response" } @@ -744,7 +743,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_joined_group_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_joined_group_list" -d "${request_body}") openim::test::check_error "$response" } @@ -769,7 +768,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/set_group_member_info" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/set_group_member_info" -d "${request_body}") openim::test::check_error "$response" } @@ -784,7 +783,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/mute_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/mute_group" -d "${request_body}") openim::test::check_error "$response" } @@ -799,7 +798,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/cancel_mute_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/cancel_mute_group" -d "${request_body}") openim::test::check_error "$response" } @@ -814,7 +813,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/dismiss_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/dismiss_group" -d "${request_body}") openim::test::check_error "$response" } @@ -830,7 +829,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/cancel_mute_group_member" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/cancel_mute_group_member" -d "${request_body}") openim::test::check_error "$response" } @@ -848,7 +847,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/join_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/join_group" -d "${request_body}") openim::test::check_error "$response" } @@ -873,7 +872,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/set_group_info" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/set_group_info" -d "${request_body}") openim::test::check_error "$response" } @@ -890,7 +889,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/quit_group" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/quit_group" -d "${request_body}") openim::test::check_error "$response" } @@ -909,7 +908,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_recv_group_applicationList" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_recv_group_applicationList" -d "${request_body}") openim::test::check_error "$response" } @@ -927,7 +926,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/group_application_response" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/group_application_response" -d "${request_body}") openim::test::check_error "$response" } @@ -946,7 +945,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_user_req_group_applicationList" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_user_req_group_applicationList" -d "${request_body}") openim::test::check_error "$response" } @@ -963,7 +962,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/mute_group_member" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/mute_group_member" -d "${request_body}") openim::test::check_error "$response" } @@ -981,7 +980,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/group/get_group_users_req_application_list" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/group/get_group_users_req_application_list" -d "${request_body}") openim::test::check_error "$response" } @@ -1078,7 +1077,7 @@ function openim::test::group() { # Define a function to register a user openim::register_user() { - user_register_response=$(${CCURL} "${Header}" http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/user/user_register \ + user_register_response=$(${CCURL} "${Header}" ${INSECURE_OPENIMAPI}/user/user_register \ -d'{ "secret": "openIM123", "users": [{"userID": "11111112","nickname": "yourNickname","faceURL": "yourFaceURL"}] @@ -1090,7 +1089,7 @@ openim::register_user() { # Define a function to check the account openim::test::check_account() { local token=$1 - account_check_response=$(${CCURL} "${Header}" -H"operationID: 1646445464564" -H"token: ${token}" http://localhost:${API_OPENIM_PORT}/user/account_check \ + account_check_response=$(${CCURL} "${Header}" -H"operationID: 1646445464564" -H"token: ${token}" ${INSECURE_OPENIMAPI}/user/account_check \ -d'{ "checkUserIDs": ["11111111","11111112"] }') @@ -1164,7 +1163,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/msg/send_msg" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/send_msg" -d "${request_body}") openim::test::check_error "$response" } @@ -1185,7 +1184,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/msg/revoke_msg" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/revoke_msg" -d "${request_body}") openim::test::check_error "$response" } @@ -1203,7 +1202,7 @@ EOF ) echo "$request_body" - local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "http://${OPENIM_API_HOST}:${API_OPENIM_PORT}/msg/user_clear_all_msg" -d "${request_body}") + local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/user_clear_all_msg" -d "${request_body}") openim::test::check_error "$response" } diff --git a/scripts/make-rules/release.mk b/scripts/make-rules/release.mk index 862aa3065..68bf05898 100644 --- a/scripts/make-rules/release.mk +++ b/scripts/make-rules/release.mk @@ -34,7 +34,7 @@ release.tag: tools.verify.gsemver release.ensure-tag ## release.ensure-tag: ensure tag .PHONY: release.ensure-tag release.ensure-tag: tools.verify.gsemver - @scripts/ensure_tag.sh + @scripts/ensure-tag.sh ## release.help: Display help information about the release package .PHONY: release.help diff --git a/test/e2e/api/.keep b/test/e2e/api/.keep deleted file mode 100644 index 4f07f1caf..000000000 --- a/test/e2e/api/.keep +++ /dev/null @@ -1 +0,0 @@ -.keep \ No newline at end of file diff --git a/test/e2e/api/user/curd.go b/test/e2e/api/user/curd.go index 28b55b682..6f9959203 100644 --- a/test/e2e/api/user/curd.go +++ b/test/e2e/api/user/curd.go @@ -1,7 +1,10 @@ package user import ( + "fmt" + gettoken "github.com/openimsdk/open-im-server/v3/test/e2e/api/token" + "github.com/openimsdk/open-im-server/v3/test/e2e/framework/config" ) // UserInfoRequest represents a request to get or update user information @@ -17,14 +20,20 @@ type GetUsersOnlineStatusRequest struct { // GetUsersInfo retrieves detailed information for a list of user IDs func GetUsersInfo(token string, userIDs []string) error { + + url := fmt.Sprintf("http://%s:%s/user/get_users_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort) + requestBody := UserInfoRequest{ UserIDs: userIDs, } - return sendPostRequestWithToken("http://your-api-host:port/user/get_users_info", token, requestBody) + return sendPostRequestWithToken(url, token, requestBody) } // UpdateUserInfo updates the information for a user func UpdateUserInfo(token, userID, nickname, faceURL string) error { + + url := fmt.Sprintf("http://%s:%s/user/update_user_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort) + requestBody := UserInfoRequest{ UserInfo: &gettoken.User{ UserID: userID, @@ -32,13 +41,17 @@ func UpdateUserInfo(token, userID, nickname, faceURL string) error { FaceURL: faceURL, }, } - return sendPostRequestWithToken("http://your-api-host:port/user/update_user_info", token, requestBody) + return sendPostRequestWithToken(url, token, requestBody) } // GetUsersOnlineStatus retrieves the online status for a list of user IDs func GetUsersOnlineStatus(token string, userIDs []string) error { + + url := fmt.Sprintf("http://%s:%s/user/get_users_online_status", config.LoadConfig().APIHost, config.LoadConfig().APIPort) + requestBody := GetUsersOnlineStatusRequest{ UserIDs: userIDs, } - return sendPostRequestWithToken("http://your-api-host:port/user/get_users_online_status", token, requestBody) + + return sendPostRequestWithToken(url, token, requestBody) } diff --git a/test/e2e/api/user/user.go b/test/e2e/api/user/user.go index 63c6659a0..66419b735 100644 --- a/test/e2e/api/user/user.go +++ b/test/e2e/api/user/user.go @@ -8,6 +8,7 @@ import ( "net/http" gettoken "github.com/openimsdk/open-im-server/v3/test/e2e/api/token" + "github.com/openimsdk/open-im-server/v3/test/e2e/framework/config" ) // ForceLogoutRequest represents a request to force a user logout @@ -34,30 +35,39 @@ type Pagination struct { // ForceLogout forces a user to log out func ForceLogout(token, userID string, platformID int) error { + + url := fmt.Sprintf("http://%s:%s/auth/force_logout", config.LoadConfig().APIHost, config.LoadConfig().APIPort) + requestBody := ForceLogoutRequest{ PlatformID: platformID, UserID: userID, } - return sendPostRequestWithToken("http://your-api-host:port/auth/force_logout", token, requestBody) + return sendPostRequestWithToken(url, token, requestBody) } // CheckUserAccount checks if the user accounts exist func CheckUserAccount(token string, userIDs []string) error { + + url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort) + requestBody := CheckUserAccountRequest{ CheckUserIDs: userIDs, } - return sendPostRequestWithToken("http://your-api-host:port/user/account_check", token, requestBody) + return sendPostRequestWithToken(url, token, requestBody) } // GetUsers retrieves a list of users with pagination func GetUsers(token string, pageNumber, showNumber int) error { + + url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort) + requestBody := GetUsersRequest{ Pagination: Pagination{ PageNumber: pageNumber, ShowNumber: showNumber, }, } - return sendPostRequestWithToken("http://your-api-host:port/user/get_users", token, requestBody) + return sendPostRequestWithToken(url, token, requestBody) } // sendPostRequestWithToken sends a POST request with a token in the header diff --git a/test/e2e/framework/config/.keep b/test/e2e/framework/config/.keep deleted file mode 100644 index 4f07f1caf..000000000 --- a/test/e2e/framework/config/.keep +++ /dev/null @@ -1 +0,0 @@ -.keep \ No newline at end of file diff --git a/test/e2e/framework/config/config.go b/test/e2e/framework/config/config.go index dcf87cecf..f790de5f3 100644 --- a/test/e2e/framework/config/config.go +++ b/test/e2e/framework/config/config.go @@ -1,6 +1,9 @@ package config -import "flag" +import ( + "flag" + "os" +) // Flags is the flag set that AddOptions adds to. Test authors should // also use it instead of directly adding to the global command line. @@ -19,3 +22,49 @@ func CopyFlags(source *flag.FlagSet, target *flag.FlagSet) { target.Var(flag.Value, flag.Name, flag.Usage) }) } + +// Config defines the configuration structure for the OpenIM components. +type Config struct { + APIHost string + APIPort string + MsgGatewayHost string + MsgTransferHost string + PushHost string + RPCAuthHost string + RPCConversationHost string + RPCFriendHost string + RPCGroupHost string + RPCMsgHost string + RPCThirdHost string + RPCUserHost string + // Add other configuration fields as needed +} + +// LoadConfig loads the configurations from environment variables or default values. +func LoadConfig() *Config { + return &Config{ + APIHost: getEnv("OPENIM_API_HOST", "127.0.0.1"), + APIPort: getEnv("API_OPENIM_PORT", "10002"), + + // TODO: Set default variable + MsgGatewayHost: getEnv("OPENIM_MSGGATEWAY_HOST", "default-msggateway-host"), + MsgTransferHost: getEnv("OPENIM_MSGTRANSFER_HOST", "default-msgtransfer-host"), + PushHost: getEnv("OPENIM_PUSH_HOST", "default-push-host"), + RPCAuthHost: getEnv("OPENIM_RPC_AUTH_HOST", "default-rpc-auth-host"), + RPCConversationHost: getEnv("OPENIM_RPC_CONVERSATION_HOST", "default-rpc-conversation-host"), + RPCFriendHost: getEnv("OPENIM_RPC_FRIEND_HOST", "default-rpc-friend-host"), + RPCGroupHost: getEnv("OPENIM_RPC_GROUP_HOST", "default-rpc-group-host"), + RPCMsgHost: getEnv("OPENIM_RPC_MSG_HOST", "default-rpc-msg-host"), + RPCThirdHost: getEnv("OPENIM_RPC_THIRD_HOST", "default-rpc-third-host"), + RPCUserHost: getEnv("OPENIM_RPC_USER_HOST", "default-rpc-user-host"), + } +} + +// getEnv is a helper function to read an environment variable or return a default value. +func getEnv(key, defaultValue string) string { + value, exists := os.LookupEnv(key) + if !exists { + return defaultValue + } + return value +}