diff --git a/.golangci.yml b/.golangci.yml index f0ca3a010..ae8cea673 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -372,7 +372,7 @@ linters-settings: # Default: [] ignored-functions: - '^math\.' - - '^http\.StatusText$' + - '^webhook\.StatusText$' gomoddirectives: # Allow local `replace` directives. Default is false. replace-local: true diff --git a/config/Readme.md b/config/Readme.md index 72f4577a5..b0de2089c 100644 --- a/config/Readme.md +++ b/config/Readme.md @@ -1,243 +1,67 @@ -# OpenIM Configuration Guide - - -* 1. [Directory Structure and File Descriptions](#DirectoryStructureandFileDescriptions) - * 1.1. [Directory Structure](#DirectoryStructure) - * 1.2. [Directory Structure Explanation](#DirectoryStructureExplanation) -* 2. [File Descriptions](#FileDescriptions) - * 2.1. [Files in the Root Directory](#FilesintheRootDirectory) - * 2.2. [Files in the `templates/` Directory](#FilesinthetemplatesDirectory) -* 3. [Configuration File Generation](#ConfigurationFileGeneration) - * 3.1. [How to Use `init-config.sh` Script](#HowtoUseinit-config.shScript) - * 3.2. [Examples of Operations](#ExamplesofOperations) - * 3.3. [Points to Note](#PointstoNote) -* 4. [Example Directory](#ExampleDirectory) - * 4.1. [Overview](#Overview) - * 4.2. [Structure](#Structure) - * 4.3. [How to Use These Examples](#HowtoUseTheseExamples) - * 4.4. [Tips for Using Example Files:](#TipsforUsingExampleFiles:) -* 5. [Configuration Item Descriptions](#ConfigurationItemDescriptions) -* 6. [Version Management and Upgrading](#VersionManagementandUpgrading) - * 6.1. [Pulling the Latest Code](#PullingtheLatestCode) - * 6.2. [Generating the Latest Example Configuration Files](#GeneratingtheLatestExampleConfigurationFiles) - * 6.3. [Comparing Configuration File Differences](#ComparingConfigurationFileDifferences) - * 6.4. [Updating Configuration Files](#UpdatingConfigurationFiles) - * 6.5. [Updating Binary Files and Restarting Services](#UpdatingBinaryFilesandRestartingServices) - * 6.6. [Best Practices for Version Management](#BestPracticesforVersionManagement) -* 7. [How to Contribute](#HowtoContribute) - * 7.1. [OpenIM Configuration Item Descriptions](#OpenIMConfigurationItemDescriptions) - * 7.2. [Modifying Template Files](#ModifyingTemplateFiles) - * 7.3. [Updating Configuration Center Scripts](#UpdatingConfigurationCenterScripts) - * 7.4. [Configuration File Generation Process](#ConfigurationFileGenerationProcess) - * 7.5. [Contribution Guidelines](#ContributionGuidelines) - * 7.6. [Submission and Review](#SubmissionandReview) - - - - - -## 1. Directory Structure and File Descriptions - -This document details the structure of the `config` directory, aiding users in understanding and managing configuration files. - -### 1.1. Directory Structure - -```bash -$ tree config -├── alertmanager.yml -├── config.yaml -├── email.tmpl -├── instance-down-rules.yml -├── notification.yaml -├── prometheus.yml -├── Readme.md -└── templates - ├── alertmanager.yml.template - ├── config.yaml.template - ├── email.tmpl.template - ├── env.template - ├── instance-down-rules.yml.template - ├── notification.yaml.template - ├── open-im-ng-example.conf - ├── prometheus-dashboard.yaml - └── prometheus.yml.template -``` - -### 1.2. Directory Structure Explanation - -- **Root Directory (`config/`)**: Contains actual configuration files and the `templates` subdirectory. -- **`templates/` Subdirectory**: Stores configuration templates for generating or updating configuration files in the root directory. - -## 2. File Descriptions - -### 2.1. Files in the Root Directory - -- **`alertmanager.yml`**: Configuration file for AlertManager, managing and setting up the alert system. -- **`config.yaml`**: The main application configuration file, covering service settings. -- **`email.tmpl`**: Template file for email notifications, defining email format and content. -- **`instance-down-rules.yml`**: Instance downtime rules configuration file for the monitoring system. -- **`notification.yaml`**: Configuration file for notification settings, defining different types of notifications. -- **`prometheus.yml`**: Configuration file for the Prometheus monitoring system, setting monitoring metrics and rules. - -### 2.2. Files in the `templates/` Directory - -- **`alertmanager.yml.template`**: Template for AlertManager configuration. -- **`config.yaml.template`**: Main configuration template for the application. -- **`email.tmpl.template`**: Template for email notifications. -- **`env.template`**: Template for environmental variable configurations, setting environment-related configurations. -- **`instance-down-rules.yml.template`**: Template for instance downtime rules. -- **`notification.yaml.template`**: Template for notification settings. -- **`open-im-ng-example.conf`**: Example configuration file for the application. -- **`prometheus-dashboard.yaml`**: Prometheus dashboard configuration file, specific to the OpenIM application. -- **`prometheus.yml.template`**: Template for Prometheus configuration. - -## 3. Configuration File Generation - -Configuration files can be automatically generated using the `make init` command or the `./scripts/init-config.sh` script. These scripts conveniently extract templates from the `templates` directory and generate or update actual configuration files in the root directory. - -### 3.1. How to Use `init-config.sh` Script - -```bash -$ ./scripts/init-config.sh --help -Usage: init-config.sh [options] -Options: - -h, --help Show this help message - --force Overwrite existing files without prompt - --skip Skip generation if file exists - --examples Generate example files - --clean-config Clean all configuration files - --clean-examples Clean all example files -``` - -### 3.2. Examples of Operations - -- Generate all template configuration files: - - ```bash - $ ./scripts/init-config.sh --examples - ``` - -- Force overwrite existing configuration files: - - ```bash - $ ./scripts/init-config.sh --force - ``` - -### 3.3. Points to Note - -- **Template files should not be directly modified**: Files in the `template` directory are templates included in source code management. Direct modification may lead to version conflicts or management issues. -- **Operations for Windows Users**: Windows users can use the `cp` command to copy files from the `template` directory to the `config/` directory and then modify the configuration items as needed. - -## 4. Example Directory - -Welcome to our project's `examples` directory! This directory contains a range of example files, showcasing various configurations and settings of our software. These examples are intended to provide you with templates that can serve as a starting point for your own configurations. - -### 4.1. Overview - -In this directory, you'll find examples suitable for a variety of use cases. Each file is a template with default values and configurations, demonstrating best practices and typical scenarios. Whether you're just getting started or looking to implement complex settings, these examples should help you get on the right track. - -### 4.2. Structure +配置文件说明 -Here's a quick overview of the contents in this directory: +每个组件单独有一个配置文件,主要是地址及账号密码信息。 -- `env-example.yaml`: Demonstrates how to set up environmental variables. -- `openim-example.yaml`: Example configuration file for the OpenIM application. -- `prometheus-example.yml`: Example configuration for monitoring with Prometheus. -- `alertmanager-example.yml`: Template for setting up Alertmanager configuration. -### 4.3. How to Use These Examples -To use these examples, simply copy the relevant files to your working directory and rename them as needed (for example, removing the `-example` suffix). Then, modify the files according to your needs. -### 4.4. Tips for Using Example Files: -1. **Read Comments**: Each file contains comments explaining the various sections and settings. Make sure to read these comments for a better understanding of how to customize the file. -2. **Check Required Changes**: Some examples might require mandatory changes before they can be used effectively (such as setting specific environmental variables). -3. **Version Compatibility**: Ensure that the example files are compatible with the version of the software you are using. +callback -## 5. Configuration Item Descriptions -## 6. Version Management and Upgrading -When managing and upgrading the `config` directory's versions, it is crucial to ensure that the configuration files in both the local `config/` and `config/templates/` directories are kept in sync. This process can ensure that your configuration files are consistent with the latest standard templates, while also maintaining custom settings. -### 6.1. Pulling the Latest Code -First, ensure that your local repository is in sync with the remote repository. This can be achieved by pulling the latest code: +rpc共同配置项说明 -```bash -$ git pull ``` - -### 6.2. Generating the Latest Example Configuration Files - -Next, generate the latest example configuration files. This can be done by running the `init-config.sh` script, using the `--examples` option to generate example files, and the `--skip` option to avoid overwriting existing configuration files: - -```bash -$ ./scripts/init-config.sh --examples --skip -``` - -### 6.3. Comparing Configuration File Differences - -Once the latest example configuration files are generated, you need to compare the configuration files in the `config/` and `config/templates/` directories to find any potential differences. This step ensures that you can identify and integrate any important updates or changes. Tools like `diff` can be helpful in completing this step: - -```bash -$ diff -ur config/ config/templates/ +rpc: + #api或其他rpc可通过这个ip访问到此rpc,如果为空则获取内网ip,默认为空即可 + registerIP: '' + #监听ip,如果为0.0.0.0,则内外网ip都监听,为空则自动获取内网ip监听 + listenIP: 0.0.0.0 + #监听端口,如果配置多个则会启动多个实例,但需和prometheus.ports保持一致 + ports: [ 10120 ] + +prometheus: + enable: true + #prometheus + ports: [ 20104 ] ``` -### 6.4. Updating Configuration Files - -Based on the comparison results, manually update the configuration files in the `config/` directory to reflect the latest configurations in `config/templates/`. During this process, ensure to retain any custom configuration settings. - -### 6.5. Updating Binary Files and Restarting Services - -After updating the configuration files, the next step is to update any related binary files. This typically involves downloading and installing the latest version of the application or service. Depending on the specific application or service, this might involve running specific update scripts or directly downloading the latest version from official sources. - -Once the binary files are updated, the services need to be restarted to apply the new configurations. Make sure to conduct necessary checks before restarting to ensure the correctness of the configurations. - -### 6.6. Best Practices for Version Management - -- **Record Changes**: When committing changes to a version control system, ensure to log detailed change logs. -- **Stay Synced**: Regularly sync with the remote repository to ensure that your local configurations are in line with the latest developments. -- **Backup**: Backup your current configurations before making any significant changes, so that you can revert to a previous state if necessary. -By following these steps and best practices, you can ensure effective management and smooth upgrading of your `config` directory. -## 7. How to Contribute +api配置项说明 -If you have an understanding of the logic behind OpenIM's configuration generation, then you will clearly know where to make modifications to contribute code. - -### 7.1. OpenIM Configuration Item Descriptions - -First, it is recommended to read the [OpenIM Configuration Items Document](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/environment.md). This will help you understand the roles of various configuration items and how they affect the operation of OpenIM. - -### 7.2. Modifying Template Files - -To contribute to OpenIM, focus on the `./deployments/templates` directory. This contains various configuration template files, which are the basis for generating the final configuration files. - -When making modifications, ensure that your changes align with OpenIM's configuration requirements and logic. This may involve adding new template files or modifying existing files to reflect new configuration options or structural changes. - -### 7.3. Updating Configuration Center Scripts - -In addition to modifying template files, pay attention to the `./scripts/install/environment.sh` script. In this script, you may need to add or modify environment variables. - -This script is responsible for defining environment variables that influence configuration generation. Therefore, any new configuration items or modifications to existing items need to be reflected here. +``` +api: + #0.0.0.0表示内外网ip都监听,不应该修改 + listenIP: 0.0.0.0 + #监听端口,如果配置多个则会启动多个实例,和prometheus.ports保持一致 + ports: [ 10002 ] + +prometheus: + enable: true + ports: [ 20113 ] + #怎么描述 grafanaURL地址, 外网地址,通过浏览器能访问到 + grafanaURL: http://127.0.0.1:13000/ +``` -### 7.4. Configuration File Generation Process -The essence of the `make init` command is to use the environment variables defined in `/scripts/install/environment.sh` to render the template files in the `./deployments/templates` directory, thereby generating the final configuration files. -When contributing code, ensure that your changes work smoothly in this process and do not cause errors during configuration file generation. +log配置项说明 -### 7.5. Contribution Guidelines +``` +#log存放路径,如果要修改则改为全路径 +storageLocation: ../../../../logs/ +rotationTime: 24 +remainRotationCount: 2 +#3: 生产环境; 6:日志较多,调试环境 +remainLogLevel: 6 +isStdout: false +isJson: false +withStack: false +``` -- **Code Review**: Ensure your changes have passed code review. This typically means that the code should be clear, easy to understand, and adhere to the project's coding style and best practices. -- **Testing**: Before submitting changes, conduct thorough tests to ensure new or modified configurations work as expected and do not negatively impact existing functionalities. -- **Documentation**: If you have added a new configuration option or made significant changes to an existing one, update the relevant documentation to assist other users and developers in understanding and utilizing these changes. -### 7.6. Submission and Review -After completing your changes, submit your code to the OpenIM repository in the form of a Pull Request (PR). The PR will be reviewed by the project maintainers and you may be asked to make further modifications or provide additional information. \ No newline at end of file diff --git a/config/notification.yml b/config/notification.yml index 665c21261..1afb44e46 100644 --- a/config/notification.yml +++ b/config/notification.yml @@ -90,9 +90,9 @@ groupApplicationRejected: unreadCount: false offlinePush: enable: false - title: " title" - desc: " desc" - ext: " ext" + title: "groupApplicationRejected title" + desc: "groupApplicationRejected desc" + ext: "groupApplicationRejected ext" groupOwnerTransferred: diff --git a/config/openim-api.yml b/config/openim-api.yml index b787789a7..f20070468 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -5,4 +5,4 @@ api: prometheus: enable: true ports: [ 20113 ] - grafanaURL: http://127.0.0.1:13000/ + grafanaURL: webhook://127.0.0.1:13000/ diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 1ad3fe24b..a77b5457e 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -8,5 +8,6 @@ prometheus: ports: [ 20106 ] tokenPolicy: + #token有效期,单位(天) expire: 90 diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 0fd7926d9..86d1dd7e2 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -7,7 +7,10 @@ prometheus: enable: true ports: [ 20102 ] +#发消息是否需要好友验证 friendVerify: false + +# groupMessageHasReadReceiptEnable: true singleMessageHasReadReceiptEnable: true diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 2b3fc4917..e3a847bf2 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -24,9 +24,9 @@ object: sessionToken: '' publicRead: false kodo: - endpoint: "http://s3.cn-east-1.qiniucs.com" + endpoint: "webhook://s3.cn-east-1.qiniucs.com" bucket: "demo-9999999" - bucketURL: "http://your.domain.com" + bucketURL: "webhook://your.domain.com" accessKeyID: '' accessKeySecret: '' sessionToken: '' diff --git a/config/templates/prometheus-dashboard.yaml b/config/templates/prometheus-dashboard.yaml index 2e1ae7760..1dccbd692 100644 --- a/config/templates/prometheus-dashboard.yaml +++ b/config/templates/prometheus-dashboard.yaml @@ -1213,7 +1213,7 @@ "editorMode": "code", "expr": "sum(rate(app_requests_total{job=~\"^($job)$\"}[$interval])) by (job)", "instant": false, - "legendFormat": "{{job}}-http", + "legendFormat": "{{job}}-webhook", "range": true, "refId": "A" }, diff --git a/config/webhooks.yml b/config/webhooks.yml index 689ac3e08..c7839d4f2 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -1,4 +1,4 @@ -url: "http://127.0.0.1:10008/callbackExample" +url: "webhook://127.0.0.1:10008/callbackExample" beforeSendSingleMsg: enable: false timeout: 5 @@ -10,11 +10,9 @@ beforeUpdateUserInfoEx: afterUpdateUserInfoEx: enable: false timeout: 5 - failedContinue: true afterSendSingleMsg: enable: false timeout: 5 - failedContinue: true beforeSendGroupMsg: enable: false timeout: 5 @@ -26,19 +24,15 @@ beforeMsgModify: afterSendGroupMsg: enable: false timeout: 5 - failedContinue: true afterUserOnline: enable: false timeout: 5 - failedContinue: true afterUserOffline: enable: false timeout: 5 - failedContinue: true afterUserKickOff: enable: false timeout: 5 - failedContinue: true beforeOfflinePush: enable: false timeout: 5 @@ -62,7 +56,6 @@ beforeUpdateUserInfo: afterUpdateUserInfo: enable: false timeout: 5 - failedContinue: true beforeCreateGroup: enable: false timeout: 5 @@ -70,7 +63,6 @@ beforeCreateGroup: afterCreateGroup: enable: false timeout: 5 - failedContinue: true beforeMemberJoinGroup: enable: false timeout: 5 @@ -82,19 +74,15 @@ beforeSetGroupMemberInfo: afterSetGroupMemberInfo: enable: false timeout: 5 - failedContinue: true afterQuitGroup: enable: false timeout: 5 - failedContinue: true afterKickGroupMember: enable: false timeout: 5 - failedContinue: true afterDismissGroup: enable: false timeout: 5 - failedContinue: true beforeApplyJoinGroup: enable: false timeout: 5 @@ -102,11 +90,9 @@ beforeApplyJoinGroup: afterGroupMsgRead: enable: false timeout: 5 - failedContinue: true afterSingleMsgRead: enable: false timeout: 5 - failedContinue: true beforeUserRegister: enable: false timeout: 5 @@ -114,11 +100,9 @@ beforeUserRegister: afterUserRegister: enable: false timeout: 5 - failedContinue: true afterTransferGroupOwner: enable: false timeout: 5 - failedContinue: true beforeSetFriendRemark: enable: false timeout: 5 @@ -126,15 +110,12 @@ beforeSetFriendRemark: afterSetFriendRemark: enable: false timeout: 5 - failedContinue: true afterGroupMsgRevoke: enable: false timeout: 5 - failedContinue: true afterJoinGroup: enable: false timeout: 5 - failedContinue: true beforeInviteUserToGroup: enable: false timeout: 5 @@ -142,7 +123,6 @@ beforeInviteUserToGroup: afterSetGroupInfo: enable: false timeout: 5 - failedContinue: true beforeSetGroupInfo: enable: false timeout: 5 @@ -150,15 +130,13 @@ beforeSetGroupInfo: afterRevokeMsg: enable: false timeout: 5 - failedContinue: true beforeAddBlack: enable: false timeout: 5 - failedContinue: true + failedContinue: afterAddFriend: enable: false timeout: 5 - failedContinue: true beforeAddFriendAgree: enable: false timeout: 5 @@ -166,7 +144,6 @@ beforeAddFriendAgree: afterDeleteFriend: enable: false timeout: 5 - failedContinue: true beforeImportFriends: enable: false timeout: 5 @@ -174,8 +151,6 @@ beforeImportFriends: afterImportFriends: enable: false timeout: 5 - failedContinue: true afterRemoveBlack: enable: false timeout: 5 - failedContinue: true diff --git a/deployments/charts/openim-api/templates/deployment.yaml b/deployments/charts/openim-api/templates/deployment.yaml index 3b4bf57a2..b0076393f 100644 --- a/deployments/charts/openim-api/templates/deployment.yaml +++ b/deployments/charts/openim-api/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-api/templates/service.yaml b/deployments/charts/openim-api/templates/service.yaml index 3704bf35c..74f75a25e 100644 --- a/deployments/charts/openim-api/templates/service.yaml +++ b/deployments/charts/openim-api/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-api.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-msggateway/templates/deployment.yaml b/deployments/charts/openim-msggateway/templates/deployment.yaml index 0f5f9d06f..e938fa9bf 100644 --- a/deployments/charts/openim-msggateway/templates/deployment.yaml +++ b/deployments/charts/openim-msggateway/templates/deployment.yaml @@ -48,7 +48,7 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP - name: rpc @@ -57,11 +57,11 @@ spec: #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-msggateway/templates/service.yaml b/deployments/charts/openim-msggateway/templates/service.yaml index e33fce6db..e914ee1d4 100644 --- a/deployments/charts/openim-msggateway/templates/service.yaml +++ b/deployments/charts/openim-msggateway/templates/service.yaml @@ -22,9 +22,9 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook - port: 88 targetPort: rpc protocol: TCP diff --git a/deployments/charts/openim-msgtransfer/templates/deployment.yaml b/deployments/charts/openim-msgtransfer/templates/deployment.yaml index 2c9e24bdd..019e307d5 100644 --- a/deployments/charts/openim-msgtransfer/templates/deployment.yaml +++ b/deployments/charts/openim-msgtransfer/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-msgtransfer/templates/service.yaml b/deployments/charts/openim-msgtransfer/templates/service.yaml index e657f8c18..467f7d13c 100644 --- a/deployments/charts/openim-msgtransfer/templates/service.yaml +++ b/deployments/charts/openim-msgtransfer/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-msgtransfer.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-push/templates/deployment.yaml b/deployments/charts/openim-push/templates/deployment.yaml index 34de33e10..86c27d14c 100644 --- a/deployments/charts/openim-push/templates/deployment.yaml +++ b/deployments/charts/openim-push/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-push/templates/service.yaml b/deployments/charts/openim-push/templates/service.yaml index b38c35375..c2ef8db35 100644 --- a/deployments/charts/openim-push/templates/service.yaml +++ b/deployments/charts/openim-push/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-push.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-auth/templates/deployment.yaml b/deployments/charts/openim-rpc-auth/templates/deployment.yaml index 202162775..98c43ecb7 100644 --- a/deployments/charts/openim-rpc-auth/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-auth/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-auth/templates/service.yaml b/deployments/charts/openim-rpc-auth/templates/service.yaml index 3674da014..785512347 100644 --- a/deployments/charts/openim-rpc-auth/templates/service.yaml +++ b/deployments/charts/openim-rpc-auth/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-auth.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-conversation/templates/deployment.yaml b/deployments/charts/openim-rpc-conversation/templates/deployment.yaml index 01721aa30..6dcb001f4 100644 --- a/deployments/charts/openim-rpc-conversation/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-conversation/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-conversation/templates/service.yaml b/deployments/charts/openim-rpc-conversation/templates/service.yaml index f90673584..8559c4d11 100644 --- a/deployments/charts/openim-rpc-conversation/templates/service.yaml +++ b/deployments/charts/openim-rpc-conversation/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-conversation.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-friend/templates/deployment.yaml b/deployments/charts/openim-rpc-friend/templates/deployment.yaml index a57188828..01251cdfa 100644 --- a/deployments/charts/openim-rpc-friend/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-friend/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-friend/templates/service.yaml b/deployments/charts/openim-rpc-friend/templates/service.yaml index e445d561f..892a007dd 100644 --- a/deployments/charts/openim-rpc-friend/templates/service.yaml +++ b/deployments/charts/openim-rpc-friend/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-friend.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-group/templates/deployment.yaml b/deployments/charts/openim-rpc-group/templates/deployment.yaml index 406d0b342..e738f33be 100644 --- a/deployments/charts/openim-rpc-group/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-group/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-group/templates/service.yaml b/deployments/charts/openim-rpc-group/templates/service.yaml index fc6f76060..42e1f78ca 100644 --- a/deployments/charts/openim-rpc-group/templates/service.yaml +++ b/deployments/charts/openim-rpc-group/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-group.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-msg/templates/deployment.yaml b/deployments/charts/openim-rpc-msg/templates/deployment.yaml index d764294ea..f7267fabb 100644 --- a/deployments/charts/openim-rpc-msg/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-msg/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-msg/templates/service.yaml b/deployments/charts/openim-rpc-msg/templates/service.yaml index 953b89d5d..ba403d5ab 100644 --- a/deployments/charts/openim-rpc-msg/templates/service.yaml +++ b/deployments/charts/openim-rpc-msg/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-msg.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-third/templates/deployment.yaml b/deployments/charts/openim-rpc-third/templates/deployment.yaml index e4f47de57..779415535 100644 --- a/deployments/charts/openim-rpc-third/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-third/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-third/templates/service.yaml b/deployments/charts/openim-rpc-third/templates/service.yaml index f467992a2..af112794e 100644 --- a/deployments/charts/openim-rpc-third/templates/service.yaml +++ b/deployments/charts/openim-rpc-third/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-third.selectorLabels" . | nindent 4 }} diff --git a/deployments/charts/openim-rpc-user/templates/deployment.yaml b/deployments/charts/openim-rpc-user/templates/deployment.yaml index 62106e5a2..26497d837 100644 --- a/deployments/charts/openim-rpc-user/templates/deployment.yaml +++ b/deployments/charts/openim-rpc-user/templates/deployment.yaml @@ -48,17 +48,17 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - - name: http + - name: webhook containerPort: 80 protocol: TCP #livenessProbe: # httpGet: # path: / - # port: http + # port: webhook #readinessProbe: # httpGet: # path: / - # port: http + # port: webhook resources: {{- toYaml .Values.resources | nindent 12 }} volumeMounts: diff --git a/deployments/charts/openim-rpc-user/templates/service.yaml b/deployments/charts/openim-rpc-user/templates/service.yaml index f89be1c44..af8a53e19 100644 --- a/deployments/charts/openim-rpc-user/templates/service.yaml +++ b/deployments/charts/openim-rpc-user/templates/service.yaml @@ -22,8 +22,8 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: webhook protocol: TCP - name: http + name: webhook selector: {{- include "openim-rpc-user.selectorLabels" . | nindent 4 }} diff --git a/deployments/templates/config.yaml b/deployments/templates/config.yaml index 5da6d5d0b..fee0bf90a 100644 --- a/deployments/templates/config.yaml +++ b/deployments/templates/config.yaml @@ -323,7 +323,7 @@ iosPush: # Timeout in seconds # Whether to continue execution if callback fails callback: - url: "http://127.0.0.1:10008/callbackExample" + url: "webhook://127.0.0.1:10008/callbackExample" beforeSendSingleMsg: enable: ${CALLBACK_ENABLE} timeout: ${CALLBACK_TIMEOUT} diff --git a/docs/contrib/go-code.md b/docs/contrib/go-code.md index 851838a98..df74dec1b 100644 --- a/docs/contrib/go-code.md +++ b/docs/contrib/go-code.md @@ -362,7 +362,7 @@ func SIGTERMExit() { ```go import ( - _ "net/http/pprof" + _ "net/webhook/pprof" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" diff --git a/docs/contrib/go-code1.md b/docs/contrib/go-code1.md index 8dcb9ce6d..2206a153e 100644 --- a/docs/contrib/go-code1.md +++ b/docs/contrib/go-code1.md @@ -581,7 +581,7 @@ func SIGTERMExit() { ```go import ( - _ "net/http/pprof" + _ "net/webhook/pprof" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" diff --git a/docs/contrib/logging.md b/docs/contrib/logging.md index b88fbeb8d..03628f427 100644 --- a/docs/contrib/logging.md +++ b/docs/contrib/logging.md @@ -470,7 +470,7 @@ wrappedErr := errs.WrapMsg(err, "additional error information") ```go // "github.com/openimsdk/tools/errs" err := errors.New("original error") -wrappedErr := errs.WrapMsg(err, "problem occurred", "code", 404, "url", "http://example.com") +wrappedErr := errs.WrapMsg(err, "problem occurred", "code", 404, "url", "webhook://example.com") // wrappedErr will contain the original error, call stack, and "problem occurred code=404, url=http://example.com" ``` @@ -494,7 +494,7 @@ Suppose we have some runtime context variables, such as a user ID and the type o userID := "user123" operation := "update profile" errorCode := 500 -requestURL := "http://example.com/updateProfile" +requestURL := "webhook://example.com/updateProfile" // Create a new error err := errors.New("original error") diff --git a/internal/api/third.go b/internal/api/third.go index 6baa70ee5..8297c9e98 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -65,7 +65,7 @@ func setURLPrefix(c *gin.Context, urlPrefix *string) error { } } u := url.URL{ - Scheme: "http", + Scheme: "webhook", Host: c.Request.Host, Path: "/object/", } diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index 46a09c3de..1750f779b 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -20,19 +20,15 @@ import ( cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/http" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/mcontext" ) -func CallbackUserOnline(ctx context.Context, callback *config.Webhooks, userID string, platformID int, isAppBackground bool, connID string) error { - if !callback.AfterUserOnline.Enable { - return nil - } +func (ws *WsServer) webhookAfterUserOnline(ctx context.Context, after *config.AfterConfig, userID string, platformID int, isAppBackground bool, connID string) { req := cbapi.CallbackUserOnlineReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ - CallbackCommand: cbapi.CallbackUserOnlineCommand, + CallbackCommand: cbapi.CallbackAfterUserOnlineCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: platformID, Platform: constant.PlatformIDToName(platformID), @@ -43,21 +39,14 @@ func CallbackUserOnline(ctx context.Context, callback *config.Webhooks, userID s IsAppBackground: isAppBackground, ConnID: connID, } - resp := cbapi.CommonCallbackResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, &req, &resp, callback.AfterUserOnline); err != nil { - return err - } - return nil + ws.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CommonCallbackResp{}, after) } -func CallbackUserOffline(ctx context.Context, callback *config.Webhooks, userID string, platformID int, connID string) error { - if !callback.AfterUserOffline.Enable { - return nil - } +func (ws *WsServer) webhookAfterUserOffline(ctx context.Context, after *config.AfterConfig, userID string, platformID int, connID string) { req := &cbapi.CallbackUserOfflineReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ - CallbackCommand: cbapi.CallbackUserOfflineCommand, + CallbackCommand: cbapi.CallbackAfterUserOfflineCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: platformID, Platform: constant.PlatformIDToName(platformID), @@ -67,21 +56,14 @@ func CallbackUserOffline(ctx context.Context, callback *config.Webhooks, userID Seq: time.Now().UnixMilli(), ConnID: connID, } - resp := &cbapi.CallbackUserOfflineResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterUserOffline); err != nil { - return err - } - return nil + ws.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CallbackUserOfflineResp{}, after) } -func CallbackUserKickOff(ctx context.Context, callback *config.Webhooks, userID string, platformID int) error { - if !callback.AfterUserKickOff.Enable { - return nil - } +func (ws *WsServer) webhookAfterUserKickOff(ctx context.Context, after *config.AfterConfig, userID string, platformID int) { req := &cbapi.CallbackUserKickOffReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ - CallbackCommand: cbapi.CallbackUserKickOffCommand, + CallbackCommand: cbapi.CallbackAfterUserKickOffCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: platformID, Platform: constant.PlatformIDToName(platformID), @@ -90,9 +72,5 @@ func CallbackUserKickOff(ctx context.Context, callback *config.Webhooks, userID }, Seq: time.Now().UnixMilli(), } - resp := &cbapi.CommonCallbackResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterUserOffline); err != nil { - return err - } - return nil + ws.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CommonCallbackResp{}, after) } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 830735be7..a691ee7b7 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -17,6 +17,8 @@ package msggateway import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/tools/mcontext" "net/http" @@ -52,6 +54,11 @@ type LongConnServer interface { MessageHandler } +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + type WsServer struct { msgGatewayConfig *Config port int @@ -72,6 +79,7 @@ type WsServer struct { Compressor Encoder MessageHandler + webhookClient *webhook.Client } type kickHandler struct { @@ -95,15 +103,9 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta } switch status { case constant.Online: - err := CallbackUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID()) - if err != nil { - log.ZWarn(ctx, "CallbackUserOnline err", err) - } + ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID()) case constant.Offline: - err := CallbackUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig, client.UserID, client.PlatformID, client.ctx.GetConnID()) - if err != nil { - log.ZWarn(ctx, "CallbackUserOffline err", err) - } + ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, client.UserID, client.PlatformID, client.ctx.GetConnID()) } } @@ -147,6 +149,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) { clients: newUserMap(), Compressor: NewGzipCompressor(), Encoder: NewGobEncoder(), + webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), }, nil } diff --git a/internal/push/callback.go b/internal/push/callback.go index 3b665b38f..26b475512 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -19,21 +19,20 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/http" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" ) -func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { - if !callback.BeforeOfflinePush.Enable || msg.ContentType == constant.Typing { +func (p *Pusher) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { + if msg.ContentType == constant.Typing { return nil } req := &callbackstruct.CallbackBeforePushReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ - CallbackCommand: callbackstruct.CallbackOfflinePushCommand, + CallbackCommand: callbackstruct.CallbackBeforeOfflinePushCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), @@ -51,7 +50,8 @@ func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOfflinePush); err != nil { + + if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } @@ -64,14 +64,14 @@ func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs return nil } -func callbackOnlinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData) error { - if !callback.BeforeOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { +func (p *Pusher) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { + if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ - CallbackCommand: callbackstruct.CallbackOnlinePushCommand, + CallbackCommand: callbackstruct.CallbackBeforeOnlinePushCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), @@ -87,25 +87,25 @@ func callbackOnlinePush(ctx context.Context, callback *config.Webhooks, userIDs Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOnlinePush); err != nil { + if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } return nil } -func callbackBeforeSuperGroupOnlinePush( +func (p *Pusher) webhookBeforeGroupOnlinePush( ctx context.Context, - callback *config.Webhooks, + before *config.BeforeConfig, groupID string, msg *sdkws.MsgData, pushToUserIDs *[]string, ) error { - if !callback.BeforeGroupOnlinePush.Enable || msg.ContentType == constant.Typing { + if msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ - CallbackCommand: callbackstruct.CallbackSuperGroupOnlinePushCommand, + CallbackCommand: callbackstruct.CallbackBeforeGroupOnlinePushCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), @@ -120,10 +120,9 @@ func callbackBeforeSuperGroupOnlinePush( Seq: msg.Seq, } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeGroupOnlinePush); err != nil { + if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } - if len(resp.UserIDs) != 0 { *pushToUserIDs = resp.UserIDs } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 0e8f826a3..e4ed54b31 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -17,6 +17,8 @@ package push import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/errs" "sync" @@ -47,6 +49,11 @@ import ( "google.golang.org/grpc" ) +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + type Pusher struct { config *Config database controller.PushDatabase @@ -57,6 +64,7 @@ type Pusher struct { msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient + webhookClient *webhook.Client } var errNoOfflinePusher = errs.New("no offlinePusher is configured") @@ -75,6 +83,7 @@ func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePus msgRpcClient: msgRpcClient, conversationRpcClient: conversationRpcClient, groupRpcClient: groupRpcClient, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), } } @@ -104,9 +113,11 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) - if err := callbackOnlinePush(ctx, &p.config.WebhooksConfig, userIDs, msg); err != nil { + + if err := p.webhookBeforeOnlinePush(ctx, &p.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { return err } + // push wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs) if err != nil { @@ -132,7 +143,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg }) if len(offlinePushUserIDList) > 0 { - if err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, offlinePushUserIDList, msg, &[]string{}); err != nil { + if err = p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, offlinePushUserIDList, msg, &[]string{}); err != nil { return err } err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList) @@ -165,7 +176,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, } if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err := callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err := p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } @@ -196,7 +207,8 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string - if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.WebhooksConfig, groupID, msg, &pushToUserIDs); err != nil { + + if err = p.webhookBeforeGroupOnlinePush(ctx, &p.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, &pushToUserIDs); err != nil { return err } @@ -298,7 +310,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + + err = p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } diff --git a/internal/rpc/friend/callback.go b/internal/rpc/friend/callback.go index 81f80082a..15586082b 100644 --- a/internal/rpc/friend/callback.go +++ b/internal/rpc/friend/callback.go @@ -19,15 +19,19 @@ import ( cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/http" pbfriend "github.com/openimsdk/protocol/friend" - "github.com/openimsdk/tools/utils/datautil" ) -func CallbackBeforeAddFriend(ctx context.Context, callback *config.Webhooks, req *pbfriend.ApplyToAddFriendReq) error { - if !callback.BeforeAddFriend.Enable { - return nil +func (s *friendServer) webhookAfterDeleteFriend(ctx context.Context, after *config.AfterConfig, req *pbfriend.DeleteFriendReq) { + cbReq := &cbapi.CallbackAfterDeleteFriendReq{ + CallbackCommand: cbapi.CallbackAfterDeleteFriendCommand, + OwnerUserID: req.OwnerUserID, + FriendUserID: req.FriendUserID, } + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterDeleteFriendResp{}, after) +} + +func (s *friendServer) webhookBeforeAddFriend(ctx context.Context, before *config.BeforeConfig, req *pbfriend.ApplyToAddFriendReq) error { cbReq := &cbapi.CallbackBeforeAddFriendReq{ CallbackCommand: cbapi.CallbackBeforeAddFriendCommand, FromUserID: req.FromUserID, @@ -36,49 +40,75 @@ func CallbackBeforeAddFriend(ctx context.Context, callback *config.Webhooks, req Ex: req.Ex, } resp := &cbapi.CallbackBeforeAddFriendResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriend); err != nil { + + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } return nil } -func CallbackBeforeSetFriendRemark(ctx context.Context, callback *config.Webhooks, req *pbfriend.SetFriendRemarkReq) error { - if !callback.BeforeSetFriendRemark.Enable { - return nil +func (s *friendServer) webhookAfterAddFriend(ctx context.Context, after *config.AfterConfig, req *pbfriend.ApplyToAddFriendReq) { + cbReq := &cbapi.CallbackAfterAddFriendReq{ + CallbackCommand: cbapi.CallbackAfterAddFriendCommand, + FromUserID: req.FromUserID, + ToUserID: req.ToUserID, + ReqMsg: req.ReqMsg, } - cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{ - CallbackCommand: cbapi.CallbackBeforeSetFriendRemark, + resp := &cbapi.CallbackAfterAddFriendResp{} + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after) +} + +func (s *friendServer) webhookAfterSetFriendRemark(ctx context.Context, after *config.AfterConfig, req *pbfriend.SetFriendRemarkReq) { + + cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{ + CallbackCommand: cbapi.CallbackAfterSetFriendRemarkCommand, OwnerUserID: req.OwnerUserID, FriendUserID: req.FriendUserID, Remark: req.Remark, } - resp := &cbapi.CallbackBeforeSetFriendRemarkResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriend); err != nil { - return err + resp := &cbapi.CallbackAfterSetFriendRemarkResp{} + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after) +} + +func (s *friendServer) webhookAfterImportFriends(ctx context.Context, after *config.AfterConfig, req *pbfriend.ImportFriendReq) { + cbReq := &cbapi.CallbackAfterImportFriendsReq{ + CallbackCommand: cbapi.CallbackAfterImportFriendsCommand, + OwnerUserID: req.OwnerUserID, + FriendUserIDs: req.FriendUserIDs, } - datautil.NotNilReplace(&req.Remark, &resp.Remark) - return nil + resp := &cbapi.CallbackAfterImportFriendsResp{} + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after) } -func CallbackAfterSetFriendRemark(ctx context.Context, callback *config.Webhooks, req *pbfriend.SetFriendRemarkReq) error { - if !callback.AfterSetFriendRemark.Enable { - return nil +func (s *friendServer) webhookAfterRemoveBlack(ctx context.Context, after *config.AfterConfig, req *pbfriend.RemoveBlackReq) { + cbReq := &cbapi.CallbackAfterRemoveBlackReq{ + CallbackCommand: cbapi.CallbackAfterRemoveBlackCommand, + OwnerUserID: req.OwnerUserID, + BlackUserID: req.BlackUserID, } - cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{ - CallbackCommand: cbapi.CallbackAfterSetFriendRemark, + resp := &cbapi.CallbackAfterRemoveBlackResp{} + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after) +} + +func (s *friendServer) webhookBeforeSetFriendRemark(ctx context.Context, before *config.BeforeConfig, req *pbfriend.SetFriendRemarkReq) error { + cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{ + CallbackCommand: cbapi.CallbackBeforeSetFriendRemarkCommand, OwnerUserID: req.OwnerUserID, FriendUserID: req.FriendUserID, Remark: req.Remark, } - resp := &cbapi.CallbackAfterSetFriendRemarkResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriend); err != nil { + resp := &cbapi.CallbackBeforeSetFriendRemarkResp{} + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + if resp.Remark != "" { + req.Remark = resp.Remark + } return nil } -func CallbackBeforeAddBlack(ctx context.Context, callback *config.Webhooks, req *pbfriend.AddBlackReq) error { - if !callback.BeforeAddBlack.Enable { +func (s *friendServer) webhookBeforeAddBlack(ctx context.Context, before *config.BeforeConfig, req *pbfriend.AddBlackReq) error { + if !before.Enable { return nil } cbReq := &cbapi.CallbackBeforeAddBlackReq{ @@ -87,32 +117,11 @@ func CallbackBeforeAddBlack(ctx context.Context, callback *config.Webhooks, req BlackUserID: req.BlackUserID, } resp := &cbapi.CallbackBeforeAddBlackResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddBlack); err != nil { - return err - } - return nil + return s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before) } -func CallbackAfterAddFriend(ctx context.Context, callback *config.Webhooks, req *pbfriend.ApplyToAddFriendReq) error { - if !callback.AfterAddFriend.Enable { - return nil - } - cbReq := &cbapi.CallbackAfterAddFriendReq{ - CallbackCommand: cbapi.CallbackAfterAddFriendCommand, - FromUserID: req.FromUserID, - ToUserID: req.ToUserID, - ReqMsg: req.ReqMsg, - } - resp := &cbapi.CallbackAfterAddFriendResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterAddFriend); err != nil { - return err - } - - return nil -} - -func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Webhooks, req *pbfriend.RespondFriendApplyReq) error { - if !callback.BeforeAddFriendAgree.Enable { +func (s *friendServer) webhookBeforeAddFriendAgree(ctx context.Context, before *config.BeforeConfig, req *pbfriend.RespondFriendApplyReq) error { + if !before.Enable { return nil } cbReq := &cbapi.CallbackBeforeAddFriendAgreeReq{ @@ -123,30 +132,11 @@ func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Webhooks HandleResult: req.HandleResult, } resp := &cbapi.CallbackBeforeAddFriendAgreeResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriendAgree); err != nil { - return err - } - return nil + return s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before) } -func CallbackAfterDeleteFriend(ctx context.Context, callback *config.Webhooks, req *pbfriend.DeleteFriendReq) error { - if !callback.AfterDeleteFriend.Enable { - return nil - } - cbReq := &cbapi.CallbackAfterDeleteFriendReq{ - CallbackCommand: cbapi.CallbackAfterDeleteFriendCommand, - OwnerUserID: req.OwnerUserID, - FriendUserID: req.FriendUserID, - } - resp := &cbapi.CallbackAfterDeleteFriendResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterDeleteFriend); err != nil { - return err - } - return nil -} - -func CallbackBeforeImportFriends(ctx context.Context, callback *config.Webhooks, req *pbfriend.ImportFriendReq) error { - if !callback.BeforeImportFriends.Enable { +func (s *friendServer) webhookBeforeImportFriends(ctx context.Context, before *config.BeforeConfig, req *pbfriend.ImportFriendReq) error { + if !before.Enable { return nil } cbReq := &cbapi.CallbackBeforeImportFriendsReq{ @@ -155,43 +145,11 @@ func CallbackBeforeImportFriends(ctx context.Context, callback *config.Webhooks, FriendUserIDs: req.FriendUserIDs, } resp := &cbapi.CallbackBeforeImportFriendsResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeImportFriends); err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } - if len(resp.FriendUserIDs) != 0 { + if len(resp.FriendUserIDs) > 0 { req.FriendUserIDs = resp.FriendUserIDs } return nil } - -func CallbackAfterImportFriends(ctx context.Context, callback *config.Webhooks, req *pbfriend.ImportFriendReq) error { - if !callback.AfterImportFriends.Enable { - return nil - } - cbReq := &cbapi.CallbackAfterImportFriendsReq{ - CallbackCommand: cbapi.CallbackAfterImportFriendsCommand, - OwnerUserID: req.OwnerUserID, - FriendUserIDs: req.FriendUserIDs, - } - resp := &cbapi.CallbackAfterImportFriendsResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterImportFriends); err != nil { - return err - } - return nil -} - -func CallbackAfterRemoveBlack(ctx context.Context, callback *config.Webhooks, req *pbfriend.RemoveBlackReq) error { - if !callback.AfterRemoveBlack.Enable { - return nil - } - cbReq := &cbapi.CallbackAfterRemoveBlackReq{ - CallbackCommand: cbapi.CallbackAfterRemoveBlackCommand, - OwnerUserID: req.OwnerUserID, - BlackUserID: req.BlackUserID, - } - resp := &cbapi.CallbackAfterRemoveBlackResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterRemoveBlack); err != nil { - return err - } - return nil -} diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 5ebe288d2..e3bec71eb 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -17,6 +17,8 @@ package friend import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -38,6 +40,11 @@ import ( "google.golang.org/grpc" ) +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + type friendServer struct { friendDatabase controller.FriendDatabase blackDatabase controller.BlackDatabase @@ -46,6 +53,7 @@ type friendServer struct { conversationRpcClient rpcclient.ConversationRpcClient RegisterCenter discovery.SvcDiscoveryRegistry config *Config + webhookClient *webhook.Client } type Config struct { @@ -113,6 +121,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg RegisterCenter: client, conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), }) return nil @@ -124,14 +133,12 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply if err := authverify.CheckAccessV3(ctx, req.FromUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } - if req.ToUserID == req.FromUserID { return nil, servererrs.ErrCanNotAddYourself.WrapMsg("req.ToUserID", req.ToUserID) } - if err = CallbackBeforeAddFriend(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue { + if err = s.webhookBeforeAddFriend(ctx, &s.config.WebhooksConfig.BeforeAddFriend, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } - if _, err := s.userRpcClient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); err != nil { return nil, err } @@ -140,22 +147,17 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply if err != nil { return nil, err } - if in1 && in2 { return nil, servererrs.ErrRelationshipAlready.WrapMsg("already friends has f") } - if err = s.friendDatabase.AddFriendRequest(ctx, req.FromUserID, req.ToUserID, req.ReqMsg, req.Ex); err != nil { return nil, err } - if err = s.notificationSender.FriendApplicationAddNotification(ctx, req); err != nil { return nil, err } - if err = CallbackAfterAddFriend(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue { - return nil, err - } + s.webhookAfterAddFriend(ctx, &s.config.WebhooksConfig.AfterAddFriend, req) return resp, nil } @@ -174,7 +176,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr return nil, errs.ErrArgs.WrapMsg("friend userID repeated") } - if err := CallbackBeforeImportFriends(ctx, &s.config.WebhooksConfig, req); err != nil { + if err := s.webhookBeforeImportFriends(ctx, &s.config.WebhooksConfig.BeforeImportFriends, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -188,9 +190,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr HandleResult: constant.FriendResponseAgree, }) } - if err := CallbackAfterImportFriends(ctx, &s.config.WebhooksConfig, req); err != nil { - return nil, err - } + + s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req) return &pbfriend.ImportFriendResp{}, nil } @@ -208,7 +209,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res HandleResult: req.HandleResult, } if req.HandleResult == constant.FriendResponseAgree { - if err := CallbackBeforeAddFriendAgree(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeAddFriendAgree(ctx, &s.config.WebhooksConfig.BeforeAddFriendAgree, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } err := s.friendDatabase.AgreeFriendRequest(ctx, &friendRequest) @@ -245,16 +246,13 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFri return nil, err } s.notificationSender.FriendDeletedNotification(ctx, req) - if err := CallbackAfterDeleteFriend(ctx, &s.config.WebhooksConfig, req); err != nil { - return nil, err - } + s.webhookAfterDeleteFriend(ctx, &s.config.WebhooksConfig.AfterDeleteFriend, req) return resp, nil } // ok. func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) { - - if err = CallbackBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue { + if err = s.webhookBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig.BeforeSetFriendRemark, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } resp = &pbfriend.SetFriendRemarkResp{} @@ -268,9 +266,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri if err := s.friendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil { return nil, err } - if err := CallbackAfterSetFriendRemark(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue { - return nil, err - } + s.webhookAfterSetFriendRemark(ctx, &s.config.WebhooksConfig.AfterSetFriendRemark, req) s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID) return resp, nil } diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index 34701c8f3..51ff1e0d8 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -16,32 +16,21 @@ package group import ( "context" - "github.com/openimsdk/tools/utils/datautil" - "time" - "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/http" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/group" - pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" "google.golang.org/protobuf/types/known/wrapperspb" + "time" ) -type GroupEventCallbackConfig struct { - CallbackUrl string - BeforeCreateGroup config.WebhookConfig -} - // CallbackBeforeCreateGroup callback before create group. -func CallbackBeforeCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.CreateGroupReq) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } +func (s *groupServer) webhookBeforeCreateGroup(ctx context.Context, before *config.BeforeConfig, req *group.CreateGroupReq) error { cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{ CallbackCommand: callbackstruct.CallbackBeforeCreateGroupCommand, OperationID: mcontext.GetOperationID(ctx), @@ -65,9 +54,10 @@ func CallbackBeforeCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfi } resp := &callbackstruct.CallbackBeforeCreateGroupResp{} - if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup); err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + datautil.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID) datautil.NotNilReplace(&req.GroupInfo.GroupName, resp.GroupName) datautil.NotNilReplace(&req.GroupInfo.Notification, resp.Notification) @@ -83,11 +73,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfi return nil } -func CallbackAfterCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.CreateGroupReq) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - +func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config.AfterConfig, req *group.CreateGroupReq) { cbReq := &callbackstruct.CallbackAfterCreateGroupReq{ CallbackCommand: callbackstruct.CallbackAfterCreateGroupCommand, GroupInfo: req.GroupInfo, @@ -108,15 +94,11 @@ func CallbackAfterCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfig RoleLevel: constant.GroupOrdinaryUsers, }) } - resp := &callbackstruct.CallbackAfterCreateGroupResp{} - return http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup) + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after) } -func CallbackBeforeMemberJoinGroup(ctx context.Context, cfg *GroupEventCallbackConfig, groupMember *relation.GroupMemberModel, groupEx string) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ +func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *relation.GroupMemberModel, groupEx string) error { + cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand, GroupID: groupMember.GroupID, UserID: groupMember.UserID, @@ -124,15 +106,13 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, cfg *GroupEventCallbackC GroupEx: groupEx, } resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{} - - if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup); err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } if resp.MuteEndTime != nil { groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime) } - datautil.NotNilReplace(&groupMember.FaceURL, resp.FaceURL) datautil.NotNilReplace(&groupMember.Ex, resp.Ex) datautil.NotNilReplace(&groupMember.Nickname, resp.Nickname) @@ -140,31 +120,26 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, cfg *GroupEventCallbackC return nil } -func CallbackBeforeSetGroupMemberInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupMemberInfo) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - - callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ +func (s *groupServer) webhookBeforeSetGroupMemberInfo(ctx context.Context, before *config.BeforeConfig, req *group.SetGroupMemberInfo) error { + cbReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ CallbackCommand: callbackstruct.CallbackBeforeSetGroupMemberInfoCommand, GroupID: req.GroupID, UserID: req.UserID, } if req.Nickname != nil { - callbackReq.Nickname = &req.Nickname.Value + cbReq.Nickname = &req.Nickname.Value } if req.FaceURL != nil { - callbackReq.FaceURL = &req.FaceURL.Value + cbReq.FaceURL = &req.FaceURL.Value } if req.RoleLevel != nil { - callbackReq.RoleLevel = &req.RoleLevel.Value + cbReq.RoleLevel = &req.RoleLevel.Value } if req.Ex != nil { - callbackReq.Ex = &req.Ex.Value + cbReq.Ex = &req.Ex.Value } resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{} - err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup) - if err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } if resp.FaceURL != nil { @@ -182,112 +157,71 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, cfg *GroupEventCallba return nil } -func CallbackAfterSetGroupMemberInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupMemberInfo) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - callbackReq := callbackstruct.CallbackAfterSetGroupMemberInfoReq{ +func (s *groupServer) webhookAfterSetGroupMemberInfo(ctx context.Context, after *config.AfterConfig, req *group.SetGroupMemberInfo) { + cbReq := callbackstruct.CallbackAfterSetGroupMemberInfoReq{ CallbackCommand: callbackstruct.CallbackAfterSetGroupMemberInfoCommand, GroupID: req.GroupID, UserID: req.UserID, } if req.Nickname != nil { - callbackReq.Nickname = &req.Nickname.Value + cbReq.Nickname = &req.Nickname.Value } if req.FaceURL != nil { - callbackReq.FaceURL = &req.FaceURL.Value + cbReq.FaceURL = &req.FaceURL.Value } if req.RoleLevel != nil { - callbackReq.RoleLevel = &req.RoleLevel.Value + cbReq.RoleLevel = &req.RoleLevel.Value } if req.Ex != nil { - callbackReq.Ex = &req.Ex.Value + cbReq.Ex = &req.Ex.Value } - resp := &callbackstruct.CallbackAfterSetGroupMemberInfoResp{} - return http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup) + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterSetGroupMemberInfoResp{}, after) } -func CallbackQuitGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.QuitGroupReq) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } +func (s *groupServer) webhookAfterQuitGroup(ctx context.Context, after *config.AfterConfig, req *group.QuitGroupReq) { cbReq := &callbackstruct.CallbackQuitGroupReq{ - CallbackCommand: callbackstruct.CallbackQuitGroupCommand, + CallbackCommand: callbackstruct.CallbackAfterQuitGroupCommand, GroupID: req.GroupID, UserID: req.UserID, } - resp := &callbackstruct.CallbackQuitGroupResp{} - return http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup) + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackQuitGroupResp{}, after) } -func CallbackKillGroupMember(ctx context.Context, cfg *GroupEventCallbackConfig, req *pbgroup.KickGroupMemberReq) (err error) { - if !cfg.BeforeCreateGroup.Enable { - return nil - } +func (s *groupServer) webhookAfterKickGroupMember(ctx context.Context, after *config.AfterConfig, req *group.KickGroupMemberReq) { cbReq := &callbackstruct.CallbackKillGroupMemberReq{ - CallbackCommand: callbackstruct.CallbackKillGroupCommand, + CallbackCommand: callbackstruct.CallbackAfterKickGroupCommand, GroupID: req.GroupID, KickedUserIDs: req.KickedUserIDs, } - resp := &callbackstruct.CallbackKillGroupMemberResp{} - if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup); err != nil { - return err - } - return nil + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackKillGroupMemberResp{}, after) } -func CallbackDismissGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *callbackstruct.CallbackDisMissGroupReq) (err error) { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - req.CallbackCommand = callbackstruct.CallbackDisMissGroupCommand - resp := &callbackstruct.CallbackDisMissGroupResp{} - if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, req, resp, cfg.BeforeCreateGroup); err != nil { - return err - } - return nil +func (s *groupServer) webhookAfterDismissGroup(ctx context.Context, after *config.AfterConfig, req *callbackstruct.CallbackDisMissGroupReq) { + req.CallbackCommand = callbackstruct.CallbackAfterDisMissGroupCommand + s.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &callbackstruct.CallbackDisMissGroupResp{}, after) } -func CallbackApplyJoinGroupBefore(ctx context.Context, cfg *GroupEventCallbackConfig, req *callbackstruct.CallbackJoinGroupReq) (err error) { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - +func (s *groupServer) webhookBeforeApplyJoinGroup(ctx context.Context, before *config.BeforeConfig, req *callbackstruct.CallbackJoinGroupReq) (err error) { req.CallbackCommand = callbackstruct.CallbackBeforeJoinGroupCommand - resp := &callbackstruct.CallbackJoinGroupResp{} - if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, req, resp, cfg.BeforeCreateGroup); err != nil { + if err := s.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } - return nil } -func CallbackAfterTransferGroupOwner(ctx context.Context, cfg *GroupEventCallbackConfig, req *pbgroup.TransferGroupOwnerReq) (err error) { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - +func (s *groupServer) webhookAfterTransferGroupOwner(ctx context.Context, after *config.AfterConfig, req *group.TransferGroupOwnerReq) { cbReq := &callbackstruct.CallbackTransferGroupOwnerReq{ - CallbackCommand: callbackstruct.CallbackAfterTransferGroupOwner, + CallbackCommand: callbackstruct.CallbackAfterTransferGroupOwnerCommand, GroupID: req.GroupID, OldOwnerUserID: req.OldOwnerUserID, NewOwnerUserID: req.NewOwnerUserID, } - - resp := &callbackstruct.CallbackTransferGroupOwnerResp{} - if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup); err != nil { - return err - } - return nil + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackTransferGroupOwnerResp{}, after) } -func CallbackBeforeInviteUserToGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.InviteUserToGroupReq) (err error) { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - - callbackReq := &callbackstruct.CallbackBeforeInviteUserToGroupReq{ +func (s *groupServer) webhookBeforeInviteUserToGroup(ctx context.Context, before *config.BeforeConfig, req *group.InviteUserToGroupReq) (err error) { + cbReq := &callbackstruct.CallbackBeforeInviteUserToGroupReq{ CallbackCommand: callbackstruct.CallbackBeforeInviteJoinGroupCommand, OperationID: mcontext.GetOperationID(ctx), GroupID: req.GroupID, @@ -296,15 +230,7 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, cfg *GroupEventCallbac } resp := &callbackstruct.CallbackBeforeInviteUserToGroupResp{} - err = http.CallBackPostReturn( - ctx, - cfg.CallbackUrl, - callbackReq, - resp, - cfg.BeforeCreateGroup, - ) - - if err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } @@ -315,11 +241,8 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, cfg *GroupEventCallbac return nil } -func CallbackAfterJoinGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.JoinGroupReq) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackAfterJoinGroupReq{ +func (s *groupServer) webhookAfterJoinGroup(ctx context.Context, after *config.AfterConfig, req *group.JoinGroupReq) { + cbReq := &callbackstruct.CallbackAfterJoinGroupReq{ CallbackCommand: callbackstruct.CallbackAfterJoinGroupCommand, OperationID: mcontext.GetOperationID(ctx), GroupID: req.GroupID, @@ -327,18 +250,12 @@ func CallbackAfterJoinGroup(ctx context.Context, cfg *GroupEventCallbackConfig, JoinSource: req.JoinSource, InviterUserID: req.InviterUserID, } - resp := &callbackstruct.CallbackAfterJoinGroupResp{} - if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup); err != nil { - return err - } - return nil + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterJoinGroupResp{}, after) } -func CallbackBeforeSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupInfoReq) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{ +func (s *groupServer) webhookBeforeSetGroupInfo(ctx context.Context, before *config.BeforeConfig, req *group.SetGroupInfoReq) error { + + cbReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{ CallbackCommand: callbackstruct.CallbackBeforeSetGroupInfoCommand, GroupID: req.GroupInfoForSet.GroupID, Notification: req.GroupInfoForSet.Notification, @@ -346,23 +263,22 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConf FaceURL: req.GroupInfoForSet.FaceURL, GroupName: req.GroupInfoForSet.GroupName, } - if req.GroupInfoForSet.Ex != nil { - callbackReq.Ex = req.GroupInfoForSet.Ex.Value + cbReq.Ex = req.GroupInfoForSet.Ex.Value } - log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfo", callbackReq.Ex) + log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfo", cbReq.Ex) if req.GroupInfoForSet.NeedVerification != nil { - callbackReq.NeedVerification = req.GroupInfoForSet.NeedVerification.Value + cbReq.NeedVerification = req.GroupInfoForSet.NeedVerification.Value } if req.GroupInfoForSet.LookMemberInfo != nil { - callbackReq.LookMemberInfo = req.GroupInfoForSet.LookMemberInfo.Value + cbReq.LookMemberInfo = req.GroupInfoForSet.LookMemberInfo.Value } if req.GroupInfoForSet.ApplyMemberFriend != nil { - callbackReq.ApplyMemberFriend = req.GroupInfoForSet.ApplyMemberFriend.Value + cbReq.ApplyMemberFriend = req.GroupInfoForSet.ApplyMemberFriend.Value } resp := &callbackstruct.CallbackBeforeSetGroupInfoResp{} - if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup); err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } @@ -385,11 +301,8 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConf return nil } -func CallbackAfterSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupInfoReq) error { - if !cfg.BeforeCreateGroup.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackAfterSetGroupInfoReq{ +func (s *groupServer) webhookAfterSetGroupInfo(ctx context.Context, after *config.AfterConfig, req *group.SetGroupInfoReq) { + cbReq := &callbackstruct.CallbackAfterSetGroupInfoReq{ CallbackCommand: callbackstruct.CallbackAfterSetGroupInfoCommand, GroupID: req.GroupInfoForSet.GroupID, Notification: req.GroupInfoForSet.Notification, @@ -398,17 +311,16 @@ func CallbackAfterSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConfi GroupName: req.GroupInfoForSet.GroupName, } if req.GroupInfoForSet.Ex != nil { - callbackReq.Ex = &req.GroupInfoForSet.Ex.Value + cbReq.Ex = &req.GroupInfoForSet.Ex.Value } if req.GroupInfoForSet.NeedVerification != nil { - callbackReq.NeedVerification = &req.GroupInfoForSet.NeedVerification.Value + cbReq.NeedVerification = &req.GroupInfoForSet.NeedVerification.Value } if req.GroupInfoForSet.LookMemberInfo != nil { - callbackReq.LookMemberInfo = &req.GroupInfoForSet.LookMemberInfo.Value + cbReq.LookMemberInfo = &req.GroupInfoForSet.LookMemberInfo.Value } if req.GroupInfoForSet.ApplyMemberFriend != nil { - callbackReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value + cbReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value } - resp := &callbackstruct.CallbackAfterSetGroupInfoResp{} - return http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup) + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterSetGroupInfoResp{}, after) } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 44e22a10e..fd9eb6873 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -19,6 +19,8 @@ import ( "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "math/big" "math/rand" "strconv" @@ -53,6 +55,11 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + type groupServer struct { db controller.GroupDatabase user rpcclient.UserRpcClient @@ -60,6 +67,7 @@ type groupServer struct { conversationRpcClient rpcclient.ConversationRpcClient msgRpcClient rpcclient.MessageRpcClient config *Config + webhookClient *webhook.Client } type Config struct { @@ -112,6 +120,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg gs.conversationRpcClient = conversationRpcClient gs.msgRpcClient = msgRpcClient gs.config = config + gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)) pbgroup.RegisterGroupServer(server, &gs) return nil } @@ -229,13 +238,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, servererrs.ErrUserIDNotFound.WrapMsg("user not found") } - config := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeCreateGroup, - } - - // Callback Before create Group - if err := CallbackBeforeCreateGroup(ctx, config, req); err != nil { + if err := s.webhookBeforeCreateGroup(ctx, &s.config.WebhooksConfig.BeforeCreateGroup, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -245,11 +248,6 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, err } - beforeCreateGroupConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - joinGroup := func(userID string, roleLevel int32) error { groupMember := &relationtb.GroupMemberModel{ GroupID: group.GroupID, @@ -262,7 +260,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR MuteEndTime: time.UnixMilli(0), } - if err := CallbackBeforeMemberJoinGroup(ctx, beforeCreateGroupConfig, groupMember, group.Ex); err != nil { + if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return err } groupMembers = append(groupMembers, groupMember) @@ -331,9 +329,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR AdminUserIDs: req.AdminUserIDs, } - if err := CallbackAfterCreateGroup(ctx, beforeCreateGroupConfig, reqCallBackAfter); err != nil { - return nil, err - } + s.webhookAfterCreateGroup(ctx, &s.config.WebhooksConfig.AfterCreateGroup, reqCallBackAfter) return resp, nil } @@ -423,12 +419,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite } } - beforeInviteUserToGroupConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeInviteUserToGroup, - } - - if err := CallbackBeforeInviteUserToGroup(ctx, beforeInviteUserToGroupConfig, req); err != nil { + if err := s.webhookBeforeInviteUserToGroup(ctx, &s.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -474,12 +465,11 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite MuteEndTime: time.UnixMilli(0), } - beforeMemberJoinGroupConfig := beforeInviteUserToGroupConfig - - if err := CallbackBeforeMemberJoinGroup(ctx, beforeMemberJoinGroupConfig, member, group.Ex); err != nil { + if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } groupMembers = append(groupMembers, member) + } if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil { return nil, err @@ -647,15 +637,8 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil { return nil, err } + s.webhookAfterKickGroupMember(ctx, &s.config.WebhooksConfig.AfterKickGroupMember, req) - killGroupMemberConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - - if err := CallbackKillGroupMember(ctx, killGroupMemberConfig, req); err != nil { - return nil, err - } return &pbgroup.KickGroupMemberResp{}, nil } @@ -823,13 +806,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup OperatorUserID: mcontext.GetOpUserID(ctx), Ex: groupRequest.Ex, } - - beforeMemberJoinGroupConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - - if err = CallbackBeforeMemberJoinGroup(ctx, beforeMemberJoinGroupConfig, member, group.Ex); err != nil { + if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } } @@ -876,14 +853,10 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) Ex: req.Ex, } - applyJoinGroupBeforeConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - - if err = CallbackApplyJoinGroupBefore(ctx, applyJoinGroupBeforeConfig, reqCall); err != nil { + if err := s.webhookBeforeApplyJoinGroup(ctx, &s.config.WebhooksConfig.BeforeApplyJoinGroup, reqCall); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } + _, err = s.db.TakeGroupMember(ctx, req.GroupID, req.InviterUserID) if err == nil { return nil, errs.ErrArgs.Wrap() @@ -901,10 +874,11 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) JoinTime: time.Now(), MuteEndTime: time.UnixMilli(0), } - MemberJoinGroupConfig := applyJoinGroupBeforeConfig - if err := CallbackBeforeMemberJoinGroup(ctx, MemberJoinGroupConfig, groupMember, group.Ex); err != nil { + + if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } + if err := s.db.CreateGroup(ctx, nil, []*relationtb.GroupMemberModel{groupMember}); err != nil { return nil, err } @@ -913,10 +887,8 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) return nil, err } s.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID) - afterJoinGroupConfig := applyJoinGroupBeforeConfig - if err = CallbackAfterJoinGroup(ctx, afterJoinGroupConfig, req); err != nil { - return nil, err - } + s.webhookAfterJoinGroup(ctx, &s.config.WebhooksConfig.AfterJoinGroup, req) + return &pbgroup.JoinGroupResp{}, nil } groupRequest := relationtb.GroupRequestModel{ @@ -961,15 +933,8 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq) if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, []string{req.UserID}); err != nil { return nil, err } + s.webhookAfterQuitGroup(ctx, &s.config.WebhooksConfig.AfterQuitGroup, req) - quitGroupConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - - if err := CallbackQuitGroup(ctx, quitGroupConfig, req); err != nil { - return nil, err - } return &pbgroup.QuitGroupResp{}, nil } @@ -998,14 +963,10 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf } } - beforeSetGroupInfoConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - - if err := CallbackBeforeSetGroupInfo(ctx, beforeSetGroupInfoConfig, req); err != nil { + if err := s.webhookBeforeSetGroupInfo(ctx, &s.config.WebhooksConfig.BeforeSetGroupInfo, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } + group, err := s.db.TakeGroup(ctx, req.GroupInfoForSet.GroupID) if err != nil { return nil, err @@ -1073,10 +1034,8 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf _ = s.notification.GroupInfoSetNotification(ctx, tips) } - afterSetGroupInfoConfig := beforeSetGroupInfoConfig - if err := CallbackAfterSetGroupInfo(ctx, afterSetGroupInfoConfig, req); err != nil { - return nil, err - } + s.webhookAfterSetGroupInfo(ctx, &s.config.WebhooksConfig.AfterSetGroupInfo, req) + return &pbgroup.SetGroupInfoResp{}, nil } @@ -1119,14 +1078,8 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans return nil, err } - afterTransferGroupOwnerConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } + s.webhookAfterTransferGroupOwner(ctx, &s.config.WebhooksConfig.AfterTransferGroupOwner, req) - if err := CallbackAfterTransferGroupOwner(ctx, afterTransferGroupOwnerConfig, req); err != nil { - return nil, err - } s.notification.GroupOwnerTransferredNotification(ctx, req) return &pbgroup.TransferGroupOwnerResp{}, nil } @@ -1285,21 +1238,14 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou if err != nil { return nil, err } - reqCall := &callbackstruct.CallbackDisMissGroupReq{ + cbReq := &callbackstruct.CallbackDisMissGroupReq{ GroupID: req.GroupID, OwnerID: owner.UserID, MembersID: membersID, GroupType: string(group.GroupType), } - dismissGroupConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - - if err := CallbackDismissGroup(ctx, dismissGroupConfig, reqCall); err != nil { - return nil, err - } + s.webhookAfterDismissGroup(ctx, &s.config.WebhooksConfig.AfterDismissGroup, cbReq) return &pbgroup.DismissGroupResp{}, nil } @@ -1485,15 +1431,12 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr } } - beforeSetGroupMemberInfoConfig := &GroupEventCallbackConfig{ - CallbackUrl: s.config.WebhooksConfig.URL, - BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup, - } - for i := 0; i < len(req.Members); i++ { - if err := CallbackBeforeSetGroupMemberInfo(ctx, beforeSetGroupMemberInfoConfig, req.Members[i]); err != nil { + + if err := s.webhookBeforeSetGroupMemberInfo(ctx, &s.config.WebhooksConfig.BeforeSetGroupMemberInfo, req.Members[i]); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } + } if err := s.db.UpdateGroupMembers(ctx, datautil.Slice(req.Members, func(e *pbgroup.SetGroupMemberInfo) *relationtb.BatchUpdateGroupMember { return &relationtb.BatchUpdateGroupMember{ @@ -1517,11 +1460,8 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr s.notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID) } } - afterSetGroupMemberInfoConfig := beforeSetGroupMemberInfoConfig for i := 0; i < len(req.Members); i++ { - if err := CallbackAfterSetGroupMemberInfo(ctx, afterSetGroupMemberInfoConfig, req.Members[i]); err != nil { - return nil, err - } + s.webhookAfterSetGroupMemberInfo(ctx, &s.config.WebhooksConfig.AfterSetGroupMemberInfo, req.Members[i]) } return &pbgroup.SetGroupMemberInfoResp{}, nil diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index fab94f11d..8d001ff56 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -125,9 +125,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR Seqs: req.Seqs, ContentType: conversation.ConversationType, } - if err := CallbackSingleMsgRead(ctx, &m.config.WebhooksConfig, reqCallback); err != nil { - return nil, err - } + m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback) if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil { @@ -198,10 +196,8 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon UnreadMsgNum: req.HasReadSeq, ContentType: int64(conversation.ConversationType), } - if err := CallbackGroupMsgRead(ctx, &m.config.WebhooksConfig, reqCall); err != nil { - return nil, err - } + m.webhookAfterGroupMsgRead(ctx, &m.config.WebhooksConfig.AfterGroupMsgRead, reqCall) return &msg.MarkConversationAsReadResp{}, nil } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index e4d30de32..f6e3868ea 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -19,7 +19,6 @@ import ( cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/http" "github.com/openimsdk/protocol/constant" pbchat "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" @@ -61,77 +60,71 @@ func GetContent(msg *sdkws.MsgData) string { } } -func callbackBeforeSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { - if !callback.BeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { + if msg.MsgData.ContentType == constant.Typing { return nil } - req := &cbapi.CallbackBeforeSendSingleMsgReq{ + cbReq := &cbapi.CallbackBeforeSendSingleMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackBeforeSendSingleMsgResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendSingleMsg); err != nil { + if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + return nil } -func callbackAfterSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { - if !callback.AfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { - return nil +func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return } - req := &cbapi.CallbackAfterSendSingleMsgReq{ + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } - resp := &cbapi.CallbackAfterSendSingleMsgResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendSingleMsg); err != nil { - return err - } - return nil + m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after) } -func callbackBeforeSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { - if !callback.BeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { + if msg.MsgData.ContentType == constant.Typing { return nil } - req := &cbapi.CallbackBeforeSendGroupMsgReq{ + cbReq := &cbapi.CallbackBeforeSendGroupMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendGroupMsg); err != nil { + if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } return nil } -func callbackAfterSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { - if !callback.AfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { - return nil +func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return } - req := &cbapi.CallbackAfterSendGroupMsgReq{ + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } - resp := &cbapi.CallbackAfterSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendGroupMsg); err != nil { - return err - } - return nil + m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after) } -func callbackMsgModify(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { - if !callback.BeforeMsgModify.Enable || msg.MsgData.ContentType != constant.Text { +func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { + if msg.MsgData.ContentType != constant.Text { return nil } - req := &cbapi.CallbackMsgModifyCommandReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand), + cbReq := &cbapi.CallbackMsgModifyCommandReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeMsgModifyCommand), } resp := &cbapi.CallbackMsgModifyCommandResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeMsgModify); err != nil { + if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + if resp.Content != nil { msg.MsgData.Content = []byte(*resp.Content) } @@ -154,45 +147,25 @@ func callbackMsgModify(ctx context.Context, callback *config.Webhooks, msg *pbch return nil } -func CallbackGroupMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackGroupMsgReadReq) error { - if !callback.AfterGroupMsgRead.Enable { - return nil - } - req.CallbackCommand = cbapi.CallbackGroupMsgReadCommand - - resp := &cbapi.CallbackGroupMsgReadResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterGroupMsgRead); err != nil { - return err - } - return nil +func (m *msgServer) webhookAfterGroupMsgRead(ctx context.Context, after *config.AfterConfig, req *cbapi.CallbackGroupMsgReadReq) { + req.CallbackCommand = cbapi.CallbackAfterGroupMsgReadCommand + m.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CallbackGroupMsgReadResp{}, after) } -func CallbackSingleMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackSingleMsgReadReq) error { - if !callback.AfterSingleMsgRead.Enable { - return nil - } - req.CallbackCommand = cbapi.CallbackSingleMsgRead +func (m *msgServer) webhookAfterSingleMsgRead(ctx context.Context, after *config.AfterConfig, req *cbapi.CallbackSingleMsgReadReq) { - resp := &cbapi.CallbackSingleMsgReadResp{} + req.CallbackCommand = cbapi.CallbackAfterSingleMsgReadCommand + + m.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CallbackSingleMsgReadResp{}, after) - if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSingleMsgRead); err != nil { - return err - } - return nil } -func CallbackAfterRevokeMsg(ctx context.Context, callback *config.Webhooks, req *pbchat.RevokeMsgReq) error { - if !callback.AfterRevokeMsg.Enable { - return nil - } + +func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.AfterConfig, req *pbchat.RevokeMsgReq) { callbackReq := &cbapi.CallbackAfterRevokeMsgReq{ CallbackCommand: cbapi.CallbackAfterRevokeMsgCommand, ConversationID: req.ConversationID, Seq: req.Seq, UserID: req.UserID, } - resp := &cbapi.CallbackAfterRevokeMsgResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, callbackReq, resp, callback.AfterRevokeMsg); err != nil { - return err - } - return nil + m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index f6be0722f..167e2d0aa 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -126,8 +126,6 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil { return nil, err } - if err = CallbackAfterRevokeMsg(ctx, &m.config.WebhooksConfig, req); err != nil { - return nil, err - } + m.webhookAfterRevokeMsg(ctx, &m.config.WebhooksConfig.AfterRevokeMsg, req) return &msg.RevokeMsgResp{}, nil } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 999c24a67..9cefd65e0 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -59,10 +59,11 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs prommetrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err } - if err = callbackBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil { + + if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil { return nil, err } - if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil { + if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { return nil, err } err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) @@ -72,9 +73,8 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs if req.MsgData.ContentType == constant.AtText { go m.setConversationAtInfo(ctx, req.MsgData) } - if err = callbackAfterSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil { - log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) - } + + m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime @@ -157,21 +157,18 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, nil } else { - if err = callbackBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig, req); err != nil { + if err = m.webhookBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig.BeforeSendSingleMsg, req); err != nil { return nil, err } - - if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil { + if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil { return nil, err } + if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } - err = callbackAfterSendSingleMsg(ctx, &m.config.WebhooksConfig, req) - if err != nil { - log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req) - } + m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 1ccc493ac..c88170f1c 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -17,6 +17,8 @@ package msg import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" @@ -32,6 +34,11 @@ import ( "google.golang.org/grpc" ) +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + type ( // MessageInterceptorChain defines a chain of message interceptor functions. MessageInterceptorChain []MessageInterceptorFunc @@ -48,6 +55,7 @@ type ( Handlers MessageInterceptorChain // Chain of handlers for processing messages. notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. config *Config // Global configuration settings. + webhookClient *webhook.Client } Config struct { @@ -101,6 +109,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb), FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, &config.LocalCacheConfig, rdb), config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), } s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) diff --git a/internal/rpc/user/callback.go b/internal/rpc/user/callback.go index fa2f14be6..700a61941 100644 --- a/internal/rpc/user/callback.go +++ b/internal/rpc/user/callback.go @@ -20,14 +20,10 @@ import ( cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/http" pbuser "github.com/openimsdk/protocol/user" ) -func CallbackBeforeUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error { - if !callback.BeforeUpdateUserInfo.Enable { - return nil - } +func (s *userServer) webhookBeforeUpdateUserInfo(ctx context.Context, before *config.BeforeConfig, req *pbuser.UpdateUserInfoReq) error { cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{ CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoCommand, UserID: req.UserInfo.UserID, @@ -35,34 +31,27 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, callback *config.Webhooks Nickname: &req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfo); err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + datautil.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL) datautil.NotNilReplace(&req.UserInfo.Ex, resp.Ex) datautil.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname) return nil } -func CallbackAfterUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error { - if !callback.AfterUpdateUserInfo.Enable { - return nil - } + +func (s *userServer) webhookAfterUpdateUserInfo(ctx context.Context, after *config.AfterConfig, req *pbuser.UpdateUserInfoReq) { cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{ CallbackCommand: cbapi.CallbackAfterUpdateUserInfoCommand, UserID: req.UserInfo.UserID, FaceURL: req.UserInfo.FaceURL, Nickname: req.UserInfo.Nickname, } - resp := &cbapi.CallbackAfterUpdateUserInfoResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfo); err != nil { - return err - } - return nil + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterUpdateUserInfoResp{}, after) } -func CallbackBeforeUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error { - if !callback.BeforeUpdateUserInfoEx.Enable { - return nil - } + +func (s *userServer) webhookBeforeUpdateUserInfoEx(ctx context.Context, before *config.BeforeConfig, req *pbuser.UpdateUserInfoExReq) error { cbReq := &cbapi.CallbackBeforeUpdateUserInfoExReq{ CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoExCommand, UserID: req.UserInfo.UserID, @@ -70,35 +59,27 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, callback *config.Webhoo Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoExResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfoEx); err != nil { + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + datautil.NotNilReplace(req.UserInfo.FaceURL, resp.FaceURL) datautil.NotNilReplace(req.UserInfo.Ex, resp.Ex) datautil.NotNilReplace(req.UserInfo.Nickname, resp.Nickname) return nil } -func CallbackAfterUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error { - if !callback.AfterUpdateUserInfoEx.Enable { - return nil - } + +func (s *userServer) webhookAfterUpdateUserInfoEx(ctx context.Context, after *config.AfterConfig, req *pbuser.UpdateUserInfoExReq) { cbReq := &cbapi.CallbackAfterUpdateUserInfoExReq{ CallbackCommand: cbapi.CallbackAfterUpdateUserInfoExCommand, UserID: req.UserInfo.UserID, FaceURL: req.UserInfo.FaceURL, Nickname: req.UserInfo.Nickname, } - resp := &cbapi.CallbackAfterUpdateUserInfoExResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfoEx); err != nil { - return err - } - return nil + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterUpdateUserInfoExResp{}, after) } -func CallbackBeforeUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error { - if !callback.BeforeUserRegister.Enable { - return nil - } +func (s *userServer) webhookBeforeUserRegister(ctx context.Context, before *config.BeforeConfig, req *pbuser.UserRegisterReq) error { cbReq := &cbapi.CallbackBeforeUserRegisterReq{ CallbackCommand: cbapi.CallbackBeforeUserRegisterCommand, Secret: req.Secret, @@ -106,28 +87,23 @@ func CallbackBeforeUserRegister(ctx context.Context, callback *config.Webhooks, } resp := &cbapi.CallbackBeforeUserRegisterResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUserRegister); err != nil { + + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } + if len(resp.Users) != 0 { req.Users = resp.Users } return nil } -func CallbackAfterUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error { - if !callback.AfterUserRegister.Enable { - return nil - } +func (s *userServer) webhookAfterUserRegister(ctx context.Context, after *config.AfterConfig, req *pbuser.UserRegisterReq) { cbReq := &cbapi.CallbackAfterUserRegisterReq{ CallbackCommand: cbapi.CallbackAfterUserRegisterCommand, Secret: req.Secret, Users: req.Users, } - resp := &cbapi.CallbackAfterUserRegisterResp{} - if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUserRegister); err != nil { - return err - } - return nil + s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterUserRegisterResp{}, after) } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 712de03cf..273a6ca42 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -17,6 +17,8 @@ package user import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" "github.com/openimsdk/tools/db/redisutil" "math/rand" "strings" @@ -44,6 +46,11 @@ import ( "google.golang.org/grpc" ) +const ( + webhookWorkerCount = 2 + webhookBufferSize = 100 +) + type userServer struct { db controller.UserDatabase friendNotificationSender *notification.FriendNotificationSender @@ -52,6 +59,7 @@ type userServer struct { groupRpcClient *rpcclient.GroupRpcClient RegisterCenter registry.SvcDiscoveryRegistry config *Config + webhookClient *webhook.Client } type Config struct { @@ -99,6 +107,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi friendNotificationSender: notification.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, notification.WithDBFunc(database.FindWithError)), userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)), config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), } pbuser.RegisterUserServer(server, u) return u.db.InitOnce(context.Background(), users) @@ -114,15 +123,21 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig return resp, nil } +// deprecated: + +//UpdateUserInfo + func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) { resp = &pbuser.UpdateUserInfoResp{} err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID) if err != nil { return nil, err } - if err := CallbackBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil { + + if err := s.webhookBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfo, req); err != nil { return nil, err } + data := convert.UserPb2DBMap(req.UserInfo) if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { return nil, err @@ -140,9 +155,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } - if err = CallbackAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil { - return nil, err - } + s.webhookAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfo, req) if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { return nil, err } @@ -154,8 +167,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse if err != nil { return nil, err } - - if err = CallbackBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil { + if err = s.webhookBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfoEx, req); err != nil { return nil, err } data := convert.UserPb2DBMapEx(req.UserInfo) @@ -175,9 +187,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } - if err := CallbackAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil { - return nil, err - } + s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req) if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { return nil, err } @@ -273,7 +283,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if exist { return nil, servererrs.ErrRegisteredAlready.WrapMsg("userID registered already") } - if err := CallbackBeforeUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil { + if err := s.webhookBeforeUserRegister(ctx, &s.config.WebhooksConfig.BeforeUserRegister, req); err != nil { return nil, err } now := time.Now() @@ -293,9 +303,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR return nil, err } - if err := CallbackAfterUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil { - return nil, err - } + s.webhookAfterUserRegister(ctx, &s.config.WebhooksConfig.AfterUserRegister, req) return resp, nil } diff --git a/pkg/callbackstruct/common.go b/pkg/callbackstruct/common.go index 1adb0112b..d6714f5f2 100644 --- a/pkg/callbackstruct/common.go +++ b/pkg/callbackstruct/common.go @@ -64,7 +64,7 @@ type CommonCallbackResp struct { } func (c CommonCallbackResp) Parse() error { - if c.ActionCode != servererrs.NoError || c.NextCode == Next { + if c.ActionCode == servererrs.NoError && c.NextCode == Next { return errs.NewCodeError(int(c.ErrCode), c.ErrMsg).WithDetail(c.ErrDlt) } return nil diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index f3bcf1383..66e1598cd 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -14,47 +14,44 @@ package callbackstruct -const CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" -const CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" -const CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" -const CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" - -const CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" -const CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" -const CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" -const CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" - -const CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" -const CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" -const CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" -const CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" - const ( - CallbackQuitGroupCommand = "callbackQuitGroupCommand" - CallbackKillGroupCommand = "callbackKillGroupCommand" - CallbackDisMissGroupCommand = "callbackDisMissGroupCommand" + CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" + CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" + CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" + CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" + CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" + CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" + CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" + CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" + CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" + CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" + CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" + CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" + CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" + CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" + CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" - CallbackGroupMsgReadCommand = "callbackGroupMsgReadCommand" - CallbackMsgModifyCommand = "callbackMsgModifyCommand" + CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" + CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" - CallbackAfterTransferGroupOwner = "callbackAfterTransferGroupOwner" - CallbackBeforeSetFriendRemark = "callbackBeforeSetFriendRemark" - CallbackAfterSetFriendRemark = "callbackAfterSetFriendRemark" - CallbackSingleMsgRead = "callbackSingleMsgRead" + CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" + CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" + CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" + CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" - CallbackUserOnlineCommand = "callbackUserOnlineCommand" - CallbackUserOfflineCommand = "callbackUserOfflineCommand" - CallbackUserKickOffCommand = "callbackUserKickOffCommand" - CallbackOfflinePushCommand = "callbackOfflinePushCommand" - CallbackOnlinePushCommand = "callbackOnlinePushCommand" - CallbackSuperGroupOnlinePushCommand = "callbackSuperGroupOnlinePushCommand" + CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" + CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" + CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" + CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" + CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" + CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 7bee2818a..002f6113e 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -339,12 +339,17 @@ type Redis struct { MaxRetry int `mapstructure:"MaxRetry"` } -type WebhookConfig struct { +type BeforeConfig struct { Enable bool `mapstructure:"enable"` Timeout int `mapstructure:"timeout"` FailedContinue bool `mapstructure:"failedContinue"` } +type AfterConfig struct { + Enable bool `mapstructure:"enable"` + Timeout int `mapstructure:"timeout"` +} + type Share struct { Secret string `mapstructure:"secret"` Env string `mapstructure:"env"` @@ -377,53 +382,54 @@ func (r *RpcRegisterName) GetServiceNames() []string { } } +// FullConfig stores all configurations for before and after events type Webhooks struct { - URL string `mapstructure:"url"` - BeforeSendSingleMsg WebhookConfig `mapstructure:"beforeSendSingleMsg"` - BeforeUpdateUserInfoEx WebhookConfig `mapstructure:"beforeUpdateUserInfoEx"` - AfterUpdateUserInfoEx WebhookConfig `mapstructure:"afterUpdateUserInfoEx"` - AfterSendSingleMsg WebhookConfig `mapstructure:"afterSendSingleMsg"` - BeforeSendGroupMsg WebhookConfig `mapstructure:"beforeSendGroupMsg"` - AfterSendGroupMsg WebhookConfig `mapstructure:"afterSendGroupMsg"` - BeforeMsgModify WebhookConfig `mapstructure:"beforeMsgModify"` - AfterUserOnline WebhookConfig `mapstructure:"afterUserOnline"` - AfterUserOffline WebhookConfig `mapstructure:"afterUserOffline"` - AfterUserKickOff WebhookConfig `mapstructure:"afterUserKickOff"` - BeforeOfflinePush WebhookConfig `mapstructure:"beforeOfflinePush"` - BeforeOnlinePush WebhookConfig `mapstructure:"beforeOnlinePush"` - BeforeGroupOnlinePush WebhookConfig `mapstructure:"beforeGroupOnlinePush"` - BeforeAddFriend WebhookConfig `mapstructure:"beforeAddFriend"` - BeforeUpdateUserInfo WebhookConfig `mapstructure:"beforeUpdateUserInfo"` - AfterUpdateUserInfo WebhookConfig `mapstructure:"afterUpdateUserInfo"` - BeforeCreateGroup WebhookConfig `mapstructure:"beforeCreateGroup"` - AfterCreateGroup WebhookConfig `mapstructure:"afterCreateGroup"` - BeforeMemberJoinGroup WebhookConfig `mapstructure:"beforeMemberJoinGroup"` - BeforeSetGroupMemberInfo WebhookConfig `mapstructure:"beforeSetGroupMemberInfo"` - AfterSetGroupMemberInfo WebhookConfig `mapstructure:"afterSetGroupMemberInfo"` - AfterQuitGroup WebhookConfig `mapstructure:"afterQuitGroup"` - AfterKickGroupMember WebhookConfig `mapstructure:"afterKickGroupMember"` - AfterDismissGroup WebhookConfig `mapstructure:"afterDismissGroup"` - BeforeApplyJoinGroup WebhookConfig `mapstructure:"beforeApplyJoinGroup"` - AfterGroupMsgRead WebhookConfig `mapstructure:"afterGroupMsgRead"` - AfterSingleMsgRead WebhookConfig `mapstructure:"afterSingleMsgRead"` - BeforeUserRegister WebhookConfig `mapstructure:"beforeUserRegister"` - AfterUserRegister WebhookConfig `mapstructure:"afterUserRegister"` - AfterTransferGroupOwner WebhookConfig `mapstructure:"afterTransferGroupOwner"` - BeforeSetFriendRemark WebhookConfig `mapstructure:"beforeSetFriendRemark"` - AfterSetFriendRemark WebhookConfig `mapstructure:"afterSetFriendRemark"` - AfterGroupMsgRevoke WebhookConfig `mapstructure:"afterGroupMsgRevoke"` - AfterJoinGroup WebhookConfig `mapstructure:"afterJoinGroup"` - BeforeInviteUserToGroup WebhookConfig `mapstructure:"beforeInviteUserToGroup"` - AfterSetGroupInfo WebhookConfig `mapstructure:"afterSetGroupInfo"` - BeforeSetGroupInfo WebhookConfig `mapstructure:"beforeSetGroupInfo"` - AfterRevokeMsg WebhookConfig `mapstructure:"afterRevokeMsg"` - BeforeAddBlack WebhookConfig `mapstructure:"beforeAddBlack"` - AfterAddFriend WebhookConfig `mapstructure:"afterAddFriend"` - BeforeAddFriendAgree WebhookConfig `mapstructure:"beforeAddFriendAgree"` - AfterDeleteFriend WebhookConfig `mapstructure:"afterDeleteFriend"` - BeforeImportFriends WebhookConfig `mapstructure:"beforeImportFriends"` - AfterImportFriends WebhookConfig `mapstructure:"afterImportFriends"` - AfterRemoveBlack WebhookConfig `mapstructure:"afterRemoveBlack"` + URL string `mapstructure:"url"` + BeforeSendSingleMsg BeforeConfig `mapstructure:"beforeSendSingleMsg"` + BeforeUpdateUserInfoEx BeforeConfig `mapstructure:"beforeUpdateUserInfoEx"` + AfterUpdateUserInfoEx AfterConfig `mapstructure:"afterUpdateUserInfoEx"` + AfterSendSingleMsg AfterConfig `mapstructure:"afterSendSingleMsg"` + BeforeSendGroupMsg BeforeConfig `mapstructure:"beforeSendGroupMsg"` + BeforeMsgModify BeforeConfig `mapstructure:"beforeMsgModify"` + AfterSendGroupMsg AfterConfig `mapstructure:"afterSendGroupMsg"` + AfterUserOnline AfterConfig `mapstructure:"afterUserOnline"` + AfterUserOffline AfterConfig `mapstructure:"afterUserOffline"` + AfterUserKickOff AfterConfig `mapstructure:"afterUserKickOff"` + BeforeOfflinePush BeforeConfig `mapstructure:"beforeOfflinePush"` + BeforeOnlinePush BeforeConfig `mapstructure:"beforeOnlinePush"` + BeforeGroupOnlinePush BeforeConfig `mapstructure:"beforeGroupOnlinePush"` + BeforeAddFriend BeforeConfig `mapstructure:"beforeAddFriend"` + BeforeUpdateUserInfo BeforeConfig `mapstructure:"beforeUpdateUserInfo"` + AfterUpdateUserInfo AfterConfig `mapstructure:"afterUpdateUserInfo"` + BeforeCreateGroup BeforeConfig `mapstructure:"beforeCreateGroup"` + AfterCreateGroup AfterConfig `mapstructure:"afterCreateGroup"` + BeforeMemberJoinGroup BeforeConfig `mapstructure:"beforeMemberJoinGroup"` + BeforeSetGroupMemberInfo BeforeConfig `mapstructure:"beforeSetGroupMemberInfo"` + AfterSetGroupMemberInfo AfterConfig `mapstructure:"afterSetGroupMemberInfo"` + AfterQuitGroup AfterConfig `mapstructure:"afterQuitGroup"` + AfterKickGroupMember AfterConfig `mapstructure:"afterKickGroupMember"` + AfterDismissGroup AfterConfig `mapstructure:"afterDismissGroup"` + BeforeApplyJoinGroup BeforeConfig `mapstructure:"beforeApplyJoinGroup"` + AfterGroupMsgRead AfterConfig `mapstructure:"afterGroupMsgRead"` + AfterSingleMsgRead AfterConfig `mapstructure:"afterSingleMsgRead"` + BeforeUserRegister BeforeConfig `mapstructure:"beforeUserRegister"` + AfterUserRegister AfterConfig `mapstructure:"afterUserRegister"` + AfterTransferGroupOwner AfterConfig `mapstructure:"afterTransferGroupOwner"` + BeforeSetFriendRemark BeforeConfig `mapstructure:"beforeSetFriendRemark"` + AfterSetFriendRemark AfterConfig `mapstructure:"afterSetFriendRemark"` + AfterGroupMsgRevoke AfterConfig `mapstructure:"afterGroupMsgRevoke"` + AfterJoinGroup AfterConfig `mapstructure:"afterJoinGroup"` + BeforeInviteUserToGroup BeforeConfig `mapstructure:"beforeInviteUserToGroup"` + AfterSetGroupInfo AfterConfig `mapstructure:"afterSetGroupInfo"` + BeforeSetGroupInfo BeforeConfig `mapstructure:"beforeSetGroupInfo"` + AfterRevokeMsg AfterConfig `mapstructure:"afterRevokeMsg"` + BeforeAddBlack BeforeConfig `mapstructure:"beforeAddBlack"` + AfterAddFriend AfterConfig `mapstructure:"afterAddFriend"` + BeforeAddFriendAgree BeforeConfig `mapstructure:"beforeAddFriendAgree"` + AfterDeleteFriend AfterConfig `mapstructure:"afterDeleteFriend"` + BeforeImportFriends BeforeConfig `mapstructure:"beforeImportFriends"` + AfterImportFriends AfterConfig `mapstructure:"afterImportFriends"` + AfterRemoveBlack AfterConfig `mapstructure:"afterRemoveBlack"` } type ZooKeeper struct { diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go deleted file mode 100644 index e455cbed7..000000000 --- a/pkg/common/http/http_client.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package http - -import ( - "context" - "encoding/json" - "net/http" - - "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/httputil" -) - -var ( - // Define http client. - client = httputil.NewHTTPClient(httputil.NewClientConfig()) -) - -func init() { - // reset http default transport - http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // default: 2 -} - -func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.WebhookConfig) error { - url = url + "/" + command - log.ZInfo(ctx, "callback", "url", url, "input", input, "config", callbackConfig) - b, err := client.Post(ctx, url, nil, input, callbackConfig.Timeout) - if err != nil { - if callbackConfig.FailedContinue { - log.ZInfo(ctx, "callback failed but continue", err, "url", url) - return nil - } - log.ZWarn(ctx, "callback network failed", err, "url", url, "input", input) - return servererrs.ErrNetwork.WrapMsg(err.Error()) - } - if err = json.Unmarshal(b, output); err != nil { - if callbackConfig.FailedContinue { - log.ZWarn(ctx, "callback failed but continue", err, "url", url) - return nil - } - log.ZWarn(ctx, "callback json unmarshal failed", err, "url", url, "input", input, "response", string(b)) - return servererrs.ErrData.WithDetail(err.Error() + "response format error") - } - if err := output.Parse(); err != nil { - log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b)) - } - log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b)) - return nil -} - -func CallBackPostReturn(ctx context.Context, url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.WebhookConfig) error { - return callBackPostReturn(ctx, url, req.GetCallbackCommand(), req, resp, callbackConfig) -} diff --git a/pkg/common/http/doc.go b/pkg/common/webhook/doc.go similarity index 87% rename from pkg/common/http/doc.go rename to pkg/common/webhook/doc.go index 8368203f6..3a8e3e622 100644 --- a/pkg/common/http/doc.go +++ b/pkg/common/webhook/doc.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package http // import "github.com/openimsdk/open-im-server/v3/pkg/common/http" +package webhook // import "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go new file mode 100644 index 000000000..325308586 --- /dev/null +++ b/pkg/common/webhook/http_client.go @@ -0,0 +1,74 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webhook + +import ( + "context" + "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/httputil" + "net/http" +) + +type Client struct { + client *httputil.HTTPClient + url string + queue *memAsyncQueue.MemoryQueue +} + +func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client { + http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host + return &Client{ + client: httputil.NewHTTPClient(httputil.NewClientConfig()), + url: url, + queue: queue, + } +} + +func (c *Client) SyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, before *config.BeforeConfig) error { + if before.Enable { + return c.post(ctx, command, req, resp, before.Timeout) + } + return nil +} + +func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig) { + if after.Enable { + c.queue.Push(func() { c.post(ctx, command, req, resp, after.Timeout) }) + } +} + +func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error { + fullURL := c.url + "/" + command + log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout) + operationID, _ := ctx.Value(constant.OperationID).(string) + b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout) + if err != nil { + return servererrs.ErrNetwork.WrapMsg(err.Error(), "post url", fullURL) + } + if err = json.Unmarshal(b, output); err != nil { + return servererrs.ErrData.WithDetail(err.Error() + " response format error") + } + if err := output.Parse(); err != nil { + return err + } + log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b)) + return nil +} diff --git a/pkg/common/http/http_client_test.go b/pkg/common/webhook/http_client_test.go similarity index 99% rename from pkg/common/http/http_client_test.go rename to pkg/common/webhook/http_client_test.go index b099929a0..3ebf81522 100644 --- a/pkg/common/http/http_client_test.go +++ b/pkg/common/webhook/http_client_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package http +package webhook import ( "context" diff --git a/pkg/util/memAsyncQueue/queue.go b/pkg/util/memAsyncQueue/queue.go new file mode 100644 index 000000000..c033116a9 --- /dev/null +++ b/pkg/util/memAsyncQueue/queue.go @@ -0,0 +1,74 @@ +package memAsyncQueue + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// AsyncQueue is the interface responsible for asynchronous processing of functions. +type AsyncQueue interface { + Initialize(processFunc func(), workerCount int, bufferSize int) + Push(task func()) error +} + +// MemoryQueue is an implementation of the AsyncQueue interface using a channel to process functions. +type MemoryQueue struct { + taskChan chan func() + wg sync.WaitGroup + isStopped bool + stopMutex sync.Mutex // Mutex to protect access to isStopped +} + +func NewMemoryQueue(workerCount int, bufferSize int) *MemoryQueue { + mq := &MemoryQueue{} // Create a new instance of MemoryQueue + mq.Initialize(workerCount, bufferSize) // Initialize it with specified parameters + return mq +} + +// Initialize sets up the worker nodes and the buffer size of the channel, +// starting internal goroutines to handle tasks from the channel. +func (mq *MemoryQueue) Initialize(workerCount int, bufferSize int) { + mq.taskChan = make(chan func(), bufferSize) // Initialize the channel with the provided buffer size. + mq.isStopped = false + + // Start multiple goroutines based on the specified workerCount. + for i := 0; i < workerCount; i++ { + mq.wg.Add(1) + go func(workerID int) { + defer mq.wg.Done() + for task := range mq.taskChan { + fmt.Printf("Worker %d: Executing task\n", workerID) + task() // Execute the function + } + }(i) + } +} + +// Push submits a function to the queue. +// Returns an error if the queue is stopped or if the queue is full. +func (mq *MemoryQueue) Push(task func()) error { + mq.stopMutex.Lock() + if mq.isStopped { + mq.stopMutex.Unlock() + return errors.New("push failed: queue is stopped") + } + mq.stopMutex.Unlock() + + select { + case mq.taskChan <- task: + return nil + case <-time.After(time.Millisecond * 100): // Timeout to prevent deadlock/blocking + return errors.New("push failed: queue is full") + } +} + +// Stop is used to terminate the internal goroutines and close the channel. +func (mq *MemoryQueue) Stop() { + mq.stopMutex.Lock() + mq.isStopped = true + close(mq.taskChan) + mq.stopMutex.Unlock() + mq.wg.Wait() +} diff --git a/pkg/util/memAsyncQueue/queue_test.go b/pkg/util/memAsyncQueue/queue_test.go new file mode 100644 index 000000000..465b5ced9 --- /dev/null +++ b/pkg/util/memAsyncQueue/queue_test.go @@ -0,0 +1,91 @@ +package memAsyncQueue + +import ( + "testing" + "time" +) + +// TestPushSuccess tests the successful pushing of data into the queue. +func TestPushSuccess(t *testing.T) { + queue := &MemoryQueue{} + queue.Initialize(func(data any) {}, 1, 5) // Small buffer size for test + + // Try to push data that should succeed + err := queue.Push("test data") + if err != nil { + t.Errorf("Push should succeed, but got error: %v", err) + } +} + +// TestPushFailWhenFull tests that pushing to a full queue results in an error. +func TestPushFailWhenFull(t *testing.T) { + queue := &MemoryQueue{} + queue.Initialize(func(data any) { + time.Sleep(100 * time.Millisecond) // Simulate work to delay processing + }, 1, 1) // Very small buffer to fill quickly + + queue.Push("data 1") // Fill the buffer + err := queue.Push("data 2") // This should fail + + if err == nil { + t.Error("Expected an error when pushing to full queue, but got none") + } +} + +// TestPushFailWhenStopped tests that pushing to a stopped queue results in an error. +func TestPushFailWhenStopped(t *testing.T) { + queue := &MemoryQueue{} + queue.Initialize(func(data any) {}, 1, 1) + + queue.Stop() // Stop the queue before pushing + err := queue.Push("test data") + + if err == nil { + t.Error("Expected an error when pushing to stopped queue, but got none") + } +} + +// TestQueueOperationSequence tests a sequence of operations to ensure the queue handles them correctly. +func TestQueueOperationSequence(t *testing.T) { + queue := &MemoryQueue{} + queue.Initialize(func(data any) {}, 1, 2) + + // Sequence of pushes and a stop + err := queue.Push("data 1") + if err != nil { + t.Errorf("Failed to push data 1: %v", err) + } + + err = queue.Push("data 2") + if err != nil { + t.Errorf("Failed to push data 2: %v", err) + } + + queue.Stop() // Stop the queue + err = queue.Push("data 3") // This push should fail + if err == nil { + t.Error("Expected an error when pushing after stop, but got none") + } +} + +// TestBlockingOnFull tests that the queue does not block indefinitely when full. +func TestBlockingOnFull(t *testing.T) { + queue := &MemoryQueue{} + queue.Initialize(func(data any) { + time.Sleep(1 * time.Second) // Simulate a long processing time + }, 1, 1) + + queue.Push("data 1") // Fill the queue + + start := time.Now() + err := queue.Push("data 2") // This should time out + duration := time.Since(start) + + if err == nil { + t.Error("Expected an error due to full queue, but got none") + } + + if duration >= time.Second { + t.Errorf("Push blocked for too long, duration: %v", duration) + } +} diff --git a/scripts/cherry-pick.sh b/scripts/cherry-pick.sh index ff303269d..b95289cd4 100755 --- a/scripts/cherry-pick.sh +++ b/scripts/cherry-pick.sh @@ -35,8 +35,8 @@ DRY_RUN=${DRY_RUN:-""} REGENERATE_DOCS=${REGENERATE_DOCS:-""} UPSTREAM_REMOTE=${UPSTREAM_REMOTE:-upstream} FORK_REMOTE=${FORK_REMOTE:-origin} -MAIN_REPO_ORG=${MAIN_REPO_ORG:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/http[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $3}')} -MAIN_REPO_NAME=${MAIN_REPO_NAME:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/http[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $4}')} +MAIN_REPO_ORG=${MAIN_REPO_ORG:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/webhook[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $3}')} +MAIN_REPO_NAME=${MAIN_REPO_NAME:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/webhook[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $4}')} if [[ -z ${GITHUB_USER:-} ]]; then openim::log::error_exit "Please export GITHUB_USER= (or GH organization, if that's where your fork lives)" diff --git a/test/e2e/framework/helpers/chat/chat.go b/test/e2e/framework/helpers/chat/chat.go index aa280fe1d..0613ff569 100644 --- a/test/e2e/framework/helpers/chat/chat.go +++ b/test/e2e/framework/helpers/chat/chat.go @@ -101,7 +101,7 @@ func main() { } /* func getLatestVersion(url string) (string, error) { - resp, err := http.Get(url) + resp, err := webhook.Get(url) if err != nil { return "", err } diff --git a/tools/url2im/pkg/manage.go b/tools/url2im/pkg/manage.go index 5b3c555c6..5e1626da9 100644 --- a/tools/url2im/pkg/manage.go +++ b/tools/url2im/pkg/manage.go @@ -392,7 +392,7 @@ func (m *Manage) HttpGet(ctx context.Context, url string) (*http.Response, error } if response.StatusCode != http.StatusOK { _ = response.Body.Close() - return nil, fmt.Errorf("http get %s status %s", url, response.Status) + return nil, fmt.Errorf("webhook get %s status %s", url, response.Status) } return response, nil }