Merge remote-tracking branch 'origin/3.6.1-code-conventions' into 3.6.1-code-conventions

pull/2148/head
Gordon 1 year ago
commit a850733920

@ -372,7 +372,7 @@ linters-settings:
# Default: []
ignored-functions:
- '^math\.'
- '^http\.StatusText$'
- '^webhook\.StatusText$'
gomoddirectives:
# Allow local `replace` directives. Default is false.
replace-local: true

@ -1,243 +1,67 @@
# OpenIM Configuration Guide
<!-- vscode-markdown-toc -->
* 1. [Directory Structure and File Descriptions](#DirectoryStructureandFileDescriptions)
* 1.1. [Directory Structure](#DirectoryStructure)
* 1.2. [Directory Structure Explanation](#DirectoryStructureExplanation)
* 2. [File Descriptions](#FileDescriptions)
* 2.1. [Files in the Root Directory](#FilesintheRootDirectory)
* 2.2. [Files in the `templates/` Directory](#FilesinthetemplatesDirectory)
* 3. [Configuration File Generation](#ConfigurationFileGeneration)
* 3.1. [How to Use `init-config.sh` Script](#HowtoUseinit-config.shScript)
* 3.2. [Examples of Operations](#ExamplesofOperations)
* 3.3. [Points to Note](#PointstoNote)
* 4. [Example Directory](#ExampleDirectory)
* 4.1. [Overview](#Overview)
* 4.2. [Structure](#Structure)
* 4.3. [How to Use These Examples](#HowtoUseTheseExamples)
* 4.4. [Tips for Using Example Files:](#TipsforUsingExampleFiles:)
* 5. [Configuration Item Descriptions](#ConfigurationItemDescriptions)
* 6. [Version Management and Upgrading](#VersionManagementandUpgrading)
* 6.1. [Pulling the Latest Code](#PullingtheLatestCode)
* 6.2. [Generating the Latest Example Configuration Files](#GeneratingtheLatestExampleConfigurationFiles)
* 6.3. [Comparing Configuration File Differences](#ComparingConfigurationFileDifferences)
* 6.4. [Updating Configuration Files](#UpdatingConfigurationFiles)
* 6.5. [Updating Binary Files and Restarting Services](#UpdatingBinaryFilesandRestartingServices)
* 6.6. [Best Practices for Version Management](#BestPracticesforVersionManagement)
* 7. [How to Contribute](#HowtoContribute)
* 7.1. [OpenIM Configuration Item Descriptions](#OpenIMConfigurationItemDescriptions)
* 7.2. [Modifying Template Files](#ModifyingTemplateFiles)
* 7.3. [Updating Configuration Center Scripts](#UpdatingConfigurationCenterScripts)
* 7.4. [Configuration File Generation Process](#ConfigurationFileGenerationProcess)
* 7.5. [Contribution Guidelines](#ContributionGuidelines)
* 7.6. [Submission and Review](#SubmissionandReview)
<!-- vscode-markdown-toc-config
numbering=true
autoSave=true
/vscode-markdown-toc-config -->
<!-- /vscode-markdown-toc -->
## 1. <a name='DirectoryStructureandFileDescriptions'></a>Directory Structure and File Descriptions
This document details the structure of the `config` directory, aiding users in understanding and managing configuration files.
### 1.1. <a name='DirectoryStructure'></a>Directory Structure
```bash
$ tree config
├── alertmanager.yml
├── config.yaml
├── email.tmpl
├── instance-down-rules.yml
├── notification.yaml
├── prometheus.yml
├── Readme.md
└── templates
├── alertmanager.yml.template
├── config.yaml.template
├── email.tmpl.template
├── env.template
├── instance-down-rules.yml.template
├── notification.yaml.template
├── open-im-ng-example.conf
├── prometheus-dashboard.yaml
└── prometheus.yml.template
```
### 1.2. <a name='DirectoryStructureExplanation'></a>Directory Structure Explanation
- **Root Directory (`config/`)**: Contains actual configuration files and the `templates` subdirectory.
- **`templates/` Subdirectory**: Stores configuration templates for generating or updating configuration files in the root directory.
## 2. <a name='FileDescriptions'></a>File Descriptions
### 2.1. <a name='FilesintheRootDirectory'></a>Files in the Root Directory
- **`alertmanager.yml`**: Configuration file for AlertManager, managing and setting up the alert system.
- **`config.yaml`**: The main application configuration file, covering service settings.
- **`email.tmpl`**: Template file for email notifications, defining email format and content.
- **`instance-down-rules.yml`**: Instance downtime rules configuration file for the monitoring system.
- **`notification.yaml`**: Configuration file for notification settings, defining different types of notifications.
- **`prometheus.yml`**: Configuration file for the Prometheus monitoring system, setting monitoring metrics and rules.
### 2.2. <a name='FilesinthetemplatesDirectory'></a>Files in the `templates/` Directory
- **`alertmanager.yml.template`**: Template for AlertManager configuration.
- **`config.yaml.template`**: Main configuration template for the application.
- **`email.tmpl.template`**: Template for email notifications.
- **`env.template`**: Template for environmental variable configurations, setting environment-related configurations.
- **`instance-down-rules.yml.template`**: Template for instance downtime rules.
- **`notification.yaml.template`**: Template for notification settings.
- **`open-im-ng-example.conf`**: Example configuration file for the application.
- **`prometheus-dashboard.yaml`**: Prometheus dashboard configuration file, specific to the OpenIM application.
- **`prometheus.yml.template`**: Template for Prometheus configuration.
## 3. <a name='ConfigurationFileGeneration'></a>Configuration File Generation
Configuration files can be automatically generated using the `make init` command or the `./scripts/init-config.sh` script. These scripts conveniently extract templates from the `templates` directory and generate or update actual configuration files in the root directory.
### 3.1. <a name='HowtoUseinit-config.shScript'></a>How to Use `init-config.sh` Script
```bash
$ ./scripts/init-config.sh --help
Usage: init-config.sh [options]
Options:
-h, --help Show this help message
--force Overwrite existing files without prompt
--skip Skip generation if file exists
--examples Generate example files
--clean-config Clean all configuration files
--clean-examples Clean all example files
```
### 3.2. <a name='ExamplesofOperations'></a>Examples of Operations
- Generate all template configuration files:
```bash
$ ./scripts/init-config.sh --examples
```
- Force overwrite existing configuration files:
```bash
$ ./scripts/init-config.sh --force
```
### 3.3. <a name='PointstoNote'></a>Points to Note
- **Template files should not be directly modified**: Files in the `template` directory are templates included in source code management. Direct modification may lead to version conflicts or management issues.
- **Operations for Windows Users**: Windows users can use the `cp` command to copy files from the `template` directory to the `config/` directory and then modify the configuration items as needed.
## 4. <a name='ExampleDirectory'></a>Example Directory
Welcome to our project's `examples` directory! This directory contains a range of example files, showcasing various configurations and settings of our software. These examples are intended to provide you with templates that can serve as a starting point for your own configurations.
### 4.1. <a name='Overview'></a>Overview
In this directory, you'll find examples suitable for a variety of use cases. Each file is a template with default values and configurations, demonstrating best practices and typical scenarios. Whether you're just getting started or looking to implement complex settings, these examples should help you get on the right track.
### 4.2. <a name='Structure'></a>Structure
配置文件说明
Here's a quick overview of the contents in this directory:
每个组件单独有一个配置文件,主要是地址及账号密码信息。
- `env-example.yaml`: Demonstrates how to set up environmental variables.
- `openim-example.yaml`: Example configuration file for the OpenIM application.
- `prometheus-example.yml`: Example configuration for monitoring with Prometheus.
- `alertmanager-example.yml`: Template for setting up Alertmanager configuration.
### 4.3. <a name='HowtoUseTheseExamples'></a>How to Use These Examples
To use these examples, simply copy the relevant files to your working directory and rename them as needed (for example, removing the `-example` suffix). Then, modify the files according to your needs.
### 4.4. <a name='TipsforUsingExampleFiles:'></a>Tips for Using Example Files:
1. **Read Comments**: Each file contains comments explaining the various sections and settings. Make sure to read these comments for a better understanding of how to customize the file.
2. **Check Required Changes**: Some examples might require mandatory changes before they can be used effectively (such as setting specific environmental variables).
3. **Version Compatibility**: Ensure that the example files are compatible with the version of the software you are using.
callback
## 5. <a name='ConfigurationItemDescriptions'></a>Configuration Item Descriptions
## 6. <a name='VersionManagementandUpgrading'></a>Version Management and Upgrading
When managing and upgrading the `config` directory's versions, it is crucial to ensure that the configuration files in both the local `config/` and `config/templates/` directories are kept in sync. This process can ensure that your configuration files are consistent with the latest standard templates, while also maintaining custom settings.
### 6.1. <a name='PullingtheLatestCode'></a>Pulling the Latest Code
First, ensure that your local repository is in sync with the remote repository. This can be achieved by pulling the latest code:
rpc共同配置项说明
```bash
$ git pull
```
### 6.2. <a name='GeneratingtheLatestExampleConfigurationFiles'></a>Generating the Latest Example Configuration Files
Next, generate the latest example configuration files. This can be done by running the `init-config.sh` script, using the `--examples` option to generate example files, and the `--skip` option to avoid overwriting existing configuration files:
```bash
$ ./scripts/init-config.sh --examples --skip
```
### 6.3. <a name='ComparingConfigurationFileDifferences'></a>Comparing Configuration File Differences
Once the latest example configuration files are generated, you need to compare the configuration files in the `config/` and `config/templates/` directories to find any potential differences. This step ensures that you can identify and integrate any important updates or changes. Tools like `diff` can be helpful in completing this step:
```bash
$ diff -ur config/ config/templates/
rpc:
#api或其他rpc可通过这个ip访问到此rpc如果为空则获取内网ip默认为空即可
registerIP: ''
#监听ip如果为0.0.0.0则内外网ip都监听为空则自动获取内网ip监听
listenIP: 0.0.0.0
#监听端口如果配置多个则会启动多个实例但需和prometheus.ports保持一致
ports: [ 10120 ]
prometheus:
enable: true
#prometheus
ports: [ 20104 ]
```
### 6.4. <a name='UpdatingConfigurationFiles'></a>Updating Configuration Files
Based on the comparison results, manually update the configuration files in the `config/` directory to reflect the latest configurations in `config/templates/`. During this process, ensure to retain any custom configuration settings.
### 6.5. <a name='UpdatingBinaryFilesandRestartingServices'></a>Updating Binary Files and Restarting Services
After updating the configuration files, the next step is to update any related binary files. This typically involves downloading and installing the latest version of the application or service. Depending on the specific application or service, this might involve running specific update scripts or directly downloading the latest version from official sources.
Once the binary files are updated, the services need to be restarted to apply the new configurations. Make sure to conduct necessary checks before restarting to ensure the correctness of the configurations.
### 6.6. <a name='BestPracticesforVersionManagement'></a>Best Practices for Version Management
- **Record Changes**: When committing changes to a version control system, ensure to log detailed change logs.
- **Stay Synced**: Regularly sync with the remote repository to ensure that your local configurations are in line with the latest developments.
- **Backup**: Backup your current configurations before making any significant changes, so that you can revert to a previous state if necessary.
By following these steps and best practices, you can ensure effective management and smooth upgrading of your `config` directory.
## 7. <a name='HowtoContribute'></a>How to Contribute
api配置项说明
If you have an understanding of the logic behind OpenIM's configuration generation, then you will clearly know where to make modifications to contribute code.
### 7.1. <a name='OpenIMConfigurationItemDescriptions'></a>OpenIM Configuration Item Descriptions
First, it is recommended to read the [OpenIM Configuration Items Document](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/environment.md). This will help you understand the roles of various configuration items and how they affect the operation of OpenIM.
### 7.2. <a name='ModifyingTemplateFiles'></a>Modifying Template Files
To contribute to OpenIM, focus on the `./deployments/templates` directory. This contains various configuration template files, which are the basis for generating the final configuration files.
When making modifications, ensure that your changes align with OpenIM's configuration requirements and logic. This may involve adding new template files or modifying existing files to reflect new configuration options or structural changes.
### 7.3. <a name='UpdatingConfigurationCenterScripts'></a>Updating Configuration Center Scripts
In addition to modifying template files, pay attention to the `./scripts/install/environment.sh` script. In this script, you may need to add or modify environment variables.
This script is responsible for defining environment variables that influence configuration generation. Therefore, any new configuration items or modifications to existing items need to be reflected here.
```
api:
#0.0.0.0表示内外网ip都监听不应该修改
listenIP: 0.0.0.0
#监听端口,如果配置多个则会启动多个实例,和prometheus.ports保持一致
ports: [ 10002 ]
prometheus:
enable: true
ports: [ 20113 ]
#怎么描述 grafanaURL地址 外网地址,通过浏览器能访问到
grafanaURL: http://127.0.0.1:13000/
```
### 7.4. <a name='ConfigurationFileGenerationProcess'></a>Configuration File Generation Process
The essence of the `make init` command is to use the environment variables defined in `/scripts/install/environment.sh` to render the template files in the `./deployments/templates` directory, thereby generating the final configuration files.
When contributing code, ensure that your changes work smoothly in this process and do not cause errors during configuration file generation.
log配置项说明
### 7.5. <a name='ContributionGuidelines'></a>Contribution Guidelines
```
#log存放路径,如果要修改则改为全路径
storageLocation: ../../../../logs/
rotationTime: 24
remainRotationCount: 2
#3: 生产环境; 6日志较多调试环境
remainLogLevel: 6
isStdout: false
isJson: false
withStack: false
```
- **Code Review**: Ensure your changes have passed code review. This typically means that the code should be clear, easy to understand, and adhere to the project's coding style and best practices.
- **Testing**: Before submitting changes, conduct thorough tests to ensure new or modified configurations work as expected and do not negatively impact existing functionalities.
- **Documentation**: If you have added a new configuration option or made significant changes to an existing one, update the relevant documentation to assist other users and developers in understanding and utilizing these changes.
### 7.6. <a name='SubmissionandReview'></a>Submission and Review
After completing your changes, submit your code to the OpenIM repository in the form of a Pull Request (PR). The PR will be reviewed by the project maintainers and you may be asked to make further modifications or provide additional information.

@ -90,9 +90,9 @@ groupApplicationRejected:
unreadCount: false
offlinePush:
enable: false
title: " title"
desc: " desc"
ext: " ext"
title: "groupApplicationRejected title"
desc: "groupApplicationRejected desc"
ext: "groupApplicationRejected ext"
groupOwnerTransferred:

@ -5,4 +5,4 @@ api:
prometheus:
enable: true
ports: [ 20113 ]
grafanaURL: http://127.0.0.1:13000/
grafanaURL: webhook://127.0.0.1:13000/

@ -8,5 +8,6 @@ prometheus:
ports: [ 20106 ]
tokenPolicy:
#token有效期单位
expire: 90

@ -7,7 +7,10 @@ prometheus:
enable: true
ports: [ 20102 ]
#发消息是否需要好友验证
friendVerify: false
#
groupMessageHasReadReceiptEnable: true
singleMessageHasReadReceiptEnable: true

@ -24,9 +24,9 @@ object:
sessionToken: ''
publicRead: false
kodo:
endpoint: "http://s3.cn-east-1.qiniucs.com"
endpoint: "webhook://s3.cn-east-1.qiniucs.com"
bucket: "demo-9999999"
bucketURL: "http://your.domain.com"
bucketURL: "webhook://your.domain.com"
accessKeyID: ''
accessKeySecret: ''
sessionToken: ''

@ -1213,7 +1213,7 @@
"editorMode": "code",
"expr": "sum(rate(app_requests_total{job=~\"^($job)$\"}[$interval])) by (job)",
"instant": false,
"legendFormat": "{{job}}-http",
"legendFormat": "{{job}}-webhook",
"range": true,
"refId": "A"
},

@ -1,4 +1,4 @@
url: "http://127.0.0.1:10008/callbackExample"
url: "webhook://127.0.0.1:10008/callbackExample"
beforeSendSingleMsg:
enable: false
timeout: 5
@ -10,11 +10,9 @@ beforeUpdateUserInfoEx:
afterUpdateUserInfoEx:
enable: false
timeout: 5
failedContinue: true
afterSendSingleMsg:
enable: false
timeout: 5
failedContinue: true
beforeSendGroupMsg:
enable: false
timeout: 5
@ -26,19 +24,15 @@ beforeMsgModify:
afterSendGroupMsg:
enable: false
timeout: 5
failedContinue: true
afterUserOnline:
enable: false
timeout: 5
failedContinue: true
afterUserOffline:
enable: false
timeout: 5
failedContinue: true
afterUserKickOff:
enable: false
timeout: 5
failedContinue: true
beforeOfflinePush:
enable: false
timeout: 5
@ -62,7 +56,6 @@ beforeUpdateUserInfo:
afterUpdateUserInfo:
enable: false
timeout: 5
failedContinue: true
beforeCreateGroup:
enable: false
timeout: 5
@ -70,7 +63,6 @@ beforeCreateGroup:
afterCreateGroup:
enable: false
timeout: 5
failedContinue: true
beforeMemberJoinGroup:
enable: false
timeout: 5
@ -82,19 +74,15 @@ beforeSetGroupMemberInfo:
afterSetGroupMemberInfo:
enable: false
timeout: 5
failedContinue: true
afterQuitGroup:
enable: false
timeout: 5
failedContinue: true
afterKickGroupMember:
enable: false
timeout: 5
failedContinue: true
afterDismissGroup:
enable: false
timeout: 5
failedContinue: true
beforeApplyJoinGroup:
enable: false
timeout: 5
@ -102,11 +90,9 @@ beforeApplyJoinGroup:
afterGroupMsgRead:
enable: false
timeout: 5
failedContinue: true
afterSingleMsgRead:
enable: false
timeout: 5
failedContinue: true
beforeUserRegister:
enable: false
timeout: 5
@ -114,11 +100,9 @@ beforeUserRegister:
afterUserRegister:
enable: false
timeout: 5
failedContinue: true
afterTransferGroupOwner:
enable: false
timeout: 5
failedContinue: true
beforeSetFriendRemark:
enable: false
timeout: 5
@ -126,15 +110,12 @@ beforeSetFriendRemark:
afterSetFriendRemark:
enable: false
timeout: 5
failedContinue: true
afterGroupMsgRevoke:
enable: false
timeout: 5
failedContinue: true
afterJoinGroup:
enable: false
timeout: 5
failedContinue: true
beforeInviteUserToGroup:
enable: false
timeout: 5
@ -142,7 +123,6 @@ beforeInviteUserToGroup:
afterSetGroupInfo:
enable: false
timeout: 5
failedContinue: true
beforeSetGroupInfo:
enable: false
timeout: 5
@ -150,15 +130,13 @@ beforeSetGroupInfo:
afterRevokeMsg:
enable: false
timeout: 5
failedContinue: true
beforeAddBlack:
enable: false
timeout: 5
failedContinue: true
failedContinue:
afterAddFriend:
enable: false
timeout: 5
failedContinue: true
beforeAddFriendAgree:
enable: false
timeout: 5
@ -166,7 +144,6 @@ beforeAddFriendAgree:
afterDeleteFriend:
enable: false
timeout: 5
failedContinue: true
beforeImportFriends:
enable: false
timeout: 5
@ -174,8 +151,6 @@ beforeImportFriends:
afterImportFriends:
enable: false
timeout: 5
failedContinue: true
afterRemoveBlack:
enable: false
timeout: 5
failedContinue: true

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-api.selectorLabels" . | nindent 4 }}

@ -48,7 +48,7 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
- name: rpc
@ -57,11 +57,11 @@ spec:
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,9 +22,9 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
- port: 88
targetPort: rpc
protocol: TCP

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-msgtransfer.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-push.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-auth.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-conversation.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-friend.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-group.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-msg.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-third.selectorLabels" . | nindent 4 }}

@ -48,17 +48,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
- name: webhook
containerPort: 80
protocol: TCP
#livenessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
#readinessProbe:
# httpGet:
# path: /
# port: http
# port: webhook
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:

@ -22,8 +22,8 @@ spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
targetPort: webhook
protocol: TCP
name: http
name: webhook
selector:
{{- include "openim-rpc-user.selectorLabels" . | nindent 4 }}

@ -323,7 +323,7 @@ iosPush:
# Timeout in seconds
# Whether to continue execution if callback fails
callback:
url: "http://127.0.0.1:10008/callbackExample"
url: "webhook://127.0.0.1:10008/callbackExample"
beforeSendSingleMsg:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}

@ -362,7 +362,7 @@ func SIGTERMExit() {
```go
import (
_ "net/http/pprof"
_ "net/webhook/pprof"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"

@ -581,7 +581,7 @@ func SIGTERMExit() {
```go
import (
_ "net/http/pprof"
_ "net/webhook/pprof"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"

@ -470,7 +470,7 @@ wrappedErr := errs.WrapMsg(err, "additional error information")
```go
// "github.com/openimsdk/tools/errs"
err := errors.New("original error")
wrappedErr := errs.WrapMsg(err, "problem occurred", "code", 404, "url", "http://example.com")
wrappedErr := errs.WrapMsg(err, "problem occurred", "code", 404, "url", "webhook://example.com")
// wrappedErr will contain the original error, call stack, and "problem occurred code=404, url=http://example.com"
```
@ -494,7 +494,7 @@ Suppose we have some runtime context variables, such as a user ID and the type o
userID := "user123"
operation := "update profile"
errorCode := 500
requestURL := "http://example.com/updateProfile"
requestURL := "webhook://example.com/updateProfile"
// Create a new error
err := errors.New("original error")

@ -65,7 +65,7 @@ func setURLPrefix(c *gin.Context, urlPrefix *string) error {
}
}
u := url.URL{
Scheme: "http",
Scheme: "webhook",
Host: c.Request.Host,
Path: "/object/",
}

@ -20,19 +20,15 @@ import (
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mcontext"
)
func CallbackUserOnline(ctx context.Context, callback *config.Webhooks, userID string, platformID int, isAppBackground bool, connID string) error {
if !callback.AfterUserOnline.Enable {
return nil
}
func (ws *WsServer) webhookAfterUserOnline(ctx context.Context, after *config.AfterConfig, userID string, platformID int, isAppBackground bool, connID string) {
req := cbapi.CallbackUserOnlineReq{
UserStatusCallbackReq: cbapi.UserStatusCallbackReq{
UserStatusBaseCallback: cbapi.UserStatusBaseCallback{
CallbackCommand: cbapi.CallbackUserOnlineCommand,
CallbackCommand: cbapi.CallbackAfterUserOnlineCommand,
OperationID: mcontext.GetOperationID(ctx),
PlatformID: platformID,
Platform: constant.PlatformIDToName(platformID),
@ -43,21 +39,14 @@ func CallbackUserOnline(ctx context.Context, callback *config.Webhooks, userID s
IsAppBackground: isAppBackground,
ConnID: connID,
}
resp := cbapi.CommonCallbackResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, &req, &resp, callback.AfterUserOnline); err != nil {
return err
}
return nil
ws.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CommonCallbackResp{}, after)
}
func CallbackUserOffline(ctx context.Context, callback *config.Webhooks, userID string, platformID int, connID string) error {
if !callback.AfterUserOffline.Enable {
return nil
}
func (ws *WsServer) webhookAfterUserOffline(ctx context.Context, after *config.AfterConfig, userID string, platformID int, connID string) {
req := &cbapi.CallbackUserOfflineReq{
UserStatusCallbackReq: cbapi.UserStatusCallbackReq{
UserStatusBaseCallback: cbapi.UserStatusBaseCallback{
CallbackCommand: cbapi.CallbackUserOfflineCommand,
CallbackCommand: cbapi.CallbackAfterUserOfflineCommand,
OperationID: mcontext.GetOperationID(ctx),
PlatformID: platformID,
Platform: constant.PlatformIDToName(platformID),
@ -67,21 +56,14 @@ func CallbackUserOffline(ctx context.Context, callback *config.Webhooks, userID
Seq: time.Now().UnixMilli(),
ConnID: connID,
}
resp := &cbapi.CallbackUserOfflineResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterUserOffline); err != nil {
return err
}
return nil
ws.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CallbackUserOfflineResp{}, after)
}
func CallbackUserKickOff(ctx context.Context, callback *config.Webhooks, userID string, platformID int) error {
if !callback.AfterUserKickOff.Enable {
return nil
}
func (ws *WsServer) webhookAfterUserKickOff(ctx context.Context, after *config.AfterConfig, userID string, platformID int) {
req := &cbapi.CallbackUserKickOffReq{
UserStatusCallbackReq: cbapi.UserStatusCallbackReq{
UserStatusBaseCallback: cbapi.UserStatusBaseCallback{
CallbackCommand: cbapi.CallbackUserKickOffCommand,
CallbackCommand: cbapi.CallbackAfterUserKickOffCommand,
OperationID: mcontext.GetOperationID(ctx),
PlatformID: platformID,
Platform: constant.PlatformIDToName(platformID),
@ -90,9 +72,5 @@ func CallbackUserKickOff(ctx context.Context, callback *config.Webhooks, userID
},
Seq: time.Now().UnixMilli(),
}
resp := &cbapi.CommonCallbackResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterUserOffline); err != nil {
return err
}
return nil
ws.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CommonCallbackResp{}, after)
}

@ -17,6 +17,8 @@ package msggateway
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/mcontext"
"net/http"
@ -52,6 +54,11 @@ type LongConnServer interface {
MessageHandler
}
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type WsServer struct {
msgGatewayConfig *Config
port int
@ -72,6 +79,7 @@ type WsServer struct {
Compressor
Encoder
MessageHandler
webhookClient *webhook.Client
}
type kickHandler struct {
@ -95,15 +103,9 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta
}
switch status {
case constant.Online:
err := CallbackUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
if err != nil {
log.ZWarn(ctx, "CallbackUserOnline err", err)
}
ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
case constant.Offline:
err := CallbackUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig, client.UserID, client.PlatformID, client.ctx.GetConnID())
if err != nil {
log.ZWarn(ctx, "CallbackUserOffline err", err)
}
ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, client.UserID, client.PlatformID, client.ctx.GetConnID())
}
}
@ -147,6 +149,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) {
clients: newUserMap(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
}, nil
}

@ -19,21 +19,20 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
)
func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
if !callback.BeforeOfflinePush.Enable || msg.ContentType == constant.Typing {
func (p *Pusher) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
if msg.ContentType == constant.Typing {
return nil
}
req := &callbackstruct.CallbackBeforePushReq{
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: callbackstruct.CallbackOfflinePushCommand,
CallbackCommand: callbackstruct.CallbackBeforeOfflinePushCommand,
OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
@ -51,7 +50,8 @@ func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOfflinePush); err != nil {
if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil {
return err
}
@ -64,14 +64,14 @@ func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs
return nil
}
func callbackOnlinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData) error {
if !callback.BeforeOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
func (p *Pusher) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error {
if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforePushReq{
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: callbackstruct.CallbackOnlinePushCommand,
CallbackCommand: callbackstruct.CallbackBeforeOnlinePushCommand,
OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
@ -87,25 +87,25 @@ func callbackOnlinePush(ctx context.Context, callback *config.Webhooks, userIDs
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOnlinePush); err != nil {
if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil {
return err
}
return nil
}
func callbackBeforeSuperGroupOnlinePush(
func (p *Pusher) webhookBeforeGroupOnlinePush(
ctx context.Context,
callback *config.Webhooks,
before *config.BeforeConfig,
groupID string,
msg *sdkws.MsgData,
pushToUserIDs *[]string,
) error {
if !callback.BeforeGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
if msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: callbackstruct.CallbackSuperGroupOnlinePushCommand,
CallbackCommand: callbackstruct.CallbackBeforeGroupOnlinePushCommand,
OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
@ -120,10 +120,9 @@ func callbackBeforeSuperGroupOnlinePush(
Seq: msg.Seq,
}
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeGroupOnlinePush); err != nil {
if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil {
return err
}
if len(resp.UserIDs) != 0 {
*pushToUserIDs = resp.UserIDs
}

@ -17,6 +17,8 @@ package push
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/errs"
"sync"
@ -47,6 +49,11 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type Pusher struct {
config *Config
database controller.PushDatabase
@ -57,6 +64,7 @@ type Pusher struct {
msgRpcClient *rpcclient.MessageRpcClient
conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient
webhookClient *webhook.Client
}
var errNoOfflinePusher = errs.New("no offlinePusher is configured")
@ -75,6 +83,7 @@ func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePus
msgRpcClient: msgRpcClient,
conversationRpcClient: conversationRpcClient,
groupRpcClient: groupRpcClient,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
}
}
@ -104,9 +113,11 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
if err := callbackOnlinePush(ctx, &p.config.WebhooksConfig, userIDs, msg); err != nil {
if err := p.webhookBeforeOnlinePush(ctx, &p.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
return err
}
// push
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs)
if err != nil {
@ -132,7 +143,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
})
if len(offlinePushUserIDList) > 0 {
if err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, offlinePushUserIDList, msg, &[]string{}); err != nil {
if err = p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, offlinePushUserIDList, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList)
@ -165,7 +176,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err := p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
@ -196,7 +207,8 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.WebhooksConfig, groupID, msg, &pushToUserIDs); err != nil {
if err = p.webhookBeforeGroupOnlinePush(ctx, &p.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, &pushToUserIDs); err != nil {
return err
}
@ -298,7 +310,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
// Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err = p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}

@ -19,15 +19,19 @@ import (
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
pbfriend "github.com/openimsdk/protocol/friend"
"github.com/openimsdk/tools/utils/datautil"
)
func CallbackBeforeAddFriend(ctx context.Context, callback *config.Webhooks, req *pbfriend.ApplyToAddFriendReq) error {
if !callback.BeforeAddFriend.Enable {
return nil
func (s *friendServer) webhookAfterDeleteFriend(ctx context.Context, after *config.AfterConfig, req *pbfriend.DeleteFriendReq) {
cbReq := &cbapi.CallbackAfterDeleteFriendReq{
CallbackCommand: cbapi.CallbackAfterDeleteFriendCommand,
OwnerUserID: req.OwnerUserID,
FriendUserID: req.FriendUserID,
}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterDeleteFriendResp{}, after)
}
func (s *friendServer) webhookBeforeAddFriend(ctx context.Context, before *config.BeforeConfig, req *pbfriend.ApplyToAddFriendReq) error {
cbReq := &cbapi.CallbackBeforeAddFriendReq{
CallbackCommand: cbapi.CallbackBeforeAddFriendCommand,
FromUserID: req.FromUserID,
@ -36,49 +40,75 @@ func CallbackBeforeAddFriend(ctx context.Context, callback *config.Webhooks, req
Ex: req.Ex,
}
resp := &cbapi.CallbackBeforeAddFriendResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriend); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
return nil
}
func CallbackBeforeSetFriendRemark(ctx context.Context, callback *config.Webhooks, req *pbfriend.SetFriendRemarkReq) error {
if !callback.BeforeSetFriendRemark.Enable {
return nil
func (s *friendServer) webhookAfterAddFriend(ctx context.Context, after *config.AfterConfig, req *pbfriend.ApplyToAddFriendReq) {
cbReq := &cbapi.CallbackAfterAddFriendReq{
CallbackCommand: cbapi.CallbackAfterAddFriendCommand,
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
ReqMsg: req.ReqMsg,
}
cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{
CallbackCommand: cbapi.CallbackBeforeSetFriendRemark,
resp := &cbapi.CallbackAfterAddFriendResp{}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after)
}
func (s *friendServer) webhookAfterSetFriendRemark(ctx context.Context, after *config.AfterConfig, req *pbfriend.SetFriendRemarkReq) {
cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{
CallbackCommand: cbapi.CallbackAfterSetFriendRemarkCommand,
OwnerUserID: req.OwnerUserID,
FriendUserID: req.FriendUserID,
Remark: req.Remark,
}
resp := &cbapi.CallbackBeforeSetFriendRemarkResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriend); err != nil {
return err
resp := &cbapi.CallbackAfterSetFriendRemarkResp{}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after)
}
func (s *friendServer) webhookAfterImportFriends(ctx context.Context, after *config.AfterConfig, req *pbfriend.ImportFriendReq) {
cbReq := &cbapi.CallbackAfterImportFriendsReq{
CallbackCommand: cbapi.CallbackAfterImportFriendsCommand,
OwnerUserID: req.OwnerUserID,
FriendUserIDs: req.FriendUserIDs,
}
datautil.NotNilReplace(&req.Remark, &resp.Remark)
return nil
resp := &cbapi.CallbackAfterImportFriendsResp{}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after)
}
func CallbackAfterSetFriendRemark(ctx context.Context, callback *config.Webhooks, req *pbfriend.SetFriendRemarkReq) error {
if !callback.AfterSetFriendRemark.Enable {
return nil
func (s *friendServer) webhookAfterRemoveBlack(ctx context.Context, after *config.AfterConfig, req *pbfriend.RemoveBlackReq) {
cbReq := &cbapi.CallbackAfterRemoveBlackReq{
CallbackCommand: cbapi.CallbackAfterRemoveBlackCommand,
OwnerUserID: req.OwnerUserID,
BlackUserID: req.BlackUserID,
}
cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{
CallbackCommand: cbapi.CallbackAfterSetFriendRemark,
resp := &cbapi.CallbackAfterRemoveBlackResp{}
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, after)
}
func (s *friendServer) webhookBeforeSetFriendRemark(ctx context.Context, before *config.BeforeConfig, req *pbfriend.SetFriendRemarkReq) error {
cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{
CallbackCommand: cbapi.CallbackBeforeSetFriendRemarkCommand,
OwnerUserID: req.OwnerUserID,
FriendUserID: req.FriendUserID,
Remark: req.Remark,
}
resp := &cbapi.CallbackAfterSetFriendRemarkResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriend); err != nil {
resp := &cbapi.CallbackBeforeSetFriendRemarkResp{}
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if resp.Remark != "" {
req.Remark = resp.Remark
}
return nil
}
func CallbackBeforeAddBlack(ctx context.Context, callback *config.Webhooks, req *pbfriend.AddBlackReq) error {
if !callback.BeforeAddBlack.Enable {
func (s *friendServer) webhookBeforeAddBlack(ctx context.Context, before *config.BeforeConfig, req *pbfriend.AddBlackReq) error {
if !before.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeAddBlackReq{
@ -87,32 +117,11 @@ func CallbackBeforeAddBlack(ctx context.Context, callback *config.Webhooks, req
BlackUserID: req.BlackUserID,
}
resp := &cbapi.CallbackBeforeAddBlackResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddBlack); err != nil {
return err
}
return nil
return s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before)
}
func CallbackAfterAddFriend(ctx context.Context, callback *config.Webhooks, req *pbfriend.ApplyToAddFriendReq) error {
if !callback.AfterAddFriend.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterAddFriendReq{
CallbackCommand: cbapi.CallbackAfterAddFriendCommand,
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
ReqMsg: req.ReqMsg,
}
resp := &cbapi.CallbackAfterAddFriendResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterAddFriend); err != nil {
return err
}
return nil
}
func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Webhooks, req *pbfriend.RespondFriendApplyReq) error {
if !callback.BeforeAddFriendAgree.Enable {
func (s *friendServer) webhookBeforeAddFriendAgree(ctx context.Context, before *config.BeforeConfig, req *pbfriend.RespondFriendApplyReq) error {
if !before.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeAddFriendAgreeReq{
@ -123,30 +132,11 @@ func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Webhooks
HandleResult: req.HandleResult,
}
resp := &cbapi.CallbackBeforeAddFriendAgreeResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeAddFriendAgree); err != nil {
return err
}
return nil
return s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before)
}
func CallbackAfterDeleteFriend(ctx context.Context, callback *config.Webhooks, req *pbfriend.DeleteFriendReq) error {
if !callback.AfterDeleteFriend.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterDeleteFriendReq{
CallbackCommand: cbapi.CallbackAfterDeleteFriendCommand,
OwnerUserID: req.OwnerUserID,
FriendUserID: req.FriendUserID,
}
resp := &cbapi.CallbackAfterDeleteFriendResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterDeleteFriend); err != nil {
return err
}
return nil
}
func CallbackBeforeImportFriends(ctx context.Context, callback *config.Webhooks, req *pbfriend.ImportFriendReq) error {
if !callback.BeforeImportFriends.Enable {
func (s *friendServer) webhookBeforeImportFriends(ctx context.Context, before *config.BeforeConfig, req *pbfriend.ImportFriendReq) error {
if !before.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeImportFriendsReq{
@ -155,43 +145,11 @@ func CallbackBeforeImportFriends(ctx context.Context, callback *config.Webhooks,
FriendUserIDs: req.FriendUserIDs,
}
resp := &cbapi.CallbackBeforeImportFriendsResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeImportFriends); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if len(resp.FriendUserIDs) != 0 {
if len(resp.FriendUserIDs) > 0 {
req.FriendUserIDs = resp.FriendUserIDs
}
return nil
}
func CallbackAfterImportFriends(ctx context.Context, callback *config.Webhooks, req *pbfriend.ImportFriendReq) error {
if !callback.AfterImportFriends.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterImportFriendsReq{
CallbackCommand: cbapi.CallbackAfterImportFriendsCommand,
OwnerUserID: req.OwnerUserID,
FriendUserIDs: req.FriendUserIDs,
}
resp := &cbapi.CallbackAfterImportFriendsResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterImportFriends); err != nil {
return err
}
return nil
}
func CallbackAfterRemoveBlack(ctx context.Context, callback *config.Webhooks, req *pbfriend.RemoveBlackReq) error {
if !callback.AfterRemoveBlack.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterRemoveBlackReq{
CallbackCommand: cbapi.CallbackAfterRemoveBlackCommand,
OwnerUserID: req.OwnerUserID,
BlackUserID: req.BlackUserID,
}
resp := &cbapi.CallbackAfterRemoveBlackResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterRemoveBlack); err != nil {
return err
}
return nil
}

@ -17,6 +17,8 @@ package friend
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -38,6 +40,11 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type friendServer struct {
friendDatabase controller.FriendDatabase
blackDatabase controller.BlackDatabase
@ -46,6 +53,7 @@ type friendServer struct {
conversationRpcClient rpcclient.ConversationRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
config *Config
webhookClient *webhook.Client
}
type Config struct {
@ -113,6 +121,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
RegisterCenter: client,
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
})
return nil
@ -124,14 +133,12 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply
if err := authverify.CheckAccessV3(ctx, req.FromUserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.ToUserID == req.FromUserID {
return nil, servererrs.ErrCanNotAddYourself.WrapMsg("req.ToUserID", req.ToUserID)
}
if err = CallbackBeforeAddFriend(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue {
if err = s.webhookBeforeAddFriend(ctx, &s.config.WebhooksConfig.BeforeAddFriend, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
if _, err := s.userRpcClient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); err != nil {
return nil, err
}
@ -140,22 +147,17 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply
if err != nil {
return nil, err
}
if in1 && in2 {
return nil, servererrs.ErrRelationshipAlready.WrapMsg("already friends has f")
}
if err = s.friendDatabase.AddFriendRequest(ctx, req.FromUserID, req.ToUserID, req.ReqMsg, req.Ex); err != nil {
return nil, err
}
if err = s.notificationSender.FriendApplicationAddNotification(ctx, req); err != nil {
return nil, err
}
if err = CallbackAfterAddFriend(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
s.webhookAfterAddFriend(ctx, &s.config.WebhooksConfig.AfterAddFriend, req)
return resp, nil
}
@ -174,7 +176,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr
return nil, errs.ErrArgs.WrapMsg("friend userID repeated")
}
if err := CallbackBeforeImportFriends(ctx, &s.config.WebhooksConfig, req); err != nil {
if err := s.webhookBeforeImportFriends(ctx, &s.config.WebhooksConfig.BeforeImportFriends, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
@ -188,9 +190,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr
HandleResult: constant.FriendResponseAgree,
})
}
if err := CallbackAfterImportFriends(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
return &pbfriend.ImportFriendResp{}, nil
}
@ -208,7 +209,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
HandleResult: req.HandleResult,
}
if req.HandleResult == constant.FriendResponseAgree {
if err := CallbackBeforeAddFriendAgree(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue {
if err := s.webhookBeforeAddFriendAgree(ctx, &s.config.WebhooksConfig.BeforeAddFriendAgree, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := s.friendDatabase.AgreeFriendRequest(ctx, &friendRequest)
@ -245,16 +246,13 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFri
return nil, err
}
s.notificationSender.FriendDeletedNotification(ctx, req)
if err := CallbackAfterDeleteFriend(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
s.webhookAfterDeleteFriend(ctx, &s.config.WebhooksConfig.AfterDeleteFriend, req)
return resp, nil
}
// ok.
func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) {
if err = CallbackBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue {
if err = s.webhookBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig.BeforeSetFriendRemark, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
resp = &pbfriend.SetFriendRemarkResp{}
@ -268,9 +266,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri
if err := s.friendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil {
return nil, err
}
if err := CallbackAfterSetFriendRemark(ctx, &s.config.WebhooksConfig, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
s.webhookAfterSetFriendRemark(ctx, &s.config.WebhooksConfig.AfterSetFriendRemark, req)
s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID)
return resp, nil
}

@ -16,32 +16,21 @@ package group
import (
"context"
"github.com/openimsdk/tools/utils/datautil"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/protobuf/types/known/wrapperspb"
"time"
)
type GroupEventCallbackConfig struct {
CallbackUrl string
BeforeCreateGroup config.WebhookConfig
}
// CallbackBeforeCreateGroup callback before create group.
func CallbackBeforeCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.CreateGroupReq) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
func (s *groupServer) webhookBeforeCreateGroup(ctx context.Context, before *config.BeforeConfig, req *group.CreateGroupReq) error {
cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateGroupCommand,
OperationID: mcontext.GetOperationID(ctx),
@ -65,9 +54,10 @@ func CallbackBeforeCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfi
}
resp := &callbackstruct.CallbackBeforeCreateGroupResp{}
if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID)
datautil.NotNilReplace(&req.GroupInfo.GroupName, resp.GroupName)
datautil.NotNilReplace(&req.GroupInfo.Notification, resp.Notification)
@ -83,11 +73,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfi
return nil
}
func CallbackAfterCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.CreateGroupReq) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config.AfterConfig, req *group.CreateGroupReq) {
cbReq := &callbackstruct.CallbackAfterCreateGroupReq{
CallbackCommand: callbackstruct.CallbackAfterCreateGroupCommand,
GroupInfo: req.GroupInfo,
@ -108,15 +94,11 @@ func CallbackAfterCreateGroup(ctx context.Context, cfg *GroupEventCallbackConfig
RoleLevel: constant.GroupOrdinaryUsers,
})
}
resp := &callbackstruct.CallbackAfterCreateGroupResp{}
return http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup)
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after)
}
func CallbackBeforeMemberJoinGroup(ctx context.Context, cfg *GroupEventCallbackConfig, groupMember *relation.GroupMemberModel, groupEx string) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *relation.GroupMemberModel, groupEx string) error {
cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand,
GroupID: groupMember.GroupID,
UserID: groupMember.UserID,
@ -124,15 +106,13 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, cfg *GroupEventCallbackC
GroupEx: groupEx,
}
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{}
if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if resp.MuteEndTime != nil {
groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime)
}
datautil.NotNilReplace(&groupMember.FaceURL, resp.FaceURL)
datautil.NotNilReplace(&groupMember.Ex, resp.Ex)
datautil.NotNilReplace(&groupMember.Nickname, resp.Nickname)
@ -140,31 +120,26 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, cfg *GroupEventCallbackC
return nil
}
func CallbackBeforeSetGroupMemberInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupMemberInfo) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{
func (s *groupServer) webhookBeforeSetGroupMemberInfo(ctx context.Context, before *config.BeforeConfig, req *group.SetGroupMemberInfo) error {
cbReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{
CallbackCommand: callbackstruct.CallbackBeforeSetGroupMemberInfoCommand,
GroupID: req.GroupID,
UserID: req.UserID,
}
if req.Nickname != nil {
callbackReq.Nickname = &req.Nickname.Value
cbReq.Nickname = &req.Nickname.Value
}
if req.FaceURL != nil {
callbackReq.FaceURL = &req.FaceURL.Value
cbReq.FaceURL = &req.FaceURL.Value
}
if req.RoleLevel != nil {
callbackReq.RoleLevel = &req.RoleLevel.Value
cbReq.RoleLevel = &req.RoleLevel.Value
}
if req.Ex != nil {
callbackReq.Ex = &req.Ex.Value
cbReq.Ex = &req.Ex.Value
}
resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{}
err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup)
if err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if resp.FaceURL != nil {
@ -182,112 +157,71 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, cfg *GroupEventCallba
return nil
}
func CallbackAfterSetGroupMemberInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupMemberInfo) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := callbackstruct.CallbackAfterSetGroupMemberInfoReq{
func (s *groupServer) webhookAfterSetGroupMemberInfo(ctx context.Context, after *config.AfterConfig, req *group.SetGroupMemberInfo) {
cbReq := callbackstruct.CallbackAfterSetGroupMemberInfoReq{
CallbackCommand: callbackstruct.CallbackAfterSetGroupMemberInfoCommand,
GroupID: req.GroupID,
UserID: req.UserID,
}
if req.Nickname != nil {
callbackReq.Nickname = &req.Nickname.Value
cbReq.Nickname = &req.Nickname.Value
}
if req.FaceURL != nil {
callbackReq.FaceURL = &req.FaceURL.Value
cbReq.FaceURL = &req.FaceURL.Value
}
if req.RoleLevel != nil {
callbackReq.RoleLevel = &req.RoleLevel.Value
cbReq.RoleLevel = &req.RoleLevel.Value
}
if req.Ex != nil {
callbackReq.Ex = &req.Ex.Value
cbReq.Ex = &req.Ex.Value
}
resp := &callbackstruct.CallbackAfterSetGroupMemberInfoResp{}
return http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup)
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterSetGroupMemberInfoResp{}, after)
}
func CallbackQuitGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.QuitGroupReq) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
func (s *groupServer) webhookAfterQuitGroup(ctx context.Context, after *config.AfterConfig, req *group.QuitGroupReq) {
cbReq := &callbackstruct.CallbackQuitGroupReq{
CallbackCommand: callbackstruct.CallbackQuitGroupCommand,
CallbackCommand: callbackstruct.CallbackAfterQuitGroupCommand,
GroupID: req.GroupID,
UserID: req.UserID,
}
resp := &callbackstruct.CallbackQuitGroupResp{}
return http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup)
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackQuitGroupResp{}, after)
}
func CallbackKillGroupMember(ctx context.Context, cfg *GroupEventCallbackConfig, req *pbgroup.KickGroupMemberReq) (err error) {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
func (s *groupServer) webhookAfterKickGroupMember(ctx context.Context, after *config.AfterConfig, req *group.KickGroupMemberReq) {
cbReq := &callbackstruct.CallbackKillGroupMemberReq{
CallbackCommand: callbackstruct.CallbackKillGroupCommand,
CallbackCommand: callbackstruct.CallbackAfterKickGroupCommand,
GroupID: req.GroupID,
KickedUserIDs: req.KickedUserIDs,
}
resp := &callbackstruct.CallbackKillGroupMemberResp{}
if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup); err != nil {
return err
}
return nil
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackKillGroupMemberResp{}, after)
}
func CallbackDismissGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *callbackstruct.CallbackDisMissGroupReq) (err error) {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
req.CallbackCommand = callbackstruct.CallbackDisMissGroupCommand
resp := &callbackstruct.CallbackDisMissGroupResp{}
if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, req, resp, cfg.BeforeCreateGroup); err != nil {
return err
}
return nil
func (s *groupServer) webhookAfterDismissGroup(ctx context.Context, after *config.AfterConfig, req *callbackstruct.CallbackDisMissGroupReq) {
req.CallbackCommand = callbackstruct.CallbackAfterDisMissGroupCommand
s.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &callbackstruct.CallbackDisMissGroupResp{}, after)
}
func CallbackApplyJoinGroupBefore(ctx context.Context, cfg *GroupEventCallbackConfig, req *callbackstruct.CallbackJoinGroupReq) (err error) {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
func (s *groupServer) webhookBeforeApplyJoinGroup(ctx context.Context, before *config.BeforeConfig, req *callbackstruct.CallbackJoinGroupReq) (err error) {
req.CallbackCommand = callbackstruct.CallbackBeforeJoinGroupCommand
resp := &callbackstruct.CallbackJoinGroupResp{}
if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, req, resp, cfg.BeforeCreateGroup); err != nil {
if err := s.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil {
return err
}
return nil
}
func CallbackAfterTransferGroupOwner(ctx context.Context, cfg *GroupEventCallbackConfig, req *pbgroup.TransferGroupOwnerReq) (err error) {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
func (s *groupServer) webhookAfterTransferGroupOwner(ctx context.Context, after *config.AfterConfig, req *group.TransferGroupOwnerReq) {
cbReq := &callbackstruct.CallbackTransferGroupOwnerReq{
CallbackCommand: callbackstruct.CallbackAfterTransferGroupOwner,
CallbackCommand: callbackstruct.CallbackAfterTransferGroupOwnerCommand,
GroupID: req.GroupID,
OldOwnerUserID: req.OldOwnerUserID,
NewOwnerUserID: req.NewOwnerUserID,
}
resp := &callbackstruct.CallbackTransferGroupOwnerResp{}
if err = http.CallBackPostReturn(ctx, cfg.CallbackUrl, cbReq, resp, cfg.BeforeCreateGroup); err != nil {
return err
}
return nil
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackTransferGroupOwnerResp{}, after)
}
func CallbackBeforeInviteUserToGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.InviteUserToGroupReq) (err error) {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackBeforeInviteUserToGroupReq{
func (s *groupServer) webhookBeforeInviteUserToGroup(ctx context.Context, before *config.BeforeConfig, req *group.InviteUserToGroupReq) (err error) {
cbReq := &callbackstruct.CallbackBeforeInviteUserToGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeInviteJoinGroupCommand,
OperationID: mcontext.GetOperationID(ctx),
GroupID: req.GroupID,
@ -296,15 +230,7 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, cfg *GroupEventCallbac
}
resp := &callbackstruct.CallbackBeforeInviteUserToGroupResp{}
err = http.CallBackPostReturn(
ctx,
cfg.CallbackUrl,
callbackReq,
resp,
cfg.BeforeCreateGroup,
)
if err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
@ -315,11 +241,8 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, cfg *GroupEventCallbac
return nil
}
func CallbackAfterJoinGroup(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.JoinGroupReq) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackAfterJoinGroupReq{
func (s *groupServer) webhookAfterJoinGroup(ctx context.Context, after *config.AfterConfig, req *group.JoinGroupReq) {
cbReq := &callbackstruct.CallbackAfterJoinGroupReq{
CallbackCommand: callbackstruct.CallbackAfterJoinGroupCommand,
OperationID: mcontext.GetOperationID(ctx),
GroupID: req.GroupID,
@ -327,18 +250,12 @@ func CallbackAfterJoinGroup(ctx context.Context, cfg *GroupEventCallbackConfig,
JoinSource: req.JoinSource,
InviterUserID: req.InviterUserID,
}
resp := &callbackstruct.CallbackAfterJoinGroupResp{}
if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup); err != nil {
return err
}
return nil
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterJoinGroupResp{}, after)
}
func CallbackBeforeSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupInfoReq) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{
func (s *groupServer) webhookBeforeSetGroupInfo(ctx context.Context, before *config.BeforeConfig, req *group.SetGroupInfoReq) error {
cbReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{
CallbackCommand: callbackstruct.CallbackBeforeSetGroupInfoCommand,
GroupID: req.GroupInfoForSet.GroupID,
Notification: req.GroupInfoForSet.Notification,
@ -346,23 +263,22 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConf
FaceURL: req.GroupInfoForSet.FaceURL,
GroupName: req.GroupInfoForSet.GroupName,
}
if req.GroupInfoForSet.Ex != nil {
callbackReq.Ex = req.GroupInfoForSet.Ex.Value
cbReq.Ex = req.GroupInfoForSet.Ex.Value
}
log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfo", callbackReq.Ex)
log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfo", cbReq.Ex)
if req.GroupInfoForSet.NeedVerification != nil {
callbackReq.NeedVerification = req.GroupInfoForSet.NeedVerification.Value
cbReq.NeedVerification = req.GroupInfoForSet.NeedVerification.Value
}
if req.GroupInfoForSet.LookMemberInfo != nil {
callbackReq.LookMemberInfo = req.GroupInfoForSet.LookMemberInfo.Value
cbReq.LookMemberInfo = req.GroupInfoForSet.LookMemberInfo.Value
}
if req.GroupInfoForSet.ApplyMemberFriend != nil {
callbackReq.ApplyMemberFriend = req.GroupInfoForSet.ApplyMemberFriend.Value
cbReq.ApplyMemberFriend = req.GroupInfoForSet.ApplyMemberFriend.Value
}
resp := &callbackstruct.CallbackBeforeSetGroupInfoResp{}
if err := http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
@ -385,11 +301,8 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConf
return nil
}
func CallbackAfterSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConfig, req *group.SetGroupInfoReq) error {
if !cfg.BeforeCreateGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackAfterSetGroupInfoReq{
func (s *groupServer) webhookAfterSetGroupInfo(ctx context.Context, after *config.AfterConfig, req *group.SetGroupInfoReq) {
cbReq := &callbackstruct.CallbackAfterSetGroupInfoReq{
CallbackCommand: callbackstruct.CallbackAfterSetGroupInfoCommand,
GroupID: req.GroupInfoForSet.GroupID,
Notification: req.GroupInfoForSet.Notification,
@ -398,17 +311,16 @@ func CallbackAfterSetGroupInfo(ctx context.Context, cfg *GroupEventCallbackConfi
GroupName: req.GroupInfoForSet.GroupName,
}
if req.GroupInfoForSet.Ex != nil {
callbackReq.Ex = &req.GroupInfoForSet.Ex.Value
cbReq.Ex = &req.GroupInfoForSet.Ex.Value
}
if req.GroupInfoForSet.NeedVerification != nil {
callbackReq.NeedVerification = &req.GroupInfoForSet.NeedVerification.Value
cbReq.NeedVerification = &req.GroupInfoForSet.NeedVerification.Value
}
if req.GroupInfoForSet.LookMemberInfo != nil {
callbackReq.LookMemberInfo = &req.GroupInfoForSet.LookMemberInfo.Value
cbReq.LookMemberInfo = &req.GroupInfoForSet.LookMemberInfo.Value
}
if req.GroupInfoForSet.ApplyMemberFriend != nil {
callbackReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value
cbReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value
}
resp := &callbackstruct.CallbackAfterSetGroupInfoResp{}
return http.CallBackPostReturn(ctx, cfg.CallbackUrl, callbackReq, resp, cfg.BeforeCreateGroup)
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterSetGroupInfoResp{}, after)
}

@ -19,6 +19,8 @@ import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"math/big"
"math/rand"
"strconv"
@ -53,6 +55,11 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type groupServer struct {
db controller.GroupDatabase
user rpcclient.UserRpcClient
@ -60,6 +67,7 @@ type groupServer struct {
conversationRpcClient rpcclient.ConversationRpcClient
msgRpcClient rpcclient.MessageRpcClient
config *Config
webhookClient *webhook.Client
}
type Config struct {
@ -112,6 +120,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
gs.config = config
gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize))
pbgroup.RegisterGroupServer(server, &gs)
return nil
}
@ -229,13 +238,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
return nil, servererrs.ErrUserIDNotFound.WrapMsg("user not found")
}
config := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeCreateGroup,
}
// Callback Before create Group
if err := CallbackBeforeCreateGroup(ctx, config, req); err != nil {
if err := s.webhookBeforeCreateGroup(ctx, &s.config.WebhooksConfig.BeforeCreateGroup, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
@ -245,11 +248,6 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
return nil, err
}
beforeCreateGroupConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
joinGroup := func(userID string, roleLevel int32) error {
groupMember := &relationtb.GroupMemberModel{
GroupID: group.GroupID,
@ -262,7 +260,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
MuteEndTime: time.UnixMilli(0),
}
if err := CallbackBeforeMemberJoinGroup(ctx, beforeCreateGroupConfig, groupMember, group.Ex); err != nil {
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return err
}
groupMembers = append(groupMembers, groupMember)
@ -331,9 +329,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
AdminUserIDs: req.AdminUserIDs,
}
if err := CallbackAfterCreateGroup(ctx, beforeCreateGroupConfig, reqCallBackAfter); err != nil {
return nil, err
}
s.webhookAfterCreateGroup(ctx, &s.config.WebhooksConfig.AfterCreateGroup, reqCallBackAfter)
return resp, nil
}
@ -423,12 +419,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
}
}
beforeInviteUserToGroupConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeInviteUserToGroup,
}
if err := CallbackBeforeInviteUserToGroup(ctx, beforeInviteUserToGroupConfig, req); err != nil {
if err := s.webhookBeforeInviteUserToGroup(ctx, &s.config.WebhooksConfig.BeforeInviteUserToGroup, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
@ -474,12 +465,11 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
MuteEndTime: time.UnixMilli(0),
}
beforeMemberJoinGroupConfig := beforeInviteUserToGroupConfig
if err := CallbackBeforeMemberJoinGroup(ctx, beforeMemberJoinGroupConfig, member, group.Ex); err != nil {
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
groupMembers = append(groupMembers, member)
}
if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
@ -647,15 +637,8 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil {
return nil, err
}
s.webhookAfterKickGroupMember(ctx, &s.config.WebhooksConfig.AfterKickGroupMember, req)
killGroupMemberConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
if err := CallbackKillGroupMember(ctx, killGroupMemberConfig, req); err != nil {
return nil, err
}
return &pbgroup.KickGroupMemberResp{}, nil
}
@ -823,13 +806,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
OperatorUserID: mcontext.GetOpUserID(ctx),
Ex: groupRequest.Ex,
}
beforeMemberJoinGroupConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
if err = CallbackBeforeMemberJoinGroup(ctx, beforeMemberJoinGroupConfig, member, group.Ex); err != nil {
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
}
@ -876,14 +853,10 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
Ex: req.Ex,
}
applyJoinGroupBeforeConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
if err = CallbackApplyJoinGroupBefore(ctx, applyJoinGroupBeforeConfig, reqCall); err != nil {
if err := s.webhookBeforeApplyJoinGroup(ctx, &s.config.WebhooksConfig.BeforeApplyJoinGroup, reqCall); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
_, err = s.db.TakeGroupMember(ctx, req.GroupID, req.InviterUserID)
if err == nil {
return nil, errs.ErrArgs.Wrap()
@ -901,10 +874,11 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
JoinTime: time.Now(),
MuteEndTime: time.UnixMilli(0),
}
MemberJoinGroupConfig := applyJoinGroupBeforeConfig
if err := CallbackBeforeMemberJoinGroup(ctx, MemberJoinGroupConfig, groupMember, group.Ex); err != nil {
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
if err := s.db.CreateGroup(ctx, nil, []*relationtb.GroupMemberModel{groupMember}); err != nil {
return nil, err
}
@ -913,10 +887,8 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
return nil, err
}
s.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID)
afterJoinGroupConfig := applyJoinGroupBeforeConfig
if err = CallbackAfterJoinGroup(ctx, afterJoinGroupConfig, req); err != nil {
return nil, err
}
s.webhookAfterJoinGroup(ctx, &s.config.WebhooksConfig.AfterJoinGroup, req)
return &pbgroup.JoinGroupResp{}, nil
}
groupRequest := relationtb.GroupRequestModel{
@ -961,15 +933,8 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)
if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, []string{req.UserID}); err != nil {
return nil, err
}
s.webhookAfterQuitGroup(ctx, &s.config.WebhooksConfig.AfterQuitGroup, req)
quitGroupConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
if err := CallbackQuitGroup(ctx, quitGroupConfig, req); err != nil {
return nil, err
}
return &pbgroup.QuitGroupResp{}, nil
}
@ -998,14 +963,10 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
}
}
beforeSetGroupInfoConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
if err := CallbackBeforeSetGroupInfo(ctx, beforeSetGroupInfoConfig, req); err != nil {
if err := s.webhookBeforeSetGroupInfo(ctx, &s.config.WebhooksConfig.BeforeSetGroupInfo, req); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
group, err := s.db.TakeGroup(ctx, req.GroupInfoForSet.GroupID)
if err != nil {
return nil, err
@ -1073,10 +1034,8 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
_ = s.notification.GroupInfoSetNotification(ctx, tips)
}
afterSetGroupInfoConfig := beforeSetGroupInfoConfig
if err := CallbackAfterSetGroupInfo(ctx, afterSetGroupInfoConfig, req); err != nil {
return nil, err
}
s.webhookAfterSetGroupInfo(ctx, &s.config.WebhooksConfig.AfterSetGroupInfo, req)
return &pbgroup.SetGroupInfoResp{}, nil
}
@ -1119,14 +1078,8 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
return nil, err
}
afterTransferGroupOwnerConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
s.webhookAfterTransferGroupOwner(ctx, &s.config.WebhooksConfig.AfterTransferGroupOwner, req)
if err := CallbackAfterTransferGroupOwner(ctx, afterTransferGroupOwnerConfig, req); err != nil {
return nil, err
}
s.notification.GroupOwnerTransferredNotification(ctx, req)
return &pbgroup.TransferGroupOwnerResp{}, nil
}
@ -1285,21 +1238,14 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou
if err != nil {
return nil, err
}
reqCall := &callbackstruct.CallbackDisMissGroupReq{
cbReq := &callbackstruct.CallbackDisMissGroupReq{
GroupID: req.GroupID,
OwnerID: owner.UserID,
MembersID: membersID,
GroupType: string(group.GroupType),
}
dismissGroupConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
if err := CallbackDismissGroup(ctx, dismissGroupConfig, reqCall); err != nil {
return nil, err
}
s.webhookAfterDismissGroup(ctx, &s.config.WebhooksConfig.AfterDismissGroup, cbReq)
return &pbgroup.DismissGroupResp{}, nil
}
@ -1485,15 +1431,12 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
}
}
beforeSetGroupMemberInfoConfig := &GroupEventCallbackConfig{
CallbackUrl: s.config.WebhooksConfig.URL,
BeforeCreateGroup: s.config.WebhooksConfig.BeforeMemberJoinGroup,
}
for i := 0; i < len(req.Members); i++ {
if err := CallbackBeforeSetGroupMemberInfo(ctx, beforeSetGroupMemberInfoConfig, req.Members[i]); err != nil {
if err := s.webhookBeforeSetGroupMemberInfo(ctx, &s.config.WebhooksConfig.BeforeSetGroupMemberInfo, req.Members[i]); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
}
if err := s.db.UpdateGroupMembers(ctx, datautil.Slice(req.Members, func(e *pbgroup.SetGroupMemberInfo) *relationtb.BatchUpdateGroupMember {
return &relationtb.BatchUpdateGroupMember{
@ -1517,11 +1460,8 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
s.notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID)
}
}
afterSetGroupMemberInfoConfig := beforeSetGroupMemberInfoConfig
for i := 0; i < len(req.Members); i++ {
if err := CallbackAfterSetGroupMemberInfo(ctx, afterSetGroupMemberInfoConfig, req.Members[i]); err != nil {
return nil, err
}
s.webhookAfterSetGroupMemberInfo(ctx, &s.config.WebhooksConfig.AfterSetGroupMemberInfo, req.Members[i])
}
return &pbgroup.SetGroupMemberInfoResp{}, nil

@ -125,9 +125,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
Seqs: req.Seqs,
ContentType: conversation.ConversationType,
}
if err := CallbackSingleMsgRead(ctx, &m.config.WebhooksConfig, reqCallback); err != nil {
return nil, err
}
m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback)
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq); err != nil {
@ -198,10 +196,8 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
UnreadMsgNum: req.HasReadSeq,
ContentType: int64(conversation.ConversationType),
}
if err := CallbackGroupMsgRead(ctx, &m.config.WebhooksConfig, reqCall); err != nil {
return nil, err
}
m.webhookAfterGroupMsgRead(ctx, &m.config.WebhooksConfig.AfterGroupMsgRead, reqCall)
return &msg.MarkConversationAsReadResp{}, nil
}

@ -19,7 +19,6 @@ import (
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
"github.com/openimsdk/protocol/constant"
pbchat "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
@ -61,77 +60,71 @@ func GetContent(msg *sdkws.MsgData) string {
}
}
func callbackBeforeSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.BeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing {
func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
if msg.MsgData.ContentType == constant.Typing {
return nil
}
req := &cbapi.CallbackBeforeSendSingleMsgReq{
cbReq := &cbapi.CallbackBeforeSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendSingleMsgCommand),
RecvID: msg.MsgData.RecvID,
}
resp := &cbapi.CallbackBeforeSendSingleMsgResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendSingleMsg); err != nil {
if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
return nil
}
func callbackAfterSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.AfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing {
return nil
func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
if msg.MsgData.ContentType == constant.Typing {
return
}
req := &cbapi.CallbackAfterSendSingleMsgReq{
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
RecvID: msg.MsgData.RecvID,
}
resp := &cbapi.CallbackAfterSendSingleMsgResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendSingleMsg); err != nil {
return err
}
return nil
m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after)
}
func callbackBeforeSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.BeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing {
func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
if msg.MsgData.ContentType == constant.Typing {
return nil
}
req := &cbapi.CallbackBeforeSendGroupMsgReq{
cbReq := &cbapi.CallbackBeforeSendGroupMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendGroupMsgCommand),
GroupID: msg.MsgData.GroupID,
}
resp := &cbapi.CallbackBeforeSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendGroupMsg); err != nil {
if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
return nil
}
func callbackAfterSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.AfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing {
return nil
func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) {
if msg.MsgData.ContentType == constant.Typing {
return
}
req := &cbapi.CallbackAfterSendGroupMsgReq{
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
GroupID: msg.MsgData.GroupID,
}
resp := &cbapi.CallbackAfterSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendGroupMsg); err != nil {
return err
}
return nil
m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after)
}
func callbackMsgModify(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.BeforeMsgModify.Enable || msg.MsgData.ContentType != constant.Text {
func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error {
if msg.MsgData.ContentType != constant.Text {
return nil
}
req := &cbapi.CallbackMsgModifyCommandReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand),
cbReq := &cbapi.CallbackMsgModifyCommandReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeMsgModifyCommand),
}
resp := &cbapi.CallbackMsgModifyCommandResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeMsgModify); err != nil {
if err := m.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if resp.Content != nil {
msg.MsgData.Content = []byte(*resp.Content)
}
@ -154,45 +147,25 @@ func callbackMsgModify(ctx context.Context, callback *config.Webhooks, msg *pbch
return nil
}
func CallbackGroupMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackGroupMsgReadReq) error {
if !callback.AfterGroupMsgRead.Enable {
return nil
}
req.CallbackCommand = cbapi.CallbackGroupMsgReadCommand
resp := &cbapi.CallbackGroupMsgReadResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterGroupMsgRead); err != nil {
return err
}
return nil
func (m *msgServer) webhookAfterGroupMsgRead(ctx context.Context, after *config.AfterConfig, req *cbapi.CallbackGroupMsgReadReq) {
req.CallbackCommand = cbapi.CallbackAfterGroupMsgReadCommand
m.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CallbackGroupMsgReadResp{}, after)
}
func CallbackSingleMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackSingleMsgReadReq) error {
if !callback.AfterSingleMsgRead.Enable {
return nil
}
req.CallbackCommand = cbapi.CallbackSingleMsgRead
func (m *msgServer) webhookAfterSingleMsgRead(ctx context.Context, after *config.AfterConfig, req *cbapi.CallbackSingleMsgReadReq) {
resp := &cbapi.CallbackSingleMsgReadResp{}
req.CallbackCommand = cbapi.CallbackAfterSingleMsgReadCommand
m.webhookClient.AsyncPost(ctx, req.GetCallbackCommand(), req, &cbapi.CallbackSingleMsgReadResp{}, after)
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSingleMsgRead); err != nil {
return err
}
return nil
}
func CallbackAfterRevokeMsg(ctx context.Context, callback *config.Webhooks, req *pbchat.RevokeMsgReq) error {
if !callback.AfterRevokeMsg.Enable {
return nil
}
func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.AfterConfig, req *pbchat.RevokeMsgReq) {
callbackReq := &cbapi.CallbackAfterRevokeMsgReq{
CallbackCommand: cbapi.CallbackAfterRevokeMsgCommand,
ConversationID: req.ConversationID,
Seq: req.Seq,
UserID: req.UserID,
}
resp := &cbapi.CallbackAfterRevokeMsgResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, callbackReq, resp, callback.AfterRevokeMsg); err != nil {
return err
}
return nil
m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after)
}

@ -126,8 +126,6 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil {
return nil, err
}
if err = CallbackAfterRevokeMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
return nil, err
}
m.webhookAfterRevokeMsg(ctx, &m.config.WebhooksConfig.AfterRevokeMsg, req)
return &msg.RevokeMsgResp{}, nil
}

@ -59,10 +59,11 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
return nil, err
}
if err = callbackBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
if err = m.webhookBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig.BeforeSendGroupMsg, req); err != nil {
return nil, err
}
if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil {
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil {
return nil, err
}
err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
@ -72,9 +73,8 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs
if req.MsgData.ContentType == constant.AtText {
go m.setConversationAtInfo(ctx, req.MsgData)
}
if err = callbackAfterSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
}
m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
resp = &pbmsg.SendMsgResp{}
resp.SendTime = req.MsgData.SendTime
@ -157,21 +157,18 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, nil
} else {
if err = callbackBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
if err = m.webhookBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig.BeforeSendSingleMsg, req); err != nil {
return nil, err
}
if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil {
if err := m.webhookBeforeMsgModify(ctx, &m.config.WebhooksConfig.BeforeMsgModify, req); err != nil {
return nil, err
}
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err
}
err = callbackAfterSendSingleMsg(ctx, &m.config.WebhooksConfig, req)
if err != nil {
log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req)
}
m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
return &pbmsg.SendMsgResp{
ServerMsgID: req.MsgData.ServerMsgID,

@ -17,6 +17,8 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
@ -32,6 +34,11 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type (
// MessageInterceptorChain defines a chain of message interceptor functions.
MessageInterceptorChain []MessageInterceptorFunc
@ -48,6 +55,7 @@ type (
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
}
Config struct {
@ -101,6 +109,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb),
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, &config.LocalCacheConfig, rdb),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
}
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))

@ -20,14 +20,10 @@ import (
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
pbuser "github.com/openimsdk/protocol/user"
)
func CallbackBeforeUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error {
if !callback.BeforeUpdateUserInfo.Enable {
return nil
}
func (s *userServer) webhookBeforeUpdateUserInfo(ctx context.Context, before *config.BeforeConfig, req *pbuser.UpdateUserInfoReq) error {
cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{
CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoCommand,
UserID: req.UserInfo.UserID,
@ -35,34 +31,27 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, callback *config.Webhooks
Nickname: &req.UserInfo.Nickname,
}
resp := &cbapi.CallbackBeforeUpdateUserInfoResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfo); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL)
datautil.NotNilReplace(&req.UserInfo.Ex, resp.Ex)
datautil.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname)
return nil
}
func CallbackAfterUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error {
if !callback.AfterUpdateUserInfo.Enable {
return nil
}
func (s *userServer) webhookAfterUpdateUserInfo(ctx context.Context, after *config.AfterConfig, req *pbuser.UpdateUserInfoReq) {
cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{
CallbackCommand: cbapi.CallbackAfterUpdateUserInfoCommand,
UserID: req.UserInfo.UserID,
FaceURL: req.UserInfo.FaceURL,
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackAfterUpdateUserInfoResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfo); err != nil {
return err
}
return nil
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterUpdateUserInfoResp{}, after)
}
func CallbackBeforeUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error {
if !callback.BeforeUpdateUserInfoEx.Enable {
return nil
}
func (s *userServer) webhookBeforeUpdateUserInfoEx(ctx context.Context, before *config.BeforeConfig, req *pbuser.UpdateUserInfoExReq) error {
cbReq := &cbapi.CallbackBeforeUpdateUserInfoExReq{
CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoExCommand,
UserID: req.UserInfo.UserID,
@ -70,35 +59,27 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, callback *config.Webhoo
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackBeforeUpdateUserInfoExResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfoEx); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(req.UserInfo.FaceURL, resp.FaceURL)
datautil.NotNilReplace(req.UserInfo.Ex, resp.Ex)
datautil.NotNilReplace(req.UserInfo.Nickname, resp.Nickname)
return nil
}
func CallbackAfterUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error {
if !callback.AfterUpdateUserInfoEx.Enable {
return nil
}
func (s *userServer) webhookAfterUpdateUserInfoEx(ctx context.Context, after *config.AfterConfig, req *pbuser.UpdateUserInfoExReq) {
cbReq := &cbapi.CallbackAfterUpdateUserInfoExReq{
CallbackCommand: cbapi.CallbackAfterUpdateUserInfoExCommand,
UserID: req.UserInfo.UserID,
FaceURL: req.UserInfo.FaceURL,
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackAfterUpdateUserInfoExResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfoEx); err != nil {
return err
}
return nil
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterUpdateUserInfoExResp{}, after)
}
func CallbackBeforeUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error {
if !callback.BeforeUserRegister.Enable {
return nil
}
func (s *userServer) webhookBeforeUserRegister(ctx context.Context, before *config.BeforeConfig, req *pbuser.UserRegisterReq) error {
cbReq := &cbapi.CallbackBeforeUserRegisterReq{
CallbackCommand: cbapi.CallbackBeforeUserRegisterCommand,
Secret: req.Secret,
@ -106,28 +87,23 @@ func CallbackBeforeUserRegister(ctx context.Context, callback *config.Webhooks,
}
resp := &cbapi.CallbackBeforeUserRegisterResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUserRegister); err != nil {
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if len(resp.Users) != 0 {
req.Users = resp.Users
}
return nil
}
func CallbackAfterUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error {
if !callback.AfterUserRegister.Enable {
return nil
}
func (s *userServer) webhookAfterUserRegister(ctx context.Context, after *config.AfterConfig, req *pbuser.UserRegisterReq) {
cbReq := &cbapi.CallbackAfterUserRegisterReq{
CallbackCommand: cbapi.CallbackAfterUserRegisterCommand,
Secret: req.Secret,
Users: req.Users,
}
resp := &cbapi.CallbackAfterUserRegisterResp{}
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUserRegister); err != nil {
return err
}
return nil
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterUserRegisterResp{}, after)
}

@ -17,6 +17,8 @@ package user
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/tools/db/redisutil"
"math/rand"
"strings"
@ -44,6 +46,11 @@ import (
"google.golang.org/grpc"
)
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type userServer struct {
db controller.UserDatabase
friendNotificationSender *notification.FriendNotificationSender
@ -52,6 +59,7 @@ type userServer struct {
groupRpcClient *rpcclient.GroupRpcClient
RegisterCenter registry.SvcDiscoveryRegistry
config *Config
webhookClient *webhook.Client
}
type Config struct {
@ -99,6 +107,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
friendNotificationSender: notification.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
}
pbuser.RegisterUserServer(server, u)
return u.db.InitOnce(context.Background(), users)
@ -114,15 +123,21 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
return resp, nil
}
// deprecated:
//UpdateUserInfo
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
resp = &pbuser.UpdateUserInfoResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
if err != nil {
return nil, err
}
if err := CallbackBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil {
if err := s.webhookBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfo, req); err != nil {
return nil, err
}
data := convert.UserPb2DBMap(req.UserInfo)
if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
@ -140,9 +155,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
}
if err = CallbackAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
s.webhookAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfo, req)
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
return nil, err
}
@ -154,8 +167,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
if err != nil {
return nil, err
}
if err = CallbackBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil {
if err = s.webhookBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfoEx, req); err != nil {
return nil, err
}
data := convert.UserPb2DBMapEx(req.UserInfo)
@ -175,9 +187,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
}
if err := CallbackAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req)
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
return nil, err
}
@ -273,7 +283,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if exist {
return nil, servererrs.ErrRegisteredAlready.WrapMsg("userID registered already")
}
if err := CallbackBeforeUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil {
if err := s.webhookBeforeUserRegister(ctx, &s.config.WebhooksConfig.BeforeUserRegister, req); err != nil {
return nil, err
}
now := time.Now()
@ -293,9 +303,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
return nil, err
}
if err := CallbackAfterUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
s.webhookAfterUserRegister(ctx, &s.config.WebhooksConfig.AfterUserRegister, req)
return resp, nil
}

@ -64,7 +64,7 @@ type CommonCallbackResp struct {
}
func (c CommonCallbackResp) Parse() error {
if c.ActionCode != servererrs.NoError || c.NextCode == Next {
if c.ActionCode == servererrs.NoError && c.NextCode == Next {
return errs.NewCodeError(int(c.ErrCode), c.ErrMsg).WithDetail(c.ErrDlt)
}
return nil

@ -14,47 +14,44 @@
package callbackstruct
const CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand"
const CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand"
const CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand"
const CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand"
const CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand"
const CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand"
const CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand"
const CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand"
const CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand"
const CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand"
const CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand"
const CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand"
const (
CallbackQuitGroupCommand = "callbackQuitGroupCommand"
CallbackKillGroupCommand = "callbackKillGroupCommand"
CallbackDisMissGroupCommand = "callbackDisMissGroupCommand"
CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand"
CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand"
CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand"
CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand"
CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand"
CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand"
CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand"
CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand"
CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand"
CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand"
CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand"
CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand"
CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand"
CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand"
CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand"
CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand"
CallbackGroupMsgReadCommand = "callbackGroupMsgReadCommand"
CallbackMsgModifyCommand = "callbackMsgModifyCommand"
CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand"
CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand"
CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand"
CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand"
CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand"
CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand"
CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand"
CallbackAfterTransferGroupOwner = "callbackAfterTransferGroupOwner"
CallbackBeforeSetFriendRemark = "callbackBeforeSetFriendRemark"
CallbackAfterSetFriendRemark = "callbackAfterSetFriendRemark"
CallbackSingleMsgRead = "callbackSingleMsgRead"
CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand"
CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand"
CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand"
CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand"
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
CallbackUserOnlineCommand = "callbackUserOnlineCommand"
CallbackUserOfflineCommand = "callbackUserOfflineCommand"
CallbackUserKickOffCommand = "callbackUserKickOffCommand"
CallbackOfflinePushCommand = "callbackOfflinePushCommand"
CallbackOnlinePushCommand = "callbackOnlinePushCommand"
CallbackSuperGroupOnlinePushCommand = "callbackSuperGroupOnlinePushCommand"
CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand"
CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand"
CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand"
CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand"
CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand"
CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand"
CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand"
CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand"
CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand"

@ -339,12 +339,17 @@ type Redis struct {
MaxRetry int `mapstructure:"MaxRetry"`
}
type WebhookConfig struct {
type BeforeConfig struct {
Enable bool `mapstructure:"enable"`
Timeout int `mapstructure:"timeout"`
FailedContinue bool `mapstructure:"failedContinue"`
}
type AfterConfig struct {
Enable bool `mapstructure:"enable"`
Timeout int `mapstructure:"timeout"`
}
type Share struct {
Secret string `mapstructure:"secret"`
Env string `mapstructure:"env"`
@ -377,53 +382,54 @@ func (r *RpcRegisterName) GetServiceNames() []string {
}
}
// FullConfig stores all configurations for before and after events
type Webhooks struct {
URL string `mapstructure:"url"`
BeforeSendSingleMsg WebhookConfig `mapstructure:"beforeSendSingleMsg"`
BeforeUpdateUserInfoEx WebhookConfig `mapstructure:"beforeUpdateUserInfoEx"`
AfterUpdateUserInfoEx WebhookConfig `mapstructure:"afterUpdateUserInfoEx"`
AfterSendSingleMsg WebhookConfig `mapstructure:"afterSendSingleMsg"`
BeforeSendGroupMsg WebhookConfig `mapstructure:"beforeSendGroupMsg"`
AfterSendGroupMsg WebhookConfig `mapstructure:"afterSendGroupMsg"`
BeforeMsgModify WebhookConfig `mapstructure:"beforeMsgModify"`
AfterUserOnline WebhookConfig `mapstructure:"afterUserOnline"`
AfterUserOffline WebhookConfig `mapstructure:"afterUserOffline"`
AfterUserKickOff WebhookConfig `mapstructure:"afterUserKickOff"`
BeforeOfflinePush WebhookConfig `mapstructure:"beforeOfflinePush"`
BeforeOnlinePush WebhookConfig `mapstructure:"beforeOnlinePush"`
BeforeGroupOnlinePush WebhookConfig `mapstructure:"beforeGroupOnlinePush"`
BeforeAddFriend WebhookConfig `mapstructure:"beforeAddFriend"`
BeforeUpdateUserInfo WebhookConfig `mapstructure:"beforeUpdateUserInfo"`
AfterUpdateUserInfo WebhookConfig `mapstructure:"afterUpdateUserInfo"`
BeforeCreateGroup WebhookConfig `mapstructure:"beforeCreateGroup"`
AfterCreateGroup WebhookConfig `mapstructure:"afterCreateGroup"`
BeforeMemberJoinGroup WebhookConfig `mapstructure:"beforeMemberJoinGroup"`
BeforeSetGroupMemberInfo WebhookConfig `mapstructure:"beforeSetGroupMemberInfo"`
AfterSetGroupMemberInfo WebhookConfig `mapstructure:"afterSetGroupMemberInfo"`
AfterQuitGroup WebhookConfig `mapstructure:"afterQuitGroup"`
AfterKickGroupMember WebhookConfig `mapstructure:"afterKickGroupMember"`
AfterDismissGroup WebhookConfig `mapstructure:"afterDismissGroup"`
BeforeApplyJoinGroup WebhookConfig `mapstructure:"beforeApplyJoinGroup"`
AfterGroupMsgRead WebhookConfig `mapstructure:"afterGroupMsgRead"`
AfterSingleMsgRead WebhookConfig `mapstructure:"afterSingleMsgRead"`
BeforeUserRegister WebhookConfig `mapstructure:"beforeUserRegister"`
AfterUserRegister WebhookConfig `mapstructure:"afterUserRegister"`
AfterTransferGroupOwner WebhookConfig `mapstructure:"afterTransferGroupOwner"`
BeforeSetFriendRemark WebhookConfig `mapstructure:"beforeSetFriendRemark"`
AfterSetFriendRemark WebhookConfig `mapstructure:"afterSetFriendRemark"`
AfterGroupMsgRevoke WebhookConfig `mapstructure:"afterGroupMsgRevoke"`
AfterJoinGroup WebhookConfig `mapstructure:"afterJoinGroup"`
BeforeInviteUserToGroup WebhookConfig `mapstructure:"beforeInviteUserToGroup"`
AfterSetGroupInfo WebhookConfig `mapstructure:"afterSetGroupInfo"`
BeforeSetGroupInfo WebhookConfig `mapstructure:"beforeSetGroupInfo"`
AfterRevokeMsg WebhookConfig `mapstructure:"afterRevokeMsg"`
BeforeAddBlack WebhookConfig `mapstructure:"beforeAddBlack"`
AfterAddFriend WebhookConfig `mapstructure:"afterAddFriend"`
BeforeAddFriendAgree WebhookConfig `mapstructure:"beforeAddFriendAgree"`
AfterDeleteFriend WebhookConfig `mapstructure:"afterDeleteFriend"`
BeforeImportFriends WebhookConfig `mapstructure:"beforeImportFriends"`
AfterImportFriends WebhookConfig `mapstructure:"afterImportFriends"`
AfterRemoveBlack WebhookConfig `mapstructure:"afterRemoveBlack"`
URL string `mapstructure:"url"`
BeforeSendSingleMsg BeforeConfig `mapstructure:"beforeSendSingleMsg"`
BeforeUpdateUserInfoEx BeforeConfig `mapstructure:"beforeUpdateUserInfoEx"`
AfterUpdateUserInfoEx AfterConfig `mapstructure:"afterUpdateUserInfoEx"`
AfterSendSingleMsg AfterConfig `mapstructure:"afterSendSingleMsg"`
BeforeSendGroupMsg BeforeConfig `mapstructure:"beforeSendGroupMsg"`
BeforeMsgModify BeforeConfig `mapstructure:"beforeMsgModify"`
AfterSendGroupMsg AfterConfig `mapstructure:"afterSendGroupMsg"`
AfterUserOnline AfterConfig `mapstructure:"afterUserOnline"`
AfterUserOffline AfterConfig `mapstructure:"afterUserOffline"`
AfterUserKickOff AfterConfig `mapstructure:"afterUserKickOff"`
BeforeOfflinePush BeforeConfig `mapstructure:"beforeOfflinePush"`
BeforeOnlinePush BeforeConfig `mapstructure:"beforeOnlinePush"`
BeforeGroupOnlinePush BeforeConfig `mapstructure:"beforeGroupOnlinePush"`
BeforeAddFriend BeforeConfig `mapstructure:"beforeAddFriend"`
BeforeUpdateUserInfo BeforeConfig `mapstructure:"beforeUpdateUserInfo"`
AfterUpdateUserInfo AfterConfig `mapstructure:"afterUpdateUserInfo"`
BeforeCreateGroup BeforeConfig `mapstructure:"beforeCreateGroup"`
AfterCreateGroup AfterConfig `mapstructure:"afterCreateGroup"`
BeforeMemberJoinGroup BeforeConfig `mapstructure:"beforeMemberJoinGroup"`
BeforeSetGroupMemberInfo BeforeConfig `mapstructure:"beforeSetGroupMemberInfo"`
AfterSetGroupMemberInfo AfterConfig `mapstructure:"afterSetGroupMemberInfo"`
AfterQuitGroup AfterConfig `mapstructure:"afterQuitGroup"`
AfterKickGroupMember AfterConfig `mapstructure:"afterKickGroupMember"`
AfterDismissGroup AfterConfig `mapstructure:"afterDismissGroup"`
BeforeApplyJoinGroup BeforeConfig `mapstructure:"beforeApplyJoinGroup"`
AfterGroupMsgRead AfterConfig `mapstructure:"afterGroupMsgRead"`
AfterSingleMsgRead AfterConfig `mapstructure:"afterSingleMsgRead"`
BeforeUserRegister BeforeConfig `mapstructure:"beforeUserRegister"`
AfterUserRegister AfterConfig `mapstructure:"afterUserRegister"`
AfterTransferGroupOwner AfterConfig `mapstructure:"afterTransferGroupOwner"`
BeforeSetFriendRemark BeforeConfig `mapstructure:"beforeSetFriendRemark"`
AfterSetFriendRemark AfterConfig `mapstructure:"afterSetFriendRemark"`
AfterGroupMsgRevoke AfterConfig `mapstructure:"afterGroupMsgRevoke"`
AfterJoinGroup AfterConfig `mapstructure:"afterJoinGroup"`
BeforeInviteUserToGroup BeforeConfig `mapstructure:"beforeInviteUserToGroup"`
AfterSetGroupInfo AfterConfig `mapstructure:"afterSetGroupInfo"`
BeforeSetGroupInfo BeforeConfig `mapstructure:"beforeSetGroupInfo"`
AfterRevokeMsg AfterConfig `mapstructure:"afterRevokeMsg"`
BeforeAddBlack BeforeConfig `mapstructure:"beforeAddBlack"`
AfterAddFriend AfterConfig `mapstructure:"afterAddFriend"`
BeforeAddFriendAgree BeforeConfig `mapstructure:"beforeAddFriendAgree"`
AfterDeleteFriend AfterConfig `mapstructure:"afterDeleteFriend"`
BeforeImportFriends BeforeConfig `mapstructure:"beforeImportFriends"`
AfterImportFriends AfterConfig `mapstructure:"afterImportFriends"`
AfterRemoveBlack AfterConfig `mapstructure:"afterRemoveBlack"`
}
type ZooKeeper struct {

@ -1,68 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import (
"context"
"encoding/json"
"net/http"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/httputil"
)
var (
// Define http client.
client = httputil.NewHTTPClient(httputil.NewClientConfig())
)
func init() {
// reset http default transport
http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // default: 2
}
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.WebhookConfig) error {
url = url + "/" + command
log.ZInfo(ctx, "callback", "url", url, "input", input, "config", callbackConfig)
b, err := client.Post(ctx, url, nil, input, callbackConfig.Timeout)
if err != nil {
if callbackConfig.FailedContinue {
log.ZInfo(ctx, "callback failed but continue", err, "url", url)
return nil
}
log.ZWarn(ctx, "callback network failed", err, "url", url, "input", input)
return servererrs.ErrNetwork.WrapMsg(err.Error())
}
if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.FailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return nil
}
log.ZWarn(ctx, "callback json unmarshal failed", err, "url", url, "input", input, "response", string(b))
return servererrs.ErrData.WithDetail(err.Error() + "response format error")
}
if err := output.Parse(); err != nil {
log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b))
}
log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b))
return nil
}
func CallBackPostReturn(ctx context.Context, url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.WebhookConfig) error {
return callBackPostReturn(ctx, url, req.GetCallbackCommand(), req, resp, callbackConfig)
}

@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package http // import "github.com/openimsdk/open-im-server/v3/pkg/common/http"
package webhook // import "github.com/openimsdk/open-im-server/v3/pkg/common/webhook"

@ -0,0 +1,74 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package webhook
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/httputil"
"net/http"
)
type Client struct {
client *httputil.HTTPClient
url string
queue *memAsyncQueue.MemoryQueue
}
func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client {
http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host
return &Client{
client: httputil.NewHTTPClient(httputil.NewClientConfig()),
url: url,
queue: queue,
}
}
func (c *Client) SyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, before *config.BeforeConfig) error {
if before.Enable {
return c.post(ctx, command, req, resp, before.Timeout)
}
return nil
}
func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig) {
if after.Enable {
c.queue.Push(func() { c.post(ctx, command, req, resp, after.Timeout) })
}
}
func (c *Client) post(ctx context.Context, command string, input interface{}, output callbackstruct.CallbackResp, timeout int) error {
fullURL := c.url + "/" + command
log.ZInfo(ctx, "webhook", "url", fullURL, "input", input, "config", timeout)
operationID, _ := ctx.Value(constant.OperationID).(string)
b, err := c.client.Post(ctx, fullURL, map[string]string{constant.OperationID: operationID}, input, timeout)
if err != nil {
return servererrs.ErrNetwork.WrapMsg(err.Error(), "post url", fullURL)
}
if err = json.Unmarshal(b, output); err != nil {
return servererrs.ErrData.WithDetail(err.Error() + " response format error")
}
if err := output.Parse(); err != nil {
return err
}
log.ZInfo(ctx, "webhook success", "url", fullURL, "input", input, "response", string(b))
return nil
}

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package http
package webhook
import (
"context"

@ -0,0 +1,74 @@
package memAsyncQueue
import (
"errors"
"fmt"
"sync"
"time"
)
// AsyncQueue is the interface responsible for asynchronous processing of functions.
type AsyncQueue interface {
Initialize(processFunc func(), workerCount int, bufferSize int)
Push(task func()) error
}
// MemoryQueue is an implementation of the AsyncQueue interface using a channel to process functions.
type MemoryQueue struct {
taskChan chan func()
wg sync.WaitGroup
isStopped bool
stopMutex sync.Mutex // Mutex to protect access to isStopped
}
func NewMemoryQueue(workerCount int, bufferSize int) *MemoryQueue {
mq := &MemoryQueue{} // Create a new instance of MemoryQueue
mq.Initialize(workerCount, bufferSize) // Initialize it with specified parameters
return mq
}
// Initialize sets up the worker nodes and the buffer size of the channel,
// starting internal goroutines to handle tasks from the channel.
func (mq *MemoryQueue) Initialize(workerCount int, bufferSize int) {
mq.taskChan = make(chan func(), bufferSize) // Initialize the channel with the provided buffer size.
mq.isStopped = false
// Start multiple goroutines based on the specified workerCount.
for i := 0; i < workerCount; i++ {
mq.wg.Add(1)
go func(workerID int) {
defer mq.wg.Done()
for task := range mq.taskChan {
fmt.Printf("Worker %d: Executing task\n", workerID)
task() // Execute the function
}
}(i)
}
}
// Push submits a function to the queue.
// Returns an error if the queue is stopped or if the queue is full.
func (mq *MemoryQueue) Push(task func()) error {
mq.stopMutex.Lock()
if mq.isStopped {
mq.stopMutex.Unlock()
return errors.New("push failed: queue is stopped")
}
mq.stopMutex.Unlock()
select {
case mq.taskChan <- task:
return nil
case <-time.After(time.Millisecond * 100): // Timeout to prevent deadlock/blocking
return errors.New("push failed: queue is full")
}
}
// Stop is used to terminate the internal goroutines and close the channel.
func (mq *MemoryQueue) Stop() {
mq.stopMutex.Lock()
mq.isStopped = true
close(mq.taskChan)
mq.stopMutex.Unlock()
mq.wg.Wait()
}

@ -0,0 +1,91 @@
package memAsyncQueue
import (
"testing"
"time"
)
// TestPushSuccess tests the successful pushing of data into the queue.
func TestPushSuccess(t *testing.T) {
queue := &MemoryQueue{}
queue.Initialize(func(data any) {}, 1, 5) // Small buffer size for test
// Try to push data that should succeed
err := queue.Push("test data")
if err != nil {
t.Errorf("Push should succeed, but got error: %v", err)
}
}
// TestPushFailWhenFull tests that pushing to a full queue results in an error.
func TestPushFailWhenFull(t *testing.T) {
queue := &MemoryQueue{}
queue.Initialize(func(data any) {
time.Sleep(100 * time.Millisecond) // Simulate work to delay processing
}, 1, 1) // Very small buffer to fill quickly
queue.Push("data 1") // Fill the buffer
err := queue.Push("data 2") // This should fail
if err == nil {
t.Error("Expected an error when pushing to full queue, but got none")
}
}
// TestPushFailWhenStopped tests that pushing to a stopped queue results in an error.
func TestPushFailWhenStopped(t *testing.T) {
queue := &MemoryQueue{}
queue.Initialize(func(data any) {}, 1, 1)
queue.Stop() // Stop the queue before pushing
err := queue.Push("test data")
if err == nil {
t.Error("Expected an error when pushing to stopped queue, but got none")
}
}
// TestQueueOperationSequence tests a sequence of operations to ensure the queue handles them correctly.
func TestQueueOperationSequence(t *testing.T) {
queue := &MemoryQueue{}
queue.Initialize(func(data any) {}, 1, 2)
// Sequence of pushes and a stop
err := queue.Push("data 1")
if err != nil {
t.Errorf("Failed to push data 1: %v", err)
}
err = queue.Push("data 2")
if err != nil {
t.Errorf("Failed to push data 2: %v", err)
}
queue.Stop() // Stop the queue
err = queue.Push("data 3") // This push should fail
if err == nil {
t.Error("Expected an error when pushing after stop, but got none")
}
}
// TestBlockingOnFull tests that the queue does not block indefinitely when full.
func TestBlockingOnFull(t *testing.T) {
queue := &MemoryQueue{}
queue.Initialize(func(data any) {
time.Sleep(1 * time.Second) // Simulate a long processing time
}, 1, 1)
queue.Push("data 1") // Fill the queue
start := time.Now()
err := queue.Push("data 2") // This should time out
duration := time.Since(start)
if err == nil {
t.Error("Expected an error due to full queue, but got none")
}
if duration >= time.Second {
t.Errorf("Push blocked for too long, duration: %v", duration)
}
}

@ -35,8 +35,8 @@ DRY_RUN=${DRY_RUN:-""}
REGENERATE_DOCS=${REGENERATE_DOCS:-""}
UPSTREAM_REMOTE=${UPSTREAM_REMOTE:-upstream}
FORK_REMOTE=${FORK_REMOTE:-origin}
MAIN_REPO_ORG=${MAIN_REPO_ORG:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/http[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $3}')}
MAIN_REPO_NAME=${MAIN_REPO_NAME:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/http[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $4}')}
MAIN_REPO_ORG=${MAIN_REPO_ORG:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/webhook[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $3}')}
MAIN_REPO_NAME=${MAIN_REPO_NAME:-$(git remote get-url "$UPSTREAM_REMOTE" | awk '{gsub(/webhook[s]:\/\/|git@/,"")}1' | awk -F'[@:./]' 'NR==1{print $4}')}
if [[ -z ${GITHUB_USER:-} ]]; then
openim::log::error_exit "Please export GITHUB_USER=<your-user> (or GH organization, if that's where your fork lives)"

@ -101,7 +101,7 @@ func main() {
}
/* func getLatestVersion(url string) (string, error) {
resp, err := http.Get(url)
resp, err := webhook.Get(url)
if err != nil {
return "", err
}

@ -392,7 +392,7 @@ func (m *Manage) HttpGet(ctx context.Context, url string) (*http.Response, error
}
if response.StatusCode != http.StatusOK {
_ = response.Body.Close()
return nil, fmt.Errorf("http get %s status %s", url, response.Status)
return nil, fmt.Errorf("webhook get %s status %s", url, response.Status)
}
return response, nil
}

Loading…
Cancel
Save