AndrewZuo01 2 years ago
commit 4735751e62

@ -207,6 +207,12 @@ jobs:
sudo make check || \ sudo make check || \
(echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null) (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)
- name: Restart Services and Print Logs for Ubuntu
if: runner.os == 'Linux'
run: |
sudo make restart
sudo make check
# - name: Build, Start, Check Services and Print Logs for macOS # - name: Build, Start, Check Services and Print Logs for macOS
# if: runner.os == 'macOS' # if: runner.os == 'macOS'
# run: | # run: |
@ -239,4 +245,4 @@ jobs:
- name: Test Docker Build - name: Test Docker Build
run: | run: |
sudo make init sudo make init
sudo make image sudo make image

@ -95,7 +95,7 @@ stop:
## restart: Restart openim (make init configuration file is initialized) ✨ ## restart: Restart openim (make init configuration file is initialized) ✨
.PHONY: restart .PHONY: restart
restart: clean stop build init start check restart: clean stop build start check
## multiarch: Build binaries for multiple platforms. See option PLATFORMS. ✨ ## multiarch: Build binaries for multiple platforms. See option PLATFORMS. ✨
.PHONY: multiarch .PHONY: multiarch

@ -97,6 +97,10 @@ It's crafted in Golang and supports cross-platform deployment, ensuring a cohere
## :rocket: Quick Start ## :rocket: Quick Start
We support many platforms. Here are the addresses for quick experience on the web side
👉 **[OpenIM online web demo](https://web-enterprise.rentsoft.cn/)**
You can quickly learn OpenIM engineering solutions, all it takes is one simple command: You can quickly learn OpenIM engineering solutions, all it takes is one simple command:
```bash ```bash

@ -247,6 +247,14 @@ manager:
userID: [ "${MANAGER_USERID_1}", "${MANAGER_USERID_2}", "${MANAGER_USERID_3}" ] userID: [ "${MANAGER_USERID_1}", "${MANAGER_USERID_2}", "${MANAGER_USERID_3}" ]
nickname: [ "${NICKNAME_1}", "${NICKNAME_2}", "${NICKNAME_3}" ] nickname: [ "${NICKNAME_1}", "${NICKNAME_2}", "${NICKNAME_3}" ]
# chatAdmin, use for send notification
#
# Built-in app system notification account ID
# Built-in app system notification account nickname
im-admin:
userID: [ "${IM_ADMIN_USERID}" ]
nickname: [ "${IM_ADMIN_NAME}" ]
# Multi-platform login policy # Multi-platform login policy
# For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time # For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time
multiLoginPolicy: ${MULTILOGIN_POLICY} multiLoginPolicy: ${MULTILOGIN_POLICY}

@ -44,12 +44,12 @@ scrape_configs:
# prometheus fetches application services # prometheus fetches application services
- job_name: 'openimserver-openim-api' - job_name: 'openimserver-openim-api'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${API_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${API_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-msggateway' - job_name: 'openimserver-openim-msggateway'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${MSG_GATEWAY_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MSG_GATEWAY_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-msgtransfer' - job_name: 'openimserver-openim-msgtransfer'
@ -59,41 +59,41 @@ scrape_configs:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-push' - job_name: 'openimserver-openim-push'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${PUSH_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${PUSH_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-auth' - job_name: 'openimserver-openim-rpc-auth'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${AUTH_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${AUTH_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-conversation' - job_name: 'openimserver-openim-rpc-conversation'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${CONVERSATION_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${CONVERSATION_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-friend' - job_name: 'openimserver-openim-rpc-friend'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${FRIEND_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${FRIEND_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-group' - job_name: 'openimserver-openim-rpc-group'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${GROUP_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${GROUP_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-msg' - job_name: 'openimserver-openim-rpc-msg'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${MESSAGE_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MESSAGE_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-third' - job_name: 'openimserver-openim-rpc-third'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${THIRD_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${THIRD_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-user' - job_name: 'openimserver-openim-rpc-user'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${USER_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${USER_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'

@ -453,43 +453,45 @@ This section involves configuring the log settings, including storage location,
This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat. This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat.
| Parameter | Example Value | Description | | Parameter | Example Value | Description |
|-------------------------|-------------------|------------------------------------| |-------------------------|-------------------|----------------------------------|
| WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections | | WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections |
| WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length | | WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length |
| WEBSOCKET_TIMEOUT | "10" | Websocket timeout | | WEBSOCKET_TIMEOUT | "10" | Websocket timeout |
| PUSH_ENABLE | "getui" | Push notification enable status | | PUSH_ENABLE | "getui" | Push notification enable status |
| GETUI_PUSH_URL | [Generated URL] | GeTui Push Notification URL | | GETUI_PUSH_URL | [Generated URL] | GeTui Push Notification URL |
| GETUI_MASTER_SECRET | [User Defined] | GeTui Master Secret | | GETUI_MASTER_SECRET | [User Defined] | GeTui Master Secret |
| GETUI_APP_KEY | [User Defined] | GeTui Application Key | | GETUI_APP_KEY | [User Defined] | GeTui Application Key |
| GETUI_INTENT | [User Defined] | GeTui Push Intent | | GETUI_INTENT | [User Defined] | GeTui Push Intent |
| GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID | | GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID |
| GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name | | GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name |
| FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account | | FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account |
| JPNS_APP_KEY | [User Defined] | JPNS Application Key | | JPNS_APP_KEY | [User Defined] | JPNS Application Key |
| JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret | | JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret |
| JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL | | JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL |
| JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent | | JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent |
| MANAGER_USERID_1 | "openIM123456" | Administrator ID 1 | | MANAGER_USERID_1 | "openIM123456" | Administrator ID 1 |
| MANAGER_USERID_2 | "openIM654321" | Administrator ID 2 | | MANAGER_USERID_2 | "openIM654321" | Administrator ID 2 |
| MANAGER_USERID_3 | "openIMAdmin" | Administrator ID 3 | | MANAGER_USERID_3 | "openIMAdmin" | Administrator ID 3 |
| NICKNAME_1 | "system1" | Nickname 1 | | NICKNAME_1 | "system1" | Nickname 1 |
| NICKNAME_2 | "system2" | Nickname 2 | | NICKNAME_2 | "system2" | Nickname 2 |
| NICKNAME_3 | "system3" | Nickname 3 | | NICKNAME_3 | "system3" | Nickname 3 |
| MULTILOGIN_POLICY | "1" | Multi-login Policy | | IM_ADMIN_USERID | "imAdmin" | IM Administrator ID |
| CHAT_PERSISTENCE_MYSQL | "true" | Chat Persistence in MySQL | | IM_ADMIN_NAME | "imAdmin" | IM Administrator Nickname |
| MSG_CACHE_TIMEOUT | "86400" | Message Cache Timeout | | MULTILOGIN_POLICY | "1" | Multi-login Policy |
| GROUP_MSG_READ_RECEIPT | "true" | Group Message Read Receipt Enable | | CHAT_PERSISTENCE_MYSQL | "true" | Chat Persistence in MySQL |
| MSG_CACHE_TIMEOUT | "86400" | Message Cache Timeout |
| GROUP_MSG_READ_RECEIPT | "true" | Group Message Read Receipt Enable |
| SINGLE_MSG_READ_RECEIPT | "true" | Single Message Read Receipt Enable | | SINGLE_MSG_READ_RECEIPT | "true" | Single Message Read Receipt Enable |
| RETAIN_CHAT_RECORDS | "365" | Retain Chat Records (in days) | | RETAIN_CHAT_RECORDS | "365" | Retain Chat Records (in days) |
| CHAT_RECORDS_CLEAR_TIME | [Cron Expression] | Chat Records Clear Time | | CHAT_RECORDS_CLEAR_TIME | [Cron Expression] | Chat Records Clear Time |
| MSG_DESTRUCT_TIME | [Cron Expression] | Message Destruct Time | | MSG_DESTRUCT_TIME | [Cron Expression] | Message Destruct Time |
| SECRET | "${PASSWORD}" | Secret Key | | SECRET | "${PASSWORD}" | Secret Key |
| TOKEN_EXPIRE | "90" | Token Expiry Time | | TOKEN_EXPIRE | "90" | Token Expiry Time |
| FRIEND_VERIFY | "false" | Friend Verification Enable | | FRIEND_VERIFY | "false" | Friend Verification Enable |
| IOS_PUSH_SOUND | "xxx" | iOS | | IOS_PUSH_SOUND | "xxx" | iOS |
| CALLBACK_ENABLE | "false" | Enable callback | | CALLBACK_ENABLE | "false" | Enable callback |
| CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call | | CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call |
| CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step | | CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step |
### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration ### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration

@ -4,6 +4,8 @@ go 1.19
require ( require (
firebase.google.com/go v3.13.0+incompatible firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.41
github.com/OpenIMSDK/tools v0.0.21
github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1 github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
@ -38,6 +40,7 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.2.1 github.com/redis/go-redis/v9 v9.2.1
github.com/stathat/consistent v1.0.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.45 github.com/tencentyun/cos-go-sdk-v5 v0.7.45
go.uber.org/automaxprocs v1.5.3 go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.4.0 golang.org/x/sync v0.4.0
@ -132,7 +135,7 @@ require (
golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
@ -141,6 +144,7 @@ require (
gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/gorm v1.23.8 // indirect gorm.io/gorm v1.23.8 // indirect
stathat.com/c/consistent v1.0.0 // indirect
) )
require ( require (

@ -20,8 +20,10 @@ github.com/AndrewZuo01/protocol v0.0.0-20231219031520-648989b91fca/go.mod h1:F25
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k= github.com/OpenIMSDK/protocol v0.0.42 h1:vIWXqZJZZ1ddleJA25fxhjZ1GyEHATpYM3wVWh4/+PY=
github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/protocol v0.0.42/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48=
github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@ -308,6 +310,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4= github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4=
github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@ -451,8 +455,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@ -536,3 +540,5 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=

@ -33,6 +33,10 @@ func (o *ConversationApi) GetAllConversations(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetAllConversations, o.Client, c) a2r.Call(conversation.ConversationClient.GetAllConversations, o.Client, c)
} }
func (o *ConversationApi) GetConversationsList(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetConversationList, o.Client, c)
}
func (o *ConversationApi) GetConversation(c *gin.Context) { func (o *ConversationApi) GetConversation(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetConversation, o.Client, c) a2r.Call(conversation.ConversationClient.GetConversation, o.Client, c)
} }

@ -169,10 +169,10 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
case constant.OANotification: case constant.OANotification:
data = apistruct.OANotificationElem{} data = apistruct.OANotificationElem{}
req.SessionType = constant.NotificationChatType req.SessionType = constant.NotificationChatType
if !authverify.IsManagerUserID(req.SendID) { if err = m.userRpcClient.GetNotificationByID(c, req.SendID); err != nil {
return nil, errs.ErrNoPermission. return nil, err
Wrap("only app manager can as sender send OANotificationElem")
} }
default: default:
return nil, errs.ErrArgs.WithDetail("not support err contentType") return nil, errs.ErrArgs.WithDetail("not support err contentType")
} }
@ -186,38 +186,63 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
return m.newUserSendMsgReq(c, &req), nil return m.newUserSendMsgReq(c, &req), nil
} }
// SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework.
func (m *MessageApi) SendMessage(c *gin.Context) { func (m *MessageApi) SendMessage(c *gin.Context) {
// Initialize a request struct for sending a message.
req := apistruct.SendMsgReq{} req := apistruct.SendMsgReq{}
// Bind the JSON request body to the request struct.
if err := c.BindJSON(&req); err != nil { if err := c.BindJSON(&req); err != nil {
// Respond with an error if request body binding fails.
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return return
} }
// Check if the user has the app manager role.
if !authverify.IsAppManagerUid(c) { if !authverify.IsAppManagerUid(c) {
// Respond with a permission error if the user is not an app manager.
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message")) apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return return
} }
// Prepare the message request with additional required data.
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg) sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil { if err != nil {
// Log and respond with an error if preparation fails.
log.ZError(c, "decodeData failed", err) log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
} }
// Set the receiver ID in the message data.
sendMsgReq.MsgData.RecvID = req.RecvID sendMsgReq.MsgData.RecvID = req.RecvID
// Declare a variable to store the message sending status.
var status int var status int
// Attempt to send the message using the client.
respPb, err := m.Client.SendMsg(c, sendMsgReq) respPb, err := m.Client.SendMsg(c, sendMsgReq)
if err != nil { if err != nil {
// Set the status to failed and respond with an error if sending fails.
status = constant.MsgSendFailed status = constant.MsgSendFailed
log.ZError(c, "send message err", err) log.ZError(c, "send message err", err)
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
} }
// Set the status to successful if the message is sent.
status = constant.MsgSendSuccessed status = constant.MsgSendSuccessed
// Attempt to update the message sending status in the system.
_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{
Status: int32(status), Status: int32(status),
}) })
if err != nil { if err != nil {
// Log the error if updating the status fails.
log.ZError(c, "SetSendMsgStatus failed", err) log.ZError(c, "SetSendMsgStatus failed", err)
} }
// Respond with a success message and the response payload.
apiresp.GinSuccess(c, respPb) apiresp.GinSuccess(c, respPb)
} }
@ -225,13 +250,14 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
req := struct { req := struct {
Key string `json:"key"` Key string `json:"key"`
Data string `json:"data"` Data string `json:"data"`
SendUserID string `json:"sendUserID"` SendUserID string `json:"sendUserID" binding:"required"`
RecvUserID string `json:"recvUserID"` RecvUserID string `json:"recvUserID" binding:"required"`
}{} }{}
if err := c.BindJSON(&req); err != nil { if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return return
} }
if !authverify.IsAppManagerUid(c) { if !authverify.IsAppManagerUid(c) {
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message")) apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return return

@ -83,6 +83,10 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
userRouterGroup.POST("/process_user_command_delete", ParseToken, u.ProcessUserCommandDelete) userRouterGroup.POST("/process_user_command_delete", ParseToken, u.ProcessUserCommandDelete)
userRouterGroup.POST("/process_user_command_update", ParseToken, u.ProcessUserCommandUpdate) userRouterGroup.POST("/process_user_command_update", ParseToken, u.ProcessUserCommandUpdate)
userRouterGroup.POST("/process_user_command_get", ParseToken, u.ProcessUserCommandGet) userRouterGroup.POST("/process_user_command_get", ParseToken, u.ProcessUserCommandGet)
userRouterGroup.POST("/add_notification_account", ParseToken, u.AddNotificationAccount)
userRouterGroup.POST("/update_notification_account", ParseToken, u.UpdateNotificationAccountInfo)
userRouterGroup.POST("/search_notification_account", ParseToken, u.SearchNotificationAccount)
} }
// friend routing group // friend routing group
friendRouterGroup := r.Group("/friend", ParseToken) friendRouterGroup := r.Group("/friend", ParseToken)
@ -168,6 +172,8 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
objectGroup.POST("/auth_sign", t.AuthSign) objectGroup.POST("/auth_sign", t.AuthSign)
objectGroup.POST("/complete_multipart_upload", t.CompleteMultipartUpload) objectGroup.POST("/complete_multipart_upload", t.CompleteMultipartUpload)
objectGroup.POST("/access_url", t.AccessURL) objectGroup.POST("/access_url", t.AccessURL)
objectGroup.POST("/initiate_form_data", t.InitiateFormData)
objectGroup.POST("/complete_form_data", t.CompleteFormData)
objectGroup.GET("/*name", t.ObjectRedirect) objectGroup.GET("/*name", t.ObjectRedirect)
} }
// Message // Message
@ -198,6 +204,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
conversationGroup := r.Group("/conversation", ParseToken) conversationGroup := r.Group("/conversation", ParseToken)
{ {
c := NewConversationApi(*conversationRpc) c := NewConversationApi(*conversationRpc)
conversationGroup.POST("/get_conversations_list", c.GetConversationsList)
conversationGroup.POST("/get_all_conversations", c.GetAllConversations) conversationGroup.POST("/get_all_conversations", c.GetAllConversations)
conversationGroup.POST("/get_conversation", c.GetConversation) conversationGroup.POST("/get_conversation", c.GetConversation)
conversationGroup.POST("/get_conversations", c.GetConversations) conversationGroup.POST("/get_conversations", c.GetConversations)

@ -71,6 +71,14 @@ func (o *ThirdApi) AccessURL(c *gin.Context) {
a2r.Call(third.ThirdClient.AccessURL, o.Client, c) a2r.Call(third.ThirdClient.AccessURL, o.Client, c)
} }
func (o *ThirdApi) InitiateFormData(c *gin.Context) {
a2r.Call(third.ThirdClient.InitiateFormData, o.Client, c)
}
func (o *ThirdApi) CompleteFormData(c *gin.Context) {
a2r.Call(third.ThirdClient.CompleteFormData, o.Client, c)
}
func (o *ThirdApi) ObjectRedirect(c *gin.Context) { func (o *ThirdApi) ObjectRedirect(c *gin.Context) {
name := c.Param("name") name := c.Param("name")
if name == "" { if name == "" {

@ -15,8 +15,6 @@
package api package api
import ( import (
"github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/protocol/user" "github.com/OpenIMSDK/protocol/user"
@ -24,6 +22,7 @@ import (
"github.com/OpenIMSDK/tools/apiresp" "github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@ -221,3 +220,15 @@ func (u *UserApi) ProcessUserCommandUpdate(c *gin.Context) {
func (u *UserApi) ProcessUserCommandGet(c *gin.Context) { func (u *UserApi) ProcessUserCommandGet(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandGet, u.Client, c) a2r.Call(user.UserClient.ProcessUserCommandGet, u.Client, c)
} }
func (u *UserApi) AddNotificationAccount(c *gin.Context) {
a2r.Call(user.UserClient.AddNotificationAccount, u.Client, c)
}
func (u *UserApi) UpdateNotificationAccountInfo(c *gin.Context) {
a2r.Call(user.UserClient.UpdateNotificationAccountInfo, u.Client, c)
}
func (u *UserApi) SearchNotificationAccount(c *gin.Context) {
a2r.Call(user.UserClient.SearchNotificationAccount, u.Client, c)
}

@ -87,6 +87,7 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
} }
} }
// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient( func (c *Client) ResetClient(
ctx *UserConnContext, ctx *UserConnContext,
conn LongConn, conn LongConn,
@ -108,11 +109,13 @@ func (c *Client) ResetClient(
c.token = token c.token = token
} }
// pingHandler handles ping messages and sends pong responses.
func (c *Client) pingHandler(_ string) error { func (c *Client) pingHandler(_ string) error {
_ = c.conn.SetReadDeadline(pongWait) _ = c.conn.SetReadDeadline(pongWait)
return c.writePongMsg() return c.writePongMsg()
} }
// readMessage continuously reads messages from the connection.
func (c *Client) readMessage() { func (c *Client) readMessage() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -164,6 +167,7 @@ func (c *Client) readMessage() {
} }
} }
// handleMessage processes a single message received by the client.
func (c *Client) handleMessage(message []byte) error { func (c *Client) handleMessage(message []byte) error {
if c.IsCompress { if c.IsCompress {
var err error var err error

@ -288,12 +288,13 @@ func (ws *WsServer) registerClient(client *Client) {
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) if config.Config.Envs.Discovery == "zookeeper" {
go func() { wg.Add(1)
defer wg.Done() go func() {
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) defer wg.Done()
}() _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()

@ -67,13 +67,14 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
var pushUserIDs []string var pushUserIDList []string
if pbData.MsgData.SendID != pbData.MsgData.RecvID { isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID} if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
} else { } else {
pushUserIDs = []string{pbData.MsgData.SendID} pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
} }
err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData) err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData)
} }
if err != nil { if err != nil {
if err == errNoOfflinePusher { if err == errNoOfflinePusher {

@ -16,9 +16,8 @@ package push
import ( import (
"context" "context"
"sync"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"sync"
"google.golang.org/grpc" "google.golang.org/grpc"

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"google.golang.org/grpc"
"sync" "sync"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -142,6 +143,47 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error {
return json.Unmarshal([]byte(notification.Detail), t) return json.Unmarshal([]byte(notification.Detail), t)
} }
/*
k8s deployment,offline push group messages function
*/
func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error {
var needOfflinePushUserIDs []string
for _, v := range wsResults {
if !v.OnlinePush {
needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID)
}
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
if msg.ContentType != constant.SignalingNotification {
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
ctx,
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
)
if err != nil {
return err
}
if len(resp.UserIDs) > 0 {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
}
}
}
return nil
}
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string var pushToUserIDs []string
@ -205,7 +247,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush { if isOfflinePush && config.Config.Envs.Discovery == "k8s" {
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
}
if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" {
var ( var (
onlineSuccessUserIDs = []string{msg.SendID} onlineSuccessUserIDs = []string{msg.SendID}
webAndPcBackgroundUserIDs []string webAndPcBackgroundUserIDs []string
@ -239,14 +284,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
} }
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
return err
}
needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs)
}
// Use offline push messaging // Use offline push messaging
if len(needOfflinePushUserIDs) > 0 { if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string var offlinePushUserIDs []string
@ -258,30 +296,89 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if len(offlinePushUserIDs) > 0 { if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs needOfflinePushUserIDs = offlinePushUserIDs
} }
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( if msg.ContentType != constant.SignalingNotification {
ctx, resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, ctx,
) &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
if err != nil { )
return err
}
if len(resp.UserIDs) > 0 {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
if err != nil { if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err return err
} }
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { if len(resp.UserIDs) > 0 {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
return err if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
return err
}
} }
} }
} }
} }
return nil return nil
} }
func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
var usersHost = make(map[string][]string)
for _, v := range pushToUserIDs {
tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v)
if err != nil {
log.ZError(ctx, "get msggateway hash error", err)
return nil, err
}
tUsers, tbl := usersHost[tHost]
if tbl {
tUsers = append(tUsers, v)
usersHost[tHost] = tUsers
} else {
usersHost[tHost] = []string{v}
}
}
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
var usersConns = make(map[*grpc.ClientConn][]string)
for host, userIds := range usersHost {
tconn, _ := p.discov.GetConn(ctx, host)
usersConns[tconn] = userIds
}
var (
mu sync.Mutex
wg = errgroup.Group{}
maxWorkers = config.Config.Push.MaxConcurrentWorkers
)
if maxWorkers < 3 {
maxWorkers = 3
}
wg.SetLimit(maxWorkers)
for conn, userIds := range usersConns {
tcon := conn
tuserIds := userIds
wg.Go(func() error {
input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds}
msgClient := msggateway.NewMsgGatewayClient(tcon)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
if err != nil {
return nil
}
log.ZDebug(ctx, "push result", "reply", reply)
if reply != nil && reply.SinglePushResult != nil {
mu.Lock()
wsResults = append(wsResults, reply.SinglePushResult...)
mu.Unlock()
}
return nil
})
}
_ = wg.Wait()
return wsResults, nil
}
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
if config.Config.Envs.Discovery == "k8s" {
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
}
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil { if err != nil {

@ -17,6 +17,8 @@ package conversation
import ( import (
"context" "context"
"errors" "errors"
"github.com/OpenIMSDK/protocol/sdkws"
"sort"
"github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/tools/tx"
@ -41,6 +43,8 @@ import (
) )
type conversationServer struct { type conversationServer struct {
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase conversationDatabase controller.ConversationDatabase
conversationNotificationSender *notification.ConversationNotificationSender conversationNotificationSender *notification.ConversationNotificationSender
@ -61,7 +65,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
} }
groupRpcClient := rpcclient.NewGroupRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client)
userRpcClient := rpcclient.NewUserRpcClient(client)
pbconversation.RegisterConversationServer(server, &conversationServer{ pbconversation.RegisterConversationServer(server, &conversationServer{
msgRpcClient: &msgRpcClient,
user: &userRpcClient,
conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient), conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient),
groupRpcClient: &groupRpcClient, groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewMongo(mongo.GetClient())), conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewMongo(mongo.GetClient())),
@ -82,6 +89,73 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
return resp, nil return resp, nil
} }
func (m *conversationServer) GetConversationList(ctx context.Context, req *pbconversation.GetConversationListReq) (resp *pbconversation.GetConversationListResp, err error) {
log.ZDebug(ctx, "GetConversationList", "seqs", req, "userID", req.UserID)
var conversationIDs []string
if len(req.ConversationIDs) == 0 {
conversationIDs, err = m.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
} else {
conversationIDs = req.ConversationIDs
}
conversations, err := m.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
if len(conversations) == 0 {
return nil, errs.ErrRecordNotFound.Wrap()
}
maxSeqs, err := m.msgRpcClient.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
return nil, err
}
chatLogs, err := m.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
if err != nil {
return nil, err
}
conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID)
if err != nil {
return nil, err
}
hasReadSeqs, err := m.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
conversation_unreadCount := make(map[string]int64)
for conversationID, maxSeq := range maxSeqs {
conversation_unreadCount[conversationID] = maxSeq - hasReadSeqs[conversationID]
}
conversation_isPinkTime := make(map[int64]string)
conversation_notPinkTime := make(map[int64]string)
for _, v := range conversations {
conversationID := v.ConversationID
time := conversationMsg[conversationID].MsgInfo.LatestMsgRecvTime
conversationMsg[conversationID].RecvMsgOpt = v.RecvMsgOpt
if v.IsPinned {
conversationMsg[conversationID].IsPinned = v.IsPinned
conversation_isPinkTime[time] = conversationID
continue
}
conversation_notPinkTime[time] = conversationID
}
resp = &pbconversation.GetConversationListResp{
ConversationElems: []*pbconversation.ConversationElem{},
}
m.conversationSort(conversation_isPinkTime, resp, conversation_unreadCount, conversationMsg)
m.conversationSort(conversation_notPinkTime, resp, conversation_unreadCount, conversationMsg)
return resp, nil
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) { func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) {
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil { if err != nil {
@ -348,3 +422,102 @@ func (c *conversationServer) GetConversationOfflinePushUserIDs(
} }
return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil
} }
func (c *conversationServer) conversationSort(
conversations map[int64]string,
resp *pbconversation.GetConversationListResp,
conversation_unreadCount map[string]int64,
conversationMsg map[string]*pbconversation.ConversationElem,
) {
keys := []int64{}
for key := range conversations {
keys = append(keys, key)
}
sort.Slice(keys[:], func(i, j int) bool {
return keys[i] > keys[j]
})
index := 0
cons := make([]*pbconversation.ConversationElem, len(conversations))
for _, v := range keys {
conversationID := conversations[v]
conversationElem := conversationMsg[conversationID]
conversationElem.UnreadCount = conversation_unreadCount[conversationID]
cons[index] = conversationElem
index++
}
resp.ConversationElems = append(resp.ConversationElems, cons...)
}
func (c *conversationServer) getConversationInfo(
ctx context.Context,
chatLogs map[string]*sdkws.MsgData,
userID string) (map[string]*pbconversation.ConversationElem, error) {
var (
sendIDs []string
groupIDs []string
sendMap = make(map[string]*sdkws.UserInfo)
groupMap = make(map[string]*sdkws.GroupInfo)
conversationMsg = make(map[string]*pbconversation.ConversationElem)
)
for _, chatLog := range chatLogs {
switch chatLog.SessionType {
case constant.SingleChatType:
if chatLog.SendID == userID {
sendIDs = append(sendIDs, chatLog.RecvID)
}
sendIDs = append(sendIDs, chatLog.SendID)
case constant.GroupChatType, constant.SuperGroupChatType:
groupIDs = append(groupIDs, chatLog.GroupID)
sendIDs = append(sendIDs, chatLog.SendID)
}
}
if len(sendIDs) != 0 {
sendInfos, err := c.user.GetUsersInfo(ctx, sendIDs)
if err != nil {
return nil, err
}
for _, sendInfo := range sendInfos {
sendMap[sendInfo.UserID] = sendInfo
}
}
if len(groupIDs) != 0 {
groupInfos, err := c.groupRpcClient.GetGroupInfos(ctx, groupIDs, false)
if err != nil {
return nil, err
}
for _, groupInfo := range groupInfos {
groupMap[groupInfo.GroupID] = groupInfo
}
}
for conversationID, chatLog := range chatLogs {
pbchatLog := &pbconversation.ConversationElem{}
msgInfo := &pbconversation.MsgInfo{}
if err := utils.CopyStructFields(msgInfo, chatLog); err != nil {
return nil, err
}
switch chatLog.SessionType {
case constant.SingleChatType:
if chatLog.SendID == userID {
msgInfo.FaceURL = sendMap[chatLog.RecvID].FaceURL
msgInfo.SenderName = sendMap[chatLog.RecvID].Nickname
break
}
msgInfo.FaceURL = sendMap[chatLog.SendID].FaceURL
msgInfo.SenderName = sendMap[chatLog.SendID].Nickname
case constant.GroupChatType, constant.SuperGroupChatType:
msgInfo.GroupName = groupMap[chatLog.GroupID].GroupName
msgInfo.GroupFaceURL = groupMap[chatLog.GroupID].FaceURL
msgInfo.GroupMemberCount = groupMap[chatLog.GroupID].MemberCount
msgInfo.GroupID = chatLog.GroupID
msgInfo.GroupType = groupMap[chatLog.GroupID].GroupType
msgInfo.SenderName = sendMap[chatLog.SendID].Nickname
}
pbchatLog.ConversationID = conversationID
msgInfo.LatestMsgRecvTime = chatLog.SendTime
pbchatLog.MsgInfo = msgInfo
conversationMsg[conversationID] = pbchatLog
}
return conversationMsg, nil
}

@ -454,22 +454,27 @@ func (s *friendServer) UpdateFriends(
for _, friendID := range req.FriendUserIDs { for _, friendID := range req.FriendUserIDs {
if req.IsPinned != nil { if req.IsPinned != nil {
if err := s.friendDatabase.UpdateFriendPinStatus(ctx, req.OwnerUserID, friendID, req.IsPinned.Value); err != nil { if err = s.friendDatabase.UpdateFriendPinStatus(ctx, req.OwnerUserID, friendID, req.IsPinned.Value); err != nil {
return nil, err return nil, err
} }
} }
if req.Remark != nil { if req.Remark != nil {
if err := s.friendDatabase.UpdateFriendRemark(ctx, req.OwnerUserID, friendID, req.Remark.Value); err != nil { if err = s.friendDatabase.UpdateFriendRemark(ctx, req.OwnerUserID, friendID, req.Remark.Value); err != nil {
return nil, err return nil, err
} }
} }
if req.Ex != nil { if req.Ex != nil {
if err := s.friendDatabase.UpdateFriendEx(ctx, req.OwnerUserID, friendID, req.Ex.Value); err != nil { if err = s.friendDatabase.UpdateFriendEx(ctx, req.OwnerUserID, friendID, req.Ex.Value); err != nil {
return nil, err return nil, err
} }
} }
} }
resp := &pbfriend.UpdateFriendsResp{} resp := &pbfriend.UpdateFriendsResp{}
err = s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, errs.Wrap(err, "FriendsInfoUpdateNotification Error")
}
return resp, nil return resp, nil
} }

@ -18,6 +18,7 @@ import (
"context" "context"
utils2 "github.com/OpenIMSDK/tools/utils" utils2 "github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
@ -26,8 +27,6 @@ import (
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
) )
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) { func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) {

@ -100,7 +100,7 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) err
} }
func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) error { func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) error {
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable {
return nil return nil
} }
req := &cbapi.CallbackBeforeSendGroupMsgReq{ req := &cbapi.CallbackBeforeSendGroupMsgReq{

@ -16,7 +16,6 @@ package msg
import ( import (
"context" "context"
pbmsg "github.com/OpenIMSDK/protocol/msg" pbmsg "github.com/OpenIMSDK/protocol/msg"
) )
@ -30,3 +29,27 @@ func (m *msgServer) GetConversationMaxSeq(
} }
return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil
} }
func (m *msgServer) GetMaxSeqs(ctx context.Context, req *pbmsg.GetMaxSeqsReq) (*pbmsg.SeqsInfoResp, error) {
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbmsg.SeqsInfoResp{MaxSeqs: maxSeqs}, nil
}
func (m *msgServer) GetHasReadSeqs(ctx context.Context, req *pbmsg.GetHasReadSeqsReq) (*pbmsg.SeqsInfoResp, error) {
hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbmsg.SeqsInfoResp{MaxSeqs: hasReadSeqs}, nil
}
func (m *msgServer) GetMsgByConversationIDs(ctx context.Context, req *pbmsg.GetMsgByConversationIDsReq) (*pbmsg.GetMsgByConversationIDsResp, error) {
Msgs, err := m.MsgDatabase.FindOneByDocIDs(ctx, req.ConversationIDs, req.MaxSeqs)
if err != nil {
return nil, err
}
return &pbmsg.GetMsgByConversationIDsResp{MsgDatas: Msgs}, nil
}

@ -16,6 +16,12 @@ package third
import ( import (
"context" "context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv" "strconv"
"time" "time"
@ -179,6 +185,113 @@ func (t *thirdServer) AccessURL(ctx context.Context, req *third.AccessURLReq) (*
}, nil }, nil
} }
func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
if req.Name == "" {
return nil, errs.ErrArgs.Wrap("name is empty")
}
if req.Size <= 0 {
return nil, errs.ErrArgs.Wrap("size must be greater than 0")
}
if err := checkUploadName(ctx, req.Name); err != nil {
return nil, err
}
var duration time.Duration
opUserID := mcontext.GetOpUserID(ctx)
var key string
if authverify.IsManagerUserID(opUserID) {
if req.Millisecond <= 0 {
duration = time.Minute * 10
} else {
duration = time.Millisecond * time.Duration(req.Millisecond)
}
if req.Absolute {
key = req.Name
}
} else {
duration = time.Minute * 10
}
uid, err := uuid.NewRandom()
if err != nil {
return nil, err
}
if key == "" {
date := time.Now().Format("20060102")
key = path.Join(cont.DirectPath, date, opUserID, hex.EncodeToString(uid[:])+path.Ext(req.Name))
}
mate := FormDataMate{
Name: req.Name,
Size: req.Size,
ContentType: req.ContentType,
Group: req.Group,
Key: key,
}
mateData, err := json.Marshal(&mate)
if err != nil {
return nil, err
}
resp, err := t.s3dataBase.FormData(ctx, key, req.Size, req.ContentType, duration)
if err != nil {
return nil, err
}
return &third.InitiateFormDataResp{
Id: base64.RawStdEncoding.EncodeToString(mateData),
Url: resp.URL,
File: resp.File,
Header: toPbMapArray(resp.Header),
FormData: resp.FormData,
Expires: resp.Expires.UnixMilli(),
SuccessCodes: utils.Slice(resp.SuccessCodes, func(code int) int32 {
return int32(code)
}),
}, nil
}
func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
if req.Id == "" {
return nil, errs.ErrArgs.Wrap("id is empty")
}
data, err := base64.RawStdEncoding.DecodeString(req.Id)
if err != nil {
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
var mate FormDataMate
if err := json.Unmarshal(data, &mate); err != nil {
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
if err := checkUploadName(ctx, mate.Name); err != nil {
return nil, err
}
info, err := t.s3dataBase.StatObject(ctx, mate.Key)
if err != nil {
return nil, err
}
if info.Size > 0 && info.Size != mate.Size {
return nil, errs.ErrData.Wrap("file size mismatch")
}
obj := &relation.ObjectModel{
Name: mate.Name,
UserID: mcontext.GetOpUserID(ctx),
Hash: "etag_" + info.ETag,
Key: info.Key,
Size: info.Size,
ContentType: mate.ContentType,
Group: mate.Group,
CreateTime: time.Now(),
}
if err := t.s3dataBase.SetObject(ctx, obj); err != nil {
return nil, err
}
return &third.CompleteFormDataResp{Url: t.apiAddress(mate.Name)}, nil
}
func (t *thirdServer) apiAddress(name string) string { func (t *thirdServer) apiAddress(name string) string {
return t.apiURL + name return t.apiURL + name
} }
type FormDataMate struct {
Name string `json:"name"`
Size int64 `json:"size"`
ContentType string `json:"contentType"`
Group string `json:"group"`
Key string `json:"key"`
}

@ -29,6 +29,9 @@ import (
) )
func toPbMapArray(m map[string][]string) []*third.KeyValues { func toPbMapArray(m map[string][]string) []*third.KeyValues {
if len(m) == 0 {
return nil
}
res := make([]*third.KeyValues, 0, len(m)) res := make([]*third.KeyValues, 0, len(m))
for key := range m { for key := range m {
res = append(res, &third.KeyValues{ res = append(res, &third.KeyValues{

@ -17,6 +17,8 @@ package user
import ( import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"math/rand"
"strings" "strings"
"time" "time"
@ -72,6 +74,12 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
for k, v := range config.Config.Manager.UserID { for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin}) users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
} }
if len(config.Config.IMAdmin.UserID) != len(config.Config.IMAdmin.Nickname) {
return errors.New("len(config.Config.AppNotificationAdmin.AppManagerUid) != len(config.Config.AppNotificationAdmin.Nickname)")
}
for k, v := range config.Config.IMAdmin.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
}
userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
if err != nil { if err != nil {
return err return err
@ -148,11 +156,11 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
return nil, err return nil, err
} }
if err := CallbackBeforeUpdateUserInfoEx(ctx, req); err != nil { if err = CallbackBeforeUpdateUserInfoEx(ctx, req); err != nil {
return nil, err return nil, err
} }
data := convert.UserPb2DBMapEx(req.UserInfo) data := convert.UserPb2DBMapEx(req.UserInfo)
if err := s.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { if err = s.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err return nil, err
} }
_ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) _ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
@ -424,3 +432,141 @@ func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.Proc
// Return the response with the slice // Return the response with the slice
return &pbuser.ProcessUserCommandGetResp{KVArray: commandInfoSlice}, nil return &pbuser.ProcessUserCommandGetResp{KVArray: commandInfoSlice}, nil
} }
func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.AddNotificationAccountReq) (*pbuser.AddNotificationAccountResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
var userID string
for i := 0; i < 20; i++ {
userId := s.genUserID()
_, err := s.UserDatabase.FindWithError(ctx, []string{userId})
if err == nil {
continue
}
userID = userId
break
}
if userID == "" {
return nil, errs.ErrInternalServer.Wrap("gen user id failed")
}
user := &tablerelation.UserModel{
UserID: userID,
Nickname: req.NickName,
FaceURL: req.FaceURL,
CreateTime: time.Now(),
AppMangerLevel: constant.AppNotificationAdmin,
}
if err := s.UserDatabase.Create(ctx, []*tablerelation.UserModel{user}); err != nil {
return nil, err
}
return &pbuser.AddNotificationAccountResp{}, nil
}
func (s *userServer) UpdateNotificationAccountInfo(ctx context.Context, req *pbuser.UpdateNotificationAccountInfoReq) (*pbuser.UpdateNotificationAccountInfoResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
if _, err := s.UserDatabase.FindWithError(ctx, []string{req.UserID}); err != nil {
return nil, errs.ErrArgs.Wrap()
}
user := map[string]interface{}{}
if req.NickName != "" {
user["nickname"] = req.NickName
}
if req.FaceURL != "" {
user["face_url"] = req.FaceURL
}
if err := s.UserDatabase.UpdateByMap(ctx, req.UserID, user); err != nil {
return nil, err
}
return &pbuser.UpdateNotificationAccountInfoResp{}, nil
}
func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.SearchNotificationAccountReq) (*pbuser.SearchNotificationAccountResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
if req.NickName != "" {
users, err := s.UserDatabase.FindByNickname(ctx, req.NickName)
if err != nil {
return nil, err
}
resp := s.userModelToResp(users)
return resp, nil
}
if req.UserID != "" {
users, err := s.UserDatabase.Find(ctx, []string{req.UserID})
if err != nil {
return nil, err
}
resp := s.userModelToResp(users)
return resp, nil
}
_, users, err := s.UserDatabase.Page(ctx, req.Pagination)
if err != nil {
return nil, err
}
resp := s.userModelToResp(users)
return resp, nil
}
func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.GetNotificationAccountReq) (*pbuser.GetNotificationAccountResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.Wrap("userID is empty")
}
user, err := s.UserDatabase.GetUserByID(ctx, req.UserID)
if err != nil {
return nil, errs.ErrUserIDNotFound.Wrap()
}
if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel == constant.AppNotificationAdmin {
return &pbuser.GetNotificationAccountResp{}, nil
}
return nil, errs.ErrNoPermission.Wrap("notification messages cannot be sent for this ID")
}
func (s *userServer) genUserID() string {
const l = 10
data := make([]byte, l)
rand.Read(data)
chars := []byte("0123456789")
for i := 0; i < len(data); i++ {
if i == 0 {
data[i] = chars[1:][data[i]%9]
} else {
data[i] = chars[data[i]%10]
}
}
return string(data)
}
func (s *userServer) userModelToResp(users []*relation.UserModel) *pbuser.SearchNotificationAccountResp {
accounts := make([]*pbuser.NotificationAccountInfo, 0)
var total int64
for _, v := range users {
if v.AppMangerLevel == constant.AppNotificationAdmin || v.AppMangerLevel == constant.AppAdmin {
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
}
accounts = append(accounts, temp)
total += 1
}
}
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: accounts}
}

@ -64,6 +64,30 @@ type SendMsgReq struct {
SendMsg SendMsg
} }
type GetConversationListReq struct {
// userID uniquely identifies the user.
UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty" binding:"required"`
// ConversationIDs contains a list of unique identifiers for conversations.
ConversationIDs []string `protobuf:"bytes,2,rep,name=conversationIDs,proto3" json:"conversationIDs,omitempty"`
}
type GetConversationListResp struct {
// ConversationElems is a map that associates conversation IDs with their respective details.
ConversationElems map[string]*ConversationElem `protobuf:"bytes,1,rep,name=conversationElems,proto3" json:"conversationElems,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
type ConversationElem struct {
// MaxSeq represents the maximum sequence number within the conversation.
MaxSeq int64 `protobuf:"varint,1,opt,name=maxSeq,proto3" json:"maxSeq,omitempty"`
// UnreadSeq represents the number of unread messages in the conversation.
UnreadSeq int64 `protobuf:"varint,2,opt,name=unreadSeq,proto3" json:"unreadSeq,omitempty"`
// LastSeqTime represents the timestamp of the last sequence in the conversation.
LastSeqTime int64 `protobuf:"varint,3,opt,name=LastSeqTime,proto3" json:"LastSeqTime,omitempty"`
}
// BatchSendMsgReq defines the structure for sending a message to multiple recipients. // BatchSendMsgReq defines the structure for sending a message to multiple recipients.
type BatchSendMsgReq struct { type BatchSendMsgReq struct {
SendMsg SendMsg

@ -87,7 +87,7 @@ type OANotificationElem struct {
NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"` NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"`
Text string `mapstructure:"text" json:"text" validate:"required"` Text string `mapstructure:"text" json:"text" validate:"required"`
Url string `mapstructure:"url" json:"url"` Url string `mapstructure:"url" json:"url"`
MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"` MixType int32 `mapstructure:"mixType" json:"mixType"`
PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"` PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"`
SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"` SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"`
VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"` VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"`

@ -54,6 +54,15 @@ func CheckAdmin(ctx context.Context) error {
} }
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
} }
func CheckIMAdmin(ctx context.Context) error {
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) {
return nil
}
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) {
return nil
}
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx)))
}
func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) { func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) {
return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret()) return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret())

@ -45,7 +45,7 @@ type CmdOpts struct {
func WithCronTaskLogName() func(*CmdOpts) { func WithCronTaskLogName() func(*CmdOpts) {
return func(opts *CmdOpts) { return func(opts *CmdOpts) {
opts.loggerPrefixName = "OpenIM.CronTask.log.all" opts.loggerPrefixName = "openim.crontask.log.all"
} }
} }

@ -236,6 +236,11 @@ type configStruct struct {
Nickname []string `yaml:"nickname"` Nickname []string `yaml:"nickname"`
} `yaml:"manager"` } `yaml:"manager"`
IMAdmin struct {
UserID []string `yaml:"userID"`
Nickname []string `yaml:"nickname"`
} `yaml:"im-admin"`
MultiLoginPolicy int `yaml:"multiLoginPolicy"` MultiLoginPolicy int `yaml:"multiLoginPolicy"`
ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"` ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"`
MsgCacheTimeout int `yaml:"msgCacheTimeout"` MsgCacheTimeout int `yaml:"msgCacheTimeout"`

@ -87,7 +87,15 @@ func NewRedis() (redis.UniversalClient, error) {
// overrideConfigFromEnv overrides configuration fields with environment variables if present. // overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv() { func overrideConfigFromEnv() {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated if envPort := os.Getenv("REDIS_PORT"); envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + envPort
}
config.Config.Redis.Address = addresses
} else {
config.Config.Redis.Address = strings.Split(envAddr, ",")
}
} }
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Config.Redis.Username = envUser config.Config.Redis.Username = envUser

@ -82,6 +82,7 @@ type FriendDatabase interface {
// UpdateFriendEx updates the 'ex' field for a friend // UpdateFriendEx updates the 'ex' field for a friend
UpdateFriendEx(ctx context.Context, ownerUserID string, friendUserID string, ex string) (err error) UpdateFriendEx(ctx context.Context, ownerUserID string, friendUserID string, ex string) (err error)
} }
type friendDatabase struct { type friendDatabase struct {

@ -98,6 +98,7 @@ type CommonMsgDatabase interface {
SetSendMsgStatus(ctx context.Context, id string, status int32) error SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error) GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
// to mq // to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
@ -1051,6 +1052,21 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
return total, totalMsgs, nil return total, totalMsgs, nil
} }
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
totalMsgs := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs {
seq := seqs[conversationID]
docID := db.msg.GetDocID(conversationID, seq)
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil {
return nil, err
}
index := db.msg.GetMsgIndex(seq)
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
}
return totalMsgs, nil
}
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
} }

@ -35,6 +35,8 @@ type S3Database interface {
CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error)
AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error)
SetObject(ctx context.Context, info *relation.ObjectModel) error SetObject(ctx context.Context, info *relation.ObjectModel) error
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
} }
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database { func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database {
@ -100,3 +102,11 @@ func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Dur
} }
return expireTime, rawURL, nil return expireTime, rawURL, nil
} }
func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
return s.s3.StatObject(ctx, name)
}
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return s.s3.FormData(ctx, name, size, contentType, duration)
}

@ -38,6 +38,8 @@ type UserDatabase interface {
FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// Find Get the information of the specified user If the userID is not found, no error will be returned // Find Get the information of the specified user If the userID is not found, no error will be returned
Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// Find userInfo By Nickname
FindByNickname(ctx context.Context, nickname string) (users []*relation.UserModel, err error)
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db
Create(ctx context.Context, users []*relation.UserModel) (err error) Create(ctx context.Context, users []*relation.UserModel) (err error)
// Update update (non-zero value) external guarantee userID exists // Update update (non-zero value) external guarantee userID exists
@ -50,6 +52,8 @@ type UserDatabase interface {
IsExist(ctx context.Context, userIDs []string) (exist bool, err error) IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
// GetAllUserID Get all user IDs // GetAllUserID Get all user IDs
GetAllUserID(ctx context.Context, pagination pagination.Pagination) (int64, []string, error) GetAllUserID(ctx context.Context, pagination pagination.Pagination) (int64, []string, error)
// Get user by userID
GetUserByID(ctx context.Context, userID string) (user *relation.UserModel, err error)
// InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it // InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it
InitOnce(ctx context.Context, users []*relation.UserModel) (err error) InitOnce(ctx context.Context, users []*relation.UserModel) (err error)
// CountTotal Get the total number of users // CountTotal Get the total number of users
@ -131,6 +135,11 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel
return u.cache.GetUsersInfo(ctx, userIDs) return u.cache.GetUsersInfo(ctx, userIDs)
} }
// Find userInfo By Nickname
func (u *userDatabase) FindByNickname(ctx context.Context, nickname string) (users []*relation.UserModel, err error) {
return u.userDB.TakeByNickname(ctx, nickname)
}
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db. // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db.
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
return u.tx.Transaction(ctx, func(ctx context.Context) error { return u.tx.Transaction(ctx, func(ctx context.Context) error {
@ -183,6 +192,10 @@ func (u *userDatabase) GetAllUserID(ctx context.Context, pagination pagination.P
return u.userDB.GetAllUserID(ctx, pagination) return u.userDB.GetAllUserID(ctx, pagination)
} }
func (u *userDatabase) GetUserByID(ctx context.Context, userID string) (user *relation.UserModel, err error) {
return u.userDB.Take(ctx, userID)
}
// CountTotal Get the total number of users. // CountTotal Get the total number of users.
func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) { func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
return u.userDB.CountTotal(ctx, before) return u.userDB.CountTotal(ctx, before)

@ -65,6 +65,10 @@ func (u *UserMgo) Take(ctx context.Context, userID string) (user *relation.UserM
return mgoutil.FindOne[*relation.UserModel](ctx, u.coll, bson.M{"user_id": userID}) return mgoutil.FindOne[*relation.UserModel](ctx, u.coll, bson.M{"user_id": userID})
} }
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*relation.UserModel, err error) {
return mgoutil.Find[*relation.UserModel](ctx, u.coll, bson.M{"nickname": nickname})
}
func (u *UserMgo) Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*relation.UserModel, err error) { func (u *UserMgo) Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*relation.UserModel, err error) {
return mgoutil.FindPage[*relation.UserModel](ctx, u.coll, bson.M{}, pagination) return mgoutil.FindPage[*relation.UserModel](ctx, u.coll, bson.M{}, pagination)
} }

@ -17,6 +17,7 @@ package cont
const ( const (
hashPath = "openim/data/hash/" hashPath = "openim/data/hash/"
tempPath = "openim/temp/" tempPath = "openim/temp/"
DirectPath = "openim/direct"
UploadTypeMultipart = 1 // 分片上传 UploadTypeMultipart = 1 // 分片上传
UploadTypePresigned = 2 // 预签名上传 UploadTypePresigned = 2 // 预签名上传
partSeparator = "," partSeparator = ","

@ -279,3 +279,7 @@ func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Dur
} }
return c.impl.AccessURL(ctx, name, expire, opt) return c.impl.AccessURL(ctx, name, expire, opt)
} }
func (c *Controller) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return c.impl.FormData(ctx, name, size, contentType, duration)
}

@ -16,6 +16,11 @@ package cos
import ( import (
"context" "context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -44,6 +49,8 @@ const (
imageWebp = "webp" imageWebp = "webp"
) )
const successCode = http.StatusOK
const ( const (
videoSnapshotImagePng = "png" videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg" videoSnapshotImageJpg = "jpg"
@ -326,3 +333,65 @@ func (c *Cos) getPresignedURL(ctx context.Context, name string, expire time.Dura
} }
return c.client.Object.GetObjectURL(name), nil return c.client.Object.GetObjectURL(name), nil
} }
func (c *Cos) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
// https://cloud.tencent.com/document/product/436/14690
now := time.Now()
expiration := now.Add(duration)
keyTime := fmt.Sprintf("%d;%d", now.Unix(), expiration.Unix())
conditions := []any{
map[string]string{"q-sign-algorithm": "sha1"},
map[string]string{"q-ak": c.credential.SecretID},
map[string]string{"q-sign-time": keyTime},
map[string]string{"key": name},
}
if contentType != "" {
conditions = append(conditions, map[string]string{"Content-Type": contentType})
}
policy := map[string]any{
"expiration": expiration.Format("2006-01-02T15:04:05.000Z"),
"conditions": conditions,
}
policyJson, err := json.Marshal(policy)
if err != nil {
return nil, err
}
signKey := hmacSha1val(c.credential.SecretKey, keyTime)
strToSign := sha1val(string(policyJson))
signature := hmacSha1val(signKey, strToSign)
fd := &s3.FormData{
URL: c.client.BaseURL.BucketURL.String(),
File: "file",
Expires: expiration,
FormData: map[string]string{
"policy": base64.StdEncoding.EncodeToString(policyJson),
"q-sign-algorithm": "sha1",
"q-ak": c.credential.SecretID,
"q-key-time": keyTime,
"q-signature": signature,
"key": name,
"success_action_status": strconv.Itoa(successCode),
},
SuccessCodes: []int{successCode},
}
if contentType != "" {
fd.FormData["Content-Type"] = contentType
}
if c.credential.SessionToken != "" {
fd.FormData["x-cos-security-token"] = c.credential.SessionToken
}
return fd, nil
}
func hmacSha1val(key, msg string) string {
v := hmac.New(sha1.New, []byte(key))
v.Write([]byte(msg))
return hex.EncodeToString(v.Sum(nil))
}
func sha1val(msg string) string {
sha1Hash := sha1.New()
sha1Hash.Write([]byte(msg))
return hex.EncodeToString(sha1Hash.Sum(nil))
}

@ -57,6 +57,8 @@ const (
imageThumbnailPath = "openim/thumbnail" imageThumbnailPath = "openim/thumbnail"
) )
const successCode = http.StatusOK
func NewMinio(cache cache.MinioCache) (s3.Interface, error) { func NewMinio(cache cache.MinioCache) (s3.Interface, error) {
u, err := url.Parse(config.Config.Object.Minio.Endpoint) u, err := url.Parse(config.Config.Object.Minio.Endpoint)
if err != nil { if err != nil {
@ -441,3 +443,51 @@ func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]
} }
return io.ReadAll(io.LimitReader(object, limit)) return io.ReadAll(io.LimitReader(object, limit))
} }
func (m *Minio) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
policy := minio.NewPostPolicy()
if err := policy.SetKey(name); err != nil {
return nil, err
}
expires := time.Now().Add(duration)
if err := policy.SetExpires(expires); err != nil {
return nil, err
}
if size > 0 {
if err := policy.SetContentLengthRange(0, size); err != nil {
return nil, err
}
}
if err := policy.SetSuccessStatusAction(strconv.Itoa(successCode)); err != nil {
return nil, err
}
if contentType != "" {
if err := policy.SetContentType(contentType); err != nil {
return nil, err
}
}
if err := policy.SetBucket(m.bucket); err != nil {
return nil, err
}
u, fd, err := m.core.PresignedPostPolicy(ctx, policy)
if err != nil {
return nil, err
}
sign, err := url.Parse(m.signEndpoint)
if err != nil {
return nil, err
}
u.Scheme = sign.Scheme
u.Host = sign.Host
return &s3.FormData{
URL: u.String(),
File: "file",
Header: nil,
FormData: fd,
Expires: expires,
SuccessCodes: []int{successCode},
}, nil
}

@ -16,8 +16,13 @@ package oss
import ( import (
"context" "context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/url" "net/url"
"reflect" "reflect"
@ -45,6 +50,8 @@ const (
imageWebp = "webp" imageWebp = "webp"
) )
const successCode = http.StatusOK
const ( const (
videoSnapshotImagePng = "png" videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg" videoSnapshotImageJpg = "jpg"
@ -327,3 +334,45 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration,
params := getURLParams(*o.bucket.Client.Conn, rawParams) params := getURLParams(*o.bucket.Client.Conn, rawParams)
return getURL(o.um, o.bucket.BucketName, name, params).String(), nil return getURL(o.um, o.bucket.BucketName, name, params).String(), nil
} }
func (o *OSS) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
// https://help.aliyun.com/zh/oss/developer-reference/postobject?spm=a2c4g.11186623.0.0.1cb83cebkP55nn
expires := time.Now().Add(duration)
conditions := []any{
map[string]string{"bucket": o.bucket.BucketName},
map[string]string{"key": name},
}
if size > 0 {
conditions = append(conditions, []any{"content-length-range", 0, size})
}
policy := map[string]any{
"expiration": expires.Format("2006-01-02T15:04:05.000Z"),
"conditions": conditions,
}
policyJson, err := json.Marshal(policy)
if err != nil {
return nil, err
}
policyStr := base64.StdEncoding.EncodeToString(policyJson)
h := hmac.New(sha1.New, []byte(o.credentials.GetAccessKeySecret()))
if _, err := io.WriteString(h, policyStr); err != nil {
return nil, err
}
fd := &s3.FormData{
URL: o.bucketURL,
File: "file",
Expires: expires,
FormData: map[string]string{
"key": name,
"policy": policyStr,
"OSSAccessKeyId": o.credentials.GetAccessKeyID(),
"success_action_status": strconv.Itoa(successCode),
"signature": base64.StdEncoding.EncodeToString(h.Sum(nil)),
},
SuccessCodes: []int{successCode},
}
if contentType != "" {
fd.FormData["x-oss-content-type"] = contentType
}
return fd, nil
}

@ -74,6 +74,15 @@ type CopyObjectInfo struct {
ETag string `json:"etag"` ETag string `json:"etag"`
} }
type FormData struct {
URL string `json:"url"`
File string `json:"file"`
Header http.Header `json:"header"`
FormData map[string]string `json:"form"`
Expires time.Time `json:"expires"`
SuccessCodes []int `json:"successActionStatus"`
}
type SignPart struct { type SignPart struct {
PartNumber int `json:"partNumber"` PartNumber int `json:"partNumber"`
URL string `json:"url"` URL string `json:"url"`
@ -152,4 +161,6 @@ type Interface interface {
ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*ListUploadedPartsResult, error) ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*ListUploadedPartsResult, error)
AccessURL(ctx context.Context, name string, expire time.Duration, opt *AccessURLOption) (string, error) AccessURL(ctx context.Context, name string, expire time.Duration, opt *AccessURLOption) (string, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*FormData, error)
} }

@ -53,6 +53,7 @@ type UserModelInterface interface {
UpdateByMap(ctx context.Context, userID string, args map[string]any) (err error) UpdateByMap(ctx context.Context, userID string, args map[string]any) (err error)
Find(ctx context.Context, userIDs []string) (users []*UserModel, err error) Find(ctx context.Context, userIDs []string) (users []*UserModel, err error)
Take(ctx context.Context, userID string) (user *UserModel, err error) Take(ctx context.Context, userID string) (user *UserModel, err error)
TakeByNickname(ctx context.Context, nickname string) (user []*UserModel, err error)
Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*UserModel, err error) Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*UserModel, err error)
Exist(ctx context.Context, userID string) (exist bool, err error) Exist(ctx context.Context, userID string) (exist bool, err error)
GetAllUserID(ctx context.Context, pagination pagination.Pagination) (count int64, userIDs []string, err error) GetAllUserID(ctx context.Context, pagination pagination.Pagination) (count int64, userIDs []string, err error)

@ -74,6 +74,10 @@ func buildMongoURI() string {
return uri return uri
} }
if config.Config.Mongo.Uri != "" {
return config.Config.Mongo.Uri
}
username := os.Getenv("MONGO_USERNAME") username := os.Getenv("MONGO_USERNAME")
password := os.Getenv("MONGO_PASSWORD") password := os.Getenv("MONGO_PASSWORD")
address := os.Getenv("MONGO_ADDRESS") address := os.Getenv("MONGO_ADDRESS")

@ -24,7 +24,8 @@ import (
func setupTestEnvironment() { func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim") os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181") os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1")
os.Setenv("ZOOKEEPER_PORT", "12181")
os.Setenv("ZOOKEEPER_USERNAME", "") os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "") os.Setenv("ZOOKEEPER_PASSWORD", "")
} }

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/stathat/consistent"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -31,51 +32,54 @@ import (
// K8sDR represents the Kubernetes service discovery and registration client. // K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct { type K8sDR struct {
options []grpc.DialOption options []grpc.DialOption
rpcRegisterAddr string rpcRegisterAddr string
gatewayHostConsistent *consistent.Consistent
} }
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
gatewayConsistent := consistent.New()
return &K8sDR{}, nil gatewayHosts := getMsgGatewayHost(context.Background())
for _, v := range gatewayHosts {
gatewayConsistent.Add(v)
}
return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil
} }
// Register registers a service with Kubernetes.
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
cli.rpcRegisterAddr = serviceName cli.rpcRegisterAddr = serviceName
} else { } else {
cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) cli.rpcRegisterAddr = getSelfHost(context.Background())
} }
return nil return nil
} }
// UnRegister removes a service registration from Kubernetes.
func (cli *K8sDR) UnRegister() error { func (cli *K8sDR) UnRegister() error {
return nil return nil
} }
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil return nil
} }
// RegisterConf2Registry registers a configuration to the registry.
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil return nil
} }
// GetConfFromRegistry retrieves a configuration from the registry.
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil return nil, nil
} }
func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
func (cli *K8sDR) getSelfHost(ctx context.Context) string { host, err := cli.gatewayHostConsistent.Get(userId)
if err != nil {
log.ZError(ctx, "GetUserIdHashGatewayHost error", err)
}
return host, err
}
func getSelfHost(ctx context.Context) string {
port := 88 port := 88
instance := "openimserver" instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME") selfPodName := os.Getenv("MY_POD_NAME")
@ -95,26 +99,8 @@ func (cli *K8sDR) getSelfHost(ctx context.Context) string {
return host return host
} }
// GetConns returns a list of gRPC client connections for a given service.
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
}
var ret []*grpc.ClientConn
gatewayHosts := cli.getMsgGatewayHost(ctx)
for _, host := range gatewayHosts {
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
if err != nil {
return nil, err
}
ret = append(ret, conn)
}
return ret, nil
}
// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 // like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88
func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { func getMsgGatewayHost(ctx context.Context) []string {
port := 88 port := 88
instance := "openimserver" instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME") selfPodName := os.Getenv("MY_POD_NAME")
@ -135,40 +121,48 @@ func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string {
ret = append(ret, host) ret = append(ret, host)
} }
log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret) log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
return ret return ret
} }
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
// GetConn returns a single gRPC client connection for a given service. if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
} else {
var ret []*grpc.ClientConn
gatewayHosts := getMsgGatewayHost(ctx)
for _, host := range gatewayHosts {
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
if err != nil {
return nil, err
} else {
ret = append(ret, conn)
}
}
return ret, nil
}
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
} }
// GetSelfConnTarget returns the connection target of the client itself.
func (cli *K8sDR) GetSelfConnTarget() string { func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr return cli.rpcRegisterAddr
} }
// AddOption adds gRPC dial options to the client.
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...) cli.options = append(cli.options, opts...)
} }
// CloseConn closes a given gRPC client connection.
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close() conn.Close()
} }
// do not use this method for call rpc. // do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil return nil
} }
// Close closes the K8sDR client.
func (cli *K8sDR) Close() { func (cli *K8sDR) Close() {
// Close any open resources here (if applicable)
return return
} }

@ -52,10 +52,18 @@ func getEnv(key, fallback string) string {
return fallback return fallback
} }
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. // getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string { func getZkAddrFromEnv(fallback []string) []string {
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists { address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS")
return strings.Split(value, ",") port, portExists := os.LookupEnv("ZOOKEEPER_PORT")
if addrExists && portExists {
addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + port
}
return addresses
} }
return fallback return fallback
} }

@ -432,7 +432,7 @@ func computeApproximateRequestSize(r *http.Request) int {
} }
s += len(r.Host) s += len(r.Host)
// r.Form and r.MultipartForm are assumed to be included in r.URL. // r.FormData and r.MultipartForm are assumed to be included in r.URL.
if r.ContentLength != -1 { if r.ContentLength != -1 {
s += int(r.ContentLength) s += int(r.ContentLength)

@ -112,7 +112,6 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
//v.Set(constant.CallbackCommand, command) //v.Set(constant.CallbackCommand, command)
//url = url + "/" + v.Encode() //url = url + "/" + v.Encode()
url = url + "/" + command url = url + "/" + command
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
if err != nil { if err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
@ -121,6 +120,7 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
} }
return errs.ErrNetwork.Wrap(err.Error()) return errs.ErrNetwork.Wrap(err.Error())
} }
defer log.ZDebug(ctx, "callback", "data", string(b))
if err = json.Unmarshal(b, output); err != nil { if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {

@ -31,15 +31,14 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
const ( const maxRetry = 10 // number of retries
maxRetry = 10 // Maximum number of retries for producer creation
)
var errEmptyMsg = errors.New("binary msg is empty") var errEmptyMsg = errors.New("kafka binary msg is empty")
// Producer represents a Kafka producer.
type Producer struct { type Producer struct {
topic string
addr []string addr []string
topic string
config *sarama.Config config *sarama.Config
producer sarama.SyncProducer producer sarama.SyncProducer
} }
@ -68,7 +67,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
// Get Kafka configuration from environment variables or fallback to config file // Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username) kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password) kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function
// Configure SASL authentication if credentials are provided // Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" { if kafkaUsername != "" && kafkaPassword != "" {
@ -78,7 +77,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
} }
// Set the Kafka address // Set the Kafka address
p.addr = []string{kafkaAddr} p.addr = kafkaAddr
// Set up TLS configuration (if required) // Set up TLS configuration (if required)
SetupTLSConfig(p.config) SetupTLSConfig(p.config)

@ -15,7 +15,9 @@
package kafka package kafka
import ( import (
"fmt"
"os" "os"
"strings"
"github.com/IBM/sarama" "github.com/IBM/sarama"
@ -44,3 +46,20 @@ func getEnvOrConfig(envName string, configValue string) string {
} }
return configValue return configValue
} }
// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getKafkaAddrFromEnv(fallback []string) []string {
envAddr := os.Getenv("KAFKA_ADDRESS")
envPort := os.Getenv("KAFKA_PORT")
if envAddr != "" && envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = fmt.Sprintf("%s:%s", addr, envPort)
}
return addresses
}
return fallback
}

@ -30,10 +30,9 @@ func NewOptions(opts ...OptionsOpt) Options {
options[constant.IsOfflinePush] = false options[constant.IsOfflinePush] = false
options[constant.IsUnreadCount] = false options[constant.IsUnreadCount] = false
options[constant.IsConversationUpdate] = false options[constant.IsConversationUpdate] = false
options[constant.IsSenderSync] = false options[constant.IsSenderSync] = true
options[constant.IsNotPrivate] = false options[constant.IsNotPrivate] = false
options[constant.IsSenderConversationUpdate] = false options[constant.IsSenderConversationUpdate] = false
options[constant.IsSenderNotificationPush] = false
options[constant.IsReactionFromCache] = false options[constant.IsReactionFromCache] = false
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(options)
@ -114,12 +113,6 @@ func WithSenderConversationUpdate() OptionsOpt {
} }
} }
func WithSenderNotificationPush() OptionsOpt {
return func(options Options) {
options[constant.IsSenderNotificationPush] = true
}
}
func WithReactionFromCache() OptionsOpt { func WithReactionFromCache() OptionsOpt {
return func(options Options) { return func(options Options) {
options[constant.IsReactionFromCache] = true options[constant.IsReactionFromCache] = true
@ -174,10 +167,6 @@ func (o Options) IsSenderConversationUpdate() bool {
return o.Is(constant.IsSenderConversationUpdate) return o.Is(constant.IsSenderConversationUpdate)
} }
func (o Options) IsSenderNotificationPush() bool {
return o.Is(constant.IsSenderNotificationPush)
}
func (o Options) IsReactionFromCache() bool { func (o Options) IsReactionFromCache() bool {
return o.Is(constant.IsReactionFromCache) return o.Is(constant.IsReactionFromCache)
} }

@ -17,7 +17,6 @@ package rpcclient
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -68,6 +67,7 @@ func newContentTypeConf() map[int32]config.NotificationConf {
constant.BlackAddedNotification: config.Config.Notification.BlackAdded, constant.BlackAddedNotification: config.Config.Notification.BlackAdded,
constant.BlackDeletedNotification: config.Config.Notification.BlackDeleted, constant.BlackDeletedNotification: config.Config.Notification.BlackDeleted,
constant.FriendInfoUpdatedNotification: config.Config.Notification.FriendInfoUpdated, constant.FriendInfoUpdatedNotification: config.Config.Notification.FriendInfoUpdated,
constant.FriendsInfoUpdateNotification: config.Config.Notification.FriendInfoUpdated, //use the same FriendInfoUpdated
// conversation // conversation
constant.ConversationChangeNotification: config.Config.Notification.ConversationChanged, constant.ConversationChangeNotification: config.Config.Notification.ConversationChanged,
constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged, constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged,
@ -115,6 +115,7 @@ func newSessionTypeConf() map[int32]int32 {
constant.BlackAddedNotification: constant.SingleChatType, constant.BlackAddedNotification: constant.SingleChatType,
constant.BlackDeletedNotification: constant.SingleChatType, constant.BlackDeletedNotification: constant.SingleChatType,
constant.FriendInfoUpdatedNotification: constant.SingleChatType, constant.FriendInfoUpdatedNotification: constant.SingleChatType,
constant.FriendsInfoUpdateNotification: constant.SingleChatType,
// conversation // conversation
constant.ConversationChangeNotification: constant.SingleChatType, constant.ConversationChangeNotification: constant.SingleChatType,
constant.ConversationUnreadNotification: constant.SingleChatType, constant.ConversationUnreadNotification: constant.SingleChatType,
@ -155,6 +156,30 @@ func (m *MessageRpcClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqRe
return resp, err return resp, err
} }
func (m *MessageRpcClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
log.ZDebug(ctx, "GetMaxSeqs", "conversationIDs", conversationIDs)
resp, err := m.Client.GetMaxSeqs(ctx, &msg.GetMaxSeqsReq{
ConversationIDs: conversationIDs,
})
return resp.MaxSeqs, err
}
func (m *MessageRpcClient) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
resp, err := m.Client.GetHasReadSeqs(ctx, &msg.GetHasReadSeqsReq{
UserID: userID,
ConversationIDs: conversationIDs,
})
return resp.MaxSeqs, err
}
func (m *MessageRpcClient) GetMsgByConversationIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
resp, err := m.Client.GetMsgByConversationIDs(ctx, &msg.GetMsgByConversationIDsReq{
ConversationIDs: docIDs,
MaxSeqs: seqs,
})
return resp.MsgDatas, err
}
func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
resp, err := m.Client.PullMessageBySeqs(ctx, req) resp, err := m.Client.PullMessageBySeqs(ctx, req)
return resp, err return resp, err
@ -256,6 +281,7 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
optionsConfig.ReliabilityLevel = constant.UnreliableNotification optionsConfig.ReliabilityLevel = constant.UnreliableNotification
} }
options := config.GetOptionsByNotification(optionsConfig) options := config.GetOptionsByNotification(optionsConfig)
s.SetOptionsByContentType(ctx, options, contentType)
msg.Options = options msg.Options = options
offlineInfo.Title = title offlineInfo.Title = title
offlineInfo.Desc = desc offlineInfo.Desc = desc
@ -274,3 +300,11 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) error { func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) error {
return s.NotificationWithSesstionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...) return s.NotificationWithSesstionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...)
} }
func (s *NotificationSender) SetOptionsByContentType(_ context.Context, options map[string]bool, contentType int32) {
switch contentType {
case constant.UserStatusChangeNotification:
options[constant.IsSenderSync] = false
default:
}
}

@ -196,7 +196,12 @@ func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Conte
tips.FromToUserID.ToUserID = toUserID tips.FromToUserID.ToUserID = toUserID
return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips) return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
} }
func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) error {
tips := sdkws.FriendsInfoUpdateTips{}
tips.FromToUserID.ToUserID = toUserID
tips.FriendIDs = friendIDs
return f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
}
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error { func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error {
tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}} tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.FromUserID = req.OwnerUserID tips.FromToUserID.FromUserID = req.OwnerUserID

@ -179,3 +179,10 @@ func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status
}) })
return err return err
} }
func (u *UserRpcClient) GetNotificationByID(ctx context.Context, userID string) error {
_, err := u.Client.GetNotificationAccount(ctx, &user.GetNotificationAccountReq{
UserID: userID,
})
return err
}

@ -33,15 +33,15 @@ openim::log::info "\n# Begin to check all openim service"
# OpenIM status # OpenIM status
# Elegant printing function # Elegant printing function
print_services_and_ports() { print_services_and_ports() {
local -n service_names=$1 service_names=("$1[@]")
local -n service_ports=$2 service_ports=("$2[@]")
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
echo "| Service Name | Port |" echo "| Service Name | Port |"
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
for index in "${!service_names[@]}"; do for index in "${!service_names}"; do
printf "| %-23s | %-8s |\n" "${service_names[$index]}" "${service_ports[$index]}" printf "| %-23s | %-8s |\n" "${!service_names[$index]}" "${!service_ports[$index]}"
done done
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
@ -89,4 +89,4 @@ else
echo "++++ Check all openim service ports successfully !" echo "++++ Check all openim service ports successfully !"
fi fi
set -e set -e

@ -353,6 +353,8 @@ def "MANAGER_USERID_3" "openIMAdmin" # 管理员ID 3
def "NICKNAME_1" "system1" # 昵称1 def "NICKNAME_1" "system1" # 昵称1
def "NICKNAME_2" "system2" # 昵称2 def "NICKNAME_2" "system2" # 昵称2
def "NICKNAME_3" "system3" # 昵称3 def "NICKNAME_3" "system3" # 昵称3
def "IM_ADMIN_USERID" "imAdmin" # IM管理员ID
def "IM_ADMIN_NAME" "imAdmin" # IM管理员昵称
def "MULTILOGIN_POLICY" "1" # 多登录策略 def "MULTILOGIN_POLICY" "1" # 多登录策略
def "CHAT_PERSISTENCE_MYSQL" "true" # 聊天持久化MySQL def "CHAT_PERSISTENCE_MYSQL" "true" # 聊天持久化MySQL
def "MSG_CACHE_TIMEOUT" "86400" # 消息缓存超时 def "MSG_CACHE_TIMEOUT" "86400" # 消息缓存超时

@ -16,6 +16,7 @@ package main
import ( import (
"context" "context"
"errors"
"flag" "flag"
"fmt" "fmt"
"net" "net"
@ -72,7 +73,7 @@ func initCfg() error {
type checkFunc struct { type checkFunc struct {
name string name string
function func() error function func() (string, error)
} }
func main() { func main() {
@ -101,13 +102,13 @@ func main() {
allSuccess := true allSuccess := true
for _, check := range checks { for _, check := range checks {
err := check.function() str, err := check.function()
if err != nil { if err != nil {
errorPrint(fmt.Sprintf("Starting %s failed: %v", check.name, err)) errorPrint(fmt.Sprintf("Starting %s failed, %v", check.name, err))
allSuccess = false allSuccess = false
break break
} else { } else {
successPrint(fmt.Sprintf("%s starts successfully", check.name)) successPrint(fmt.Sprintf("%s connected successfully, %s", check.name, str))
} }
} }
@ -142,21 +143,22 @@ func getEnv(key, fallback string) string {
} }
// checkMongo checks the MongoDB connection // checkMongo checks the MongoDB connection
func checkMongo() error { func checkMongo() (string, error) {
// Use environment variables or fallback to config // Use environment variables or fallback to config
uri := getEnv("MONGO_URI", buildMongoURI()) uri := getEnv("MONGO_URI", buildMongoURI())
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
str := "ths addr is:" + strings.Join(config.Config.Mongo.Address, ",")
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer client.Disconnect(context.TODO()) defer client.Disconnect(context.TODO())
if err = client.Ping(context.TODO(), nil); err != nil { if err = client.Ping(context.TODO(), nil); err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
return nil return str, nil
} }
// buildMongoURI constructs the MongoDB URI using configuration settings // buildMongoURI constructs the MongoDB URI using configuration settings
@ -178,10 +180,10 @@ func buildMongoURI() string {
} }
// checkMinio checks the MinIO connection // checkMinio checks the MinIO connection
func checkMinio() error { func checkMinio() (string, error) {
// Check if MinIO is enabled // Check if MinIO is enabled
if config.Config.Object.Enable != "minio" { if config.Config.Object.Enable != "minio" {
return nil return "", nil
} }
// Prioritize environment variables // Prioritize environment variables
@ -191,13 +193,14 @@ func checkMinio() error {
useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default
if endpoint == "" || accessKeyID == "" || secretAccessKey == "" { if endpoint == "" || accessKeyID == "" || secretAccessKey == "" {
return ErrConfig.Wrap("MinIO configuration missing") return "", ErrConfig.Wrap("MinIO configuration missing")
} }
// Parse endpoint URL to determine if SSL is enabled // Parse endpoint URL to determine if SSL is enabled
u, err := url.Parse(endpoint) u, err := url.Parse(endpoint)
if err != nil { if err != nil {
return errs.Wrap(err) str := "the endpoint is:" + endpoint
return "", errs.Wrap(errStr(err, str))
} }
secure := u.Scheme == "https" || useSSL == "true" secure := u.Scheme == "https" || useSSL == "true"
@ -206,31 +209,34 @@ func checkMinio() error {
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure, Secure: secure,
}) })
str := "ths addr is:" + u.Host
if err != nil { if err != nil {
return errs.Wrap(err) strs := fmt.Sprintf("%v;host:%s,accessKeyID:%s,secretAccessKey:%s,Secure:%v", err, u.Host, accessKeyID, secretAccessKey, secure)
return "", errs.Wrap(err, strs)
} }
// Perform health check // Perform health check
cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second)
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer cancel() defer cancel()
if minioClient.IsOffline() { if minioClient.IsOffline() {
return ErrComponentStart.Wrap("Minio server is offline") str := fmt.Sprintf("Minio server is offline;%s", str)
return "", ErrComponentStart.Wrap(str)
} }
// Check for localhost in API URL and Minio SignEndpoint // Check for localhost in API URL and Minio SignEndpoint
if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" { if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" {
return ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1") return "", ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1")
} }
return nil return str, nil
} }
// checkRedis checks the Redis connection // checkRedis checks the Redis connection
func checkRedis() error { func checkRedis() (string, error) {
// Prioritize environment variables // Prioritize environment variables
address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ",")) address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ","))
username := getEnv("REDIS_USERNAME", config.Config.Redis.Username) username := getEnv("REDIS_USERNAME", config.Config.Redis.Username)
@ -259,15 +265,16 @@ func checkRedis() error {
// Ping Redis to check connectivity // Ping Redis to check connectivity
_, err := redisClient.Ping(context.Background()).Result() _, err := redisClient.Ping(context.Background()).Result()
str := "the addr is:" + strings.Join(redisAddresses, ",")
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
return nil return str, nil
} }
// checkZookeeper checks the Zookeeper connection // checkZookeeper checks the Zookeeper connection
func checkZookeeper() error { func checkZookeeper() (string, error) {
// Prioritize environment variables // Prioritize environment variables
schema := getEnv("ZOOKEEPER_SCHEMA", "digest") schema := getEnv("ZOOKEEPER_SCHEMA", "digest")
address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ",")) address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ","))
@ -278,30 +285,38 @@ func checkZookeeper() error {
zookeeperAddresses := strings.Split(address, ",") zookeeperAddresses := strings.Split(address, ",")
// Connect to Zookeeper // Connect to Zookeeper
c, _, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary str := "the addr is:" + address
c, eventChan, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
timeout := time.After(5 * time.Second)
for {
select {
case event := <-eventChan:
if event.State == zk.StateConnected {
fmt.Println("Connected to Zookeeper")
goto Connected
}
case <-timeout:
return "", errs.Wrap(errors.New("timeout waiting for Zookeeper connection"), "Zookeeper Addr: "+strings.Join(config.Config.Zookeeper.ZkAddr, " "))
}
}
Connected:
defer c.Close() defer c.Close()
// Set authentication if username and password are provided // Set authentication if username and password are provided
if username != "" && password != "" { if username != "" && password != "" {
if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil { if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
} }
// Check if Zookeeper is reachable return str, nil
_, _, err = c.Get("/")
if err != nil {
return errs.Wrap(err)
}
return nil
} }
// checkKafka checks the Kafka connection // checkKafka checks the Kafka connection
func checkKafka() error { func checkKafka() (string, error) {
// Prioritize environment variables // Prioritize environment variables
username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username)
password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password)
@ -321,16 +336,17 @@ func checkKafka() error {
// kafka.SetupTLSConfig(cfg) // kafka.SetupTLSConfig(cfg)
// Create Kafka client // Create Kafka client
str := "the addr is:" + address
kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg) kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg)
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer kafkaClient.Close() defer kafkaClient.Close()
// Verify if necessary topics exist // Verify if necessary topics exist
topics, err := kafkaClient.Topics() topics, err := kafkaClient.Topics()
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(err)
} }
requiredTopics := []string{ requiredTopics := []string{
@ -341,11 +357,11 @@ func checkKafka() error {
for _, requiredTopic := range requiredTopics { for _, requiredTopic := range requiredTopics {
if !isTopicPresent(requiredTopic, topics) { if !isTopicPresent(requiredTopic, topics) {
return ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) return "", ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic))
} }
} }
return nil return str, nil
} }
// isTopicPresent checks if a topic is present in the list of topics // isTopicPresent checks if a topic is present in the list of topics
@ -373,3 +389,7 @@ func successPrint(s string) {
func warningPrint(s string) { func warningPrint(s string) {
colorPrint(colorYellow, "Warning: But %v", s) colorPrint(colorYellow, "Warning: But %v", s)
} }
func errStr(err error, str string) error {
return fmt.Errorf("%v;%s", err, str)
}

@ -21,22 +21,10 @@ import (
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
func TestCheckMysql(t *testing.T) {
err := mockInitCfg()
assert.NoError(t, err, "Initialization should not produce errors")
err = checkMysql()
if err != nil {
// You might expect an error if MySQL isn't running locally with the mock credentials.
t.Logf("Expected error due to mock configuration: %v", err)
}
}
// Mock for initCfg for testing purpose // Mock for initCfg for testing purpose
func mockInitCfg() error { func mockInitCfg() error {
config.Config.Mysql.Username = "root" config.Config.Mysql.Username = "root"

Loading…
Cancel
Save