Merge remote-tracking branch 'upstream/main'

pull/1499/head
AndrewZuo01 2 years ago
commit 5b6538dc5f

@ -18,11 +18,13 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os"
"os/signal"
"strconv" "strconv"
"syscall"
ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
@ -33,6 +35,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
) )
func main() { func main() {
@ -51,13 +55,12 @@ func run(port int, proPort int) error {
if port == 0 || proPort == 0 { if port == 0 || proPort == 0 {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
log.ZError(context.Background(), err, nil) log.ZError(context.Background(), err, nil)
return fmt.Errorf(err) return fmt.Errorf(err)
} }
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
log.ZError(context.Background(), "Failed to initialize Redis", err) log.ZError(context.Background(), "Failed to initialize Redis", err)
return err return err
} }
log.ZInfo(context.Background(), "api start init discov client") log.ZInfo(context.Background(), "api start init discov client")
@ -68,30 +71,29 @@ func run(port int, proPort int) error {
client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil { if err != nil {
log.ZError(context.Background(), "Failed to initialize discovery register", err) log.ZError(context.Background(), "Failed to initialize discovery register", err)
return err return err
} }
if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
log.ZError(context.Background(), "Failed to create RPC root nodes", err) log.ZError(context.Background(), "Failed to create RPC root nodes", err)
return err return err
} }
log.ZInfo(context.Background(), "api register public config to discov") log.ZInfo(context.Background(), "api register public config to discov")
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil { if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil {
log.ZError(context.Background(), "Failed to register public config to discov", err) log.ZError(context.Background(), "Failed to register public config to discov", err)
return err return err
} }
log.ZInfo(context.Background(), "api register public config to discov success") log.ZInfo(context.Background(), "api register public config to discov success")
router := api.NewGinRouter(client, rdb) router := api.NewGinRouter(client, rdb)
//////////////////////////////
if config.Config.Prometheus.Enable { if config.Config.Prometheus.Enable {
p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort)) p.SetListenAddress(fmt.Sprintf(":%d", proPort))
p.Use(router) p.Use(router)
} }
/////////////////////////////////
log.ZInfo(context.Background(), "api init router success") log.ZInfo(context.Background(), "api init router success")
var address string var address string
if config.Config.Api.ListenIP != "" { if config.Config.Api.ListenIP != "" {
address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port)) address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port))
@ -100,10 +102,25 @@ func run(port int, proPort int) error {
} }
log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version) log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version)
err = router.Run(address) server := http.Server{Addr: address, Handler: router}
if err != nil { go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.ZError(context.Background(), "api run failed", err, "address", address) log.ZError(context.Background(), "api run failed", err, "address", address)
os.Exit(1)
}
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-sigs
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// graceful shutdown operation.
if err := server.Shutdown(ctx); err != nil {
log.ZError(context.Background(), "failed to api-server shutdown", err)
return err return err
} }

@ -84,8 +84,8 @@ $ sudo sealos run labring/kubernetes:v1.25.0 labring/helm:v3.8.2 labring/calico:
If you are local, you can also use Kind and Minikube to test, for example, using Kind: If you are local, you can also use Kind and Minikube to test, for example, using Kind:
```bash ```bash
$ sGO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1 $ GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1
$ skind create cluster $ kind create cluster
``` ```
### Installing helm ### Installing helm

@ -19,6 +19,7 @@ templates:
- /etc/alertmanager/email.tmpl - /etc/alertmanager/email.tmpl
route: route:
group_by: ['alertname']
group_wait: 5s group_wait: 5s
group_interval: 5s group_interval: 5s
repeat_interval: 5m repeat_interval: 5m

@ -314,68 +314,125 @@ iosPush:
callback: callback:
url: "" url: ""
beforeSendSingleMsg: beforeSendSingleMsg:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
afterSendSingleMsg: afterSendSingleMsg:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeSendGroupMsg: beforeSendGroupMsg:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
afterSendGroupMsg: afterSendGroupMsg:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
msgModify: msgModify:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
userOnline: userOnline:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
userOffline: userOffline:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
userKickOff: userKickOff:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
offlinePush: offlinePush:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
onlinePush: onlinePush:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
superGroupOnlinePush: superGroupOnlinePush:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeAddFriend: beforeAddFriend:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeUpdateUserInfo: beforeUpdateUserInfo:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeCreateGroup: beforeCreateGroup:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
afterCreateGroup:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeMemberJoinGroup: beforeMemberJoinGroup:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeSetGroupMemberInfo: beforeSetGroupMemberInfo:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
setMessageReactionExtensions: setMessageReactionExtensions:
enable: false enable: ${CALLBACK_ENABLE}
timeout: 5 timeout: ${CALLBACK_TIMEOUT}
failedContinue: true failedContinue: ${CALLBACK_FAILED_CONTINUE}
quitGroup:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
killGroupMember:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
dismissGroup:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
joinGroup:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
groupMsgRead:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
singleMsgRead:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
updateUserInfo:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeUserRegister:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
afterUserRegister:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
transferGroupOwner:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
beforeSetFriendRemark:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
afterSetFriendRemark:
enable: ${CALLBACK_ENABLE}
timeout: ${CALLBACK_TIMEOUT}
failedContinue: ${CALLBACK_FAILED_CONTINUE}
afterGroupMsgRead: afterGroupMsgRead:
enable: false enable: false
timeout: 5 timeout: 5

@ -100,7 +100,6 @@ services:
- KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@<your_host>:9093 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@<your_host>:9093
- KAFKA_HEAP_OPTS:"-Xmx256m -Xms256m"
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://${DOCKER_BRIDGE_GATEWAY}:${KAFKA_PORT} - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://${DOCKER_BRIDGE_GATEWAY}:${KAFKA_PORT}
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT

@ -466,7 +466,7 @@ This section involves configuring the log settings, including storage location,
This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat. This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat.
| Parameter | Example Value | Description | | Parameter | Example Value | Description |
| ----------------------- | ----------------- | ---------------------------------- | |-------------------------|-------------------|------------------------------------|
| WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections | | WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections |
| WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length | | WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length |
| WEBSOCKET_TIMEOUT | "10" | Websocket timeout | | WEBSOCKET_TIMEOUT | "10" | Websocket timeout |
@ -500,9 +500,9 @@ This section involves setting up additional configuration variables for Websocke
| TOKEN_EXPIRE | "90" | Token Expiry Time | | TOKEN_EXPIRE | "90" | Token Expiry Time |
| FRIEND_VERIFY | "false" | Friend Verification Enable | | FRIEND_VERIFY | "false" | Friend Verification Enable |
| IOS_PUSH_SOUND | "xxx" | iOS | | IOS_PUSH_SOUND | "xxx" | iOS |
| CALLBACK_ENABLE | "true" | Enable callback | | | |
| CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call |
| CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step |
### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration ### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration
This section involves configuring Prometheus, including enabling/disabling it and setting up ports for various services. This section involves configuring Prometheus, including enabling/disabling it and setting up ports for various services.

@ -37,7 +37,7 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp
req := cbapi.CallbackUserOnlineReq{ req := cbapi.CallbackUserOnlineReq{
UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{
UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{
CallbackCommand: constant.CallbackUserOnlineCommand, CallbackCommand: cbapi.CallbackUserOnlineCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
PlatformID: platformID, PlatformID: platformID,
Platform: constant.PlatformIDToName(platformID), Platform: constant.PlatformIDToName(platformID),
@ -49,7 +49,10 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp
ConnID: connID, ConnID: connID,
} }
resp := cbapi.CommonCallbackResp{} resp := cbapi.CommonCallbackResp{}
return http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline) if err := http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline); err != nil {
return err
}
return nil
} }
func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error { func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error {
@ -59,7 +62,7 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con
req := &cbapi.CallbackUserOfflineReq{ req := &cbapi.CallbackUserOfflineReq{
UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{
UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{
CallbackCommand: constant.CallbackUserOfflineCommand, CallbackCommand: cbapi.CallbackUserOfflineCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
PlatformID: platformID, PlatformID: platformID,
Platform: constant.PlatformIDToName(platformID), Platform: constant.PlatformIDToName(platformID),
@ -70,7 +73,10 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con
ConnID: connID, ConnID: connID,
} }
resp := &cbapi.CallbackUserOfflineResp{} resp := &cbapi.CallbackUserOfflineResp{}
return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil {
return err
}
return nil
} }
func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error { func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error {
@ -80,7 +86,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
req := &cbapi.CallbackUserKickOffReq{ req := &cbapi.CallbackUserKickOffReq{
UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{
UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{
CallbackCommand: constant.CallbackUserKickOffCommand, CallbackCommand: cbapi.CallbackUserKickOffCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
PlatformID: platformID, PlatformID: platformID,
Platform: constant.PlatformIDToName(platformID), Platform: constant.PlatformIDToName(platformID),
@ -90,7 +96,10 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
Seq: time.Now().UnixMilli(), Seq: time.Now().UnixMilli(),
} }
resp := &cbapi.CommonCallbackResp{} resp := &cbapi.CommonCallbackResp{}
return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil {
return err
}
return nil
} }
// func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID

@ -37,7 +37,9 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
WithPort(wsPort), WithPort(wsPort),
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)), WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second), WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen)) WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen),
WithWriteBufferSize(config.Config.LongConnSvr.WebsocketWriteBufferSize),
)
if err != nil { if err != nil {
return err return err
} }

@ -50,10 +50,11 @@ type GWebSocket struct {
protocolType int protocolType int
conn *websocket.Conn conn *websocket.Conn
handshakeTimeout time.Duration handshakeTimeout time.Duration
writeBufferSize int
} }
func newGWebSocket(protocolType int, handshakeTimeout time.Duration) *GWebSocket { func newGWebSocket(protocolType int, handshakeTimeout time.Duration, wbs int) *GWebSocket {
return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout} return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout, writeBufferSize: wbs}
} }
func (d *GWebSocket) Close() error { func (d *GWebSocket) Close() error {
@ -65,6 +66,10 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
HandshakeTimeout: d.handshakeTimeout, HandshakeTimeout: d.handshakeTimeout,
CheckOrigin: func(r *http.Request) bool { return true }, CheckOrigin: func(r *http.Request) bool { return true },
} }
if d.writeBufferSize > 0 { // default is 4kb.
upgrader.WriteBufferSize = d.writeBufferSize
}
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
return err return err

@ -68,6 +68,7 @@ type WsServer struct {
onlineUserNum atomic.Int64 onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64 onlineUserConnNum atomic.Int64
handshakeTimeout time.Duration handshakeTimeout time.Duration
writeBufferSize int
validate *validator.Validate validate *validator.Validate
cache cache.MsgModel cache cache.MsgModel
userClient *rpcclient.UserRpcClient userClient *rpcclient.UserRpcClient
@ -138,6 +139,7 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
return &WsServer{ return &WsServer{
port: config.port, port: config.port,
wsMaxConnNum: config.maxConnNum, wsMaxConnNum: config.maxConnNum,
writeBufferSize: config.writeBufferSize,
handshakeTimeout: config.handshakeTimeout, handshakeTimeout: config.handshakeTimeout,
clientPool: sync.Pool{ clientPool: sync.Pool{
New: func() interface{} { New: func() interface{} {
@ -431,7 +433,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
httpError(connContext, errs.ErrTokenNotExist.Wrap()) httpError(connContext, errs.ErrTokenNotExist.Wrap())
return return
} }
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout)
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
err = wsLongConn.GenerateLongConn(w, r) err = wsLongConn.GenerateLongConn(w, r)
if err != nil { if err != nil {
httpError(connContext, err) httpError(connContext, err)

@ -27,6 +27,8 @@ type (
handshakeTimeout time.Duration handshakeTimeout time.Duration
// 允许消息最大长度 // 允许消息最大长度
messageMaxMsgLength int messageMaxMsgLength int
// websocket write buffer, default: 4096, 4kb.
writeBufferSize int
} }
) )
@ -53,3 +55,9 @@ func WithMessageMaxMsgLength(length int) Option {
opt.messageMaxMsgLength = length opt.messageMaxMsgLength = length
} }
} }
func WithWriteBufferSize(size int) Option {
return func(opt *configs) {
opt.writeBufferSize = size
}
}

@ -19,7 +19,6 @@ import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
@ -44,7 +43,7 @@ func callbackOfflinePush(
req := &callbackstruct.CallbackBeforePushReq{ req := &callbackstruct.CallbackBeforePushReq{
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackOfflinePushCommand, CallbackCommand: callbackstruct.CallbackOfflinePushCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID), PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
@ -62,9 +61,6 @@ func callbackOfflinePush(
} }
resp := &callbackstruct.CallbackBeforePushResp{} resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil { if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
if len(resp.UserIDs) != 0 { if len(resp.UserIDs) != 0 {
@ -83,7 +79,7 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat
req := callbackstruct.CallbackBeforePushReq{ req := callbackstruct.CallbackBeforePushReq{
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackOnlinePushCommand, CallbackCommand: callbackstruct.CallbackOnlinePushCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID), PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
@ -99,7 +95,10 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat
Content: GetContent(msg), Content: GetContent(msg),
} }
resp := &callbackstruct.CallbackBeforePushResp{} resp := &callbackstruct.CallbackBeforePushResp{}
return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOnlinePush) if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOnlinePush); err != nil {
return err
}
return nil
} }
func callbackBeforeSuperGroupOnlinePush( func callbackBeforeSuperGroupOnlinePush(
@ -113,7 +112,7 @@ func callbackBeforeSuperGroupOnlinePush(
} }
req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackSuperGroupOnlinePushCommand, CallbackCommand: callbackstruct.CallbackSuperGroupOnlinePushCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID), PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
@ -129,11 +128,9 @@ func callbackBeforeSuperGroupOnlinePush(
} }
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
return nil
if len(resp.UserIDs) != 0 { if len(resp.UserIDs) != 0 {
*pushToUserIDs = resp.UserIDs *pushToUserIDs = resp.UserIDs
} }

@ -16,7 +16,7 @@ package friend
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/utils"
pbfriend "github.com/OpenIMSDK/protocol/friend" pbfriend "github.com/OpenIMSDK/protocol/friend"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
@ -30,16 +30,48 @@ func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriend
return nil return nil
} }
cbReq := &cbapi.CallbackBeforeAddFriendReq{ cbReq := &cbapi.CallbackBeforeAddFriendReq{
CallbackCommand: constant.CallbackBeforeAddFriendCommand, CallbackCommand: cbapi.CallbackBeforeAddFriendCommand,
FromUserID: req.FromUserID, FromUserID: req.FromUserID,
ToUserID: req.ToUserID, ToUserID: req.ToUserID,
ReqMsg: req.ReqMsg, ReqMsg: req.ReqMsg,
} }
resp := &cbapi.CallbackBeforeAddFriendResp{} resp := &cbapi.CallbackBeforeAddFriendResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil {
if err == errs.ErrCallbackContinue { return err
}
return nil
}
func CallbackBeforeSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) error {
if !config.Config.Callback.CallbackBeforeSetFriendRemark.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{
CallbackCommand: cbapi.CallbackBeforeSetFriendRemark,
OwnerUserID: req.OwnerUserID,
FriendUserID: req.FriendUserID,
Remark: req.Remark,
}
resp := &cbapi.CallbackBeforeSetFriendRemarkResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil {
return err
}
utils.NotNilReplace(&req.Remark, &resp.Remark)
return nil
}
func CallbackAfterSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) error {
if !config.Config.Callback.CallbackAfterSetFriendRemark.Enable {
return nil return nil
} }
cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{
CallbackCommand: cbapi.CallbackAfterSetFriendRemark,
OwnerUserID: req.OwnerUserID,
FriendUserID: req.FriendUserID,
Remark: req.Remark,
}
resp := &cbapi.CallbackAfterSetFriendRemarkResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil {
return err return err
} }
return nil return nil

@ -233,6 +233,10 @@ func (s *friendServer) SetFriendRemark(
req *pbfriend.SetFriendRemarkReq, req *pbfriend.SetFriendRemarkReq,
) (resp *pbfriend.SetFriendRemarkResp, err error) { ) (resp *pbfriend.SetFriendRemarkResp, err error) {
defer log.ZInfo(ctx, utils.GetFuncName()+" Return") defer log.ZInfo(ctx, utils.GetFuncName()+" Return")
if err = CallbackBeforeSetFriendRemark(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
resp = &pbfriend.SetFriendRemarkResp{} resp = &pbfriend.SetFriendRemarkResp{}
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil { if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
return nil, err return nil, err
@ -244,6 +248,9 @@ func (s *friendServer) SetFriendRemark(
if err := s.friendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil { if err := s.friendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil {
return nil, err return nil, err
} }
if err := CallbackAfterSetFriendRemark(ctx, req); err != nil && err != errs.ErrCallbackContinue {
return nil, err
}
s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID) s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID)
return resp, nil return resp, nil
} }

@ -22,10 +22,11 @@ import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/group" "github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/wrapperspb" "github.com/OpenIMSDK/protocol/wrapperspb"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
pbgroup "github.com/OpenIMSDK/protocol/group"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -38,7 +39,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (
return nil return nil
} }
cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{ cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{
CallbackCommand: constant.CallbackBeforeCreateGroupCommand, CallbackCommand: callbackstruct.CallbackBeforeCreateGroupCommand,
OperationID: mcontext.GetOperationID(ctx), OperationID: mcontext.GetOperationID(ctx),
GroupInfo: req.GroupInfo, GroupInfo: req.GroupInfo,
} }
@ -59,17 +60,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (
}) })
} }
resp := &callbackstruct.CallbackBeforeCreateGroupResp{} resp := &callbackstruct.CallbackBeforeCreateGroupResp{}
err = http.CallBackPostReturn( if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup); err != nil {
ctx,
config.Config.Callback.CallbackUrl,
cbReq,
resp,
config.Config.Callback.CallbackBeforeCreateGroup,
)
if err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID) utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID)
@ -87,6 +78,37 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (
return nil return nil
} }
func CallbackAfterCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) {
if !config.Config.Callback.CallbackAfterCreateGroup.Enable {
return nil
}
cbReq := &callbackstruct.CallbackAfterCreateGroupReq{
CallbackCommand: callbackstruct.CallbackAfterCreateGroupCommand,
GroupInfo: req.GroupInfo,
}
cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{
UserID: req.OwnerUserID,
RoleLevel: constant.GroupOwner,
})
for _, userID := range req.AdminUserIDs {
cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{
UserID: userID,
RoleLevel: constant.GroupAdmin,
})
}
for _, userID := range req.MemberUserIDs {
cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{
UserID: userID,
RoleLevel: constant.GroupOrdinaryUsers,
})
}
resp := &callbackstruct.CallbackAfterCreateGroupResp{}
if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterCreateGroup); err != nil {
return err
}
return nil
}
func CallbackBeforeMemberJoinGroup( func CallbackBeforeMemberJoinGroup(
ctx context.Context, ctx context.Context,
groupMember *relation.GroupMemberModel, groupMember *relation.GroupMemberModel,
@ -96,8 +118,7 @@ func CallbackBeforeMemberJoinGroup(
return nil return nil
} }
callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
CallbackCommand: constant.CallbackBeforeMemberJoinGroupCommand, CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand,
OperationID: mcontext.GetOperationID(ctx),
GroupID: groupMember.GroupID, GroupID: groupMember.GroupID,
UserID: groupMember.UserID, UserID: groupMember.UserID,
Ex: groupMember.Ex, Ex: groupMember.Ex,
@ -129,8 +150,7 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe
return nil return nil
} }
callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{
CallbackCommand: constant.CallbackBeforeSetGroupMemberInfoCommand, CallbackCommand: callbackstruct.CallbackBeforeSetGroupMemberInfoCommand,
OperationID: mcontext.GetOperationID(ctx),
GroupID: req.GroupID, GroupID: req.GroupID,
UserID: req.UserID, UserID: req.UserID,
} }
@ -155,6 +175,9 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe
config.Config.Callback.CallbackBeforeSetGroupMemberInfo, config.Config.Callback.CallbackBeforeSetGroupMemberInfo,
) )
if err != nil { if err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
if resp.FaceURL != nil { if resp.FaceURL != nil {
@ -171,145 +194,3 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe
} }
return nil return nil
} }
func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserToGroupReq) (err error) {
if !config.Config.Callback.CallbackBeforeInviteUserToGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackBeforeInviteUserToGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeInviteJoinGroupCommand,
OperationID: mcontext.GetOperationID(ctx),
GroupID: req.GroupID,
Reason: req.Reason,
InvitedUserIDs: req.InvitedUserIDs,
}
resp := &callbackstruct.CallbackBeforeInviteUserToGroupResp{}
err = http.CallBackPostReturn(
ctx,
config.Config.Callback.CallbackUrl,
callbackReq,
resp,
config.Config.Callback.CallbackBeforeInviteUserToGroup,
)
if err != nil {
return err
}
if len(resp.RefusedMembersAccount) > 0 {
// Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic
}
utils.StructFieldNotNilReplace(req, resp)
return nil
}
func CallbackAfterJoinGroup(ctx context.Context, req *group.JoinGroupReq) error {
if !config.Config.Callback.CallbackAfterJoinGroup.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackAfterJoinGroupReq{
CallbackCommand: callbackstruct.CallbackAfterJoinGroupCommand,
OperationID: mcontext.GetOperationID(ctx),
GroupID: req.GroupID,
ReqMessage: req.ReqMessage,
JoinSource: req.JoinSource,
InviterUserID: req.InviterUserID,
}
resp := &callbackstruct.CallbackAfterJoinGroupResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterJoinGroup); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err
}
return nil
}
func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error {
if !config.Config.Callback.CallbackBeforeSetGroupInfo.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{
CallbackCommand: callbackstruct.CallbackBeforeSetGroupInfoCommand,
GroupID: req.GroupInfoForSet.GroupID,
Notification: req.GroupInfoForSet.Notification,
Introduction: req.GroupInfoForSet.Introduction,
FaceURL: req.GroupInfoForSet.FaceURL,
GroupName: req.GroupInfoForSet.GroupName,
}
if req.GroupInfoForSet.Ex != nil {
callbackReq.Ex = req.GroupInfoForSet.Ex.Value
}
log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfo", callbackReq.Ex)
if req.GroupInfoForSet.NeedVerification != nil {
callbackReq.NeedVerification = req.GroupInfoForSet.NeedVerification.Value
}
if req.GroupInfoForSet.LookMemberInfo != nil {
callbackReq.LookMemberInfo = req.GroupInfoForSet.LookMemberInfo.Value
}
if req.GroupInfoForSet.ApplyMemberFriend != nil {
callbackReq.ApplyMemberFriend = req.GroupInfoForSet.ApplyMemberFriend.Value
}
resp := &callbackstruct.CallbackBeforeSetGroupInfoResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupInfo); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err
}
if resp.Ex != nil {
req.GroupInfoForSet.Ex = wrapperspb.String(*resp.Ex)
}
if resp.NeedVerification != nil {
req.GroupInfoForSet.NeedVerification = wrapperspb.Int32(*resp.NeedVerification)
}
if resp.LookMemberInfo != nil {
req.GroupInfoForSet.LookMemberInfo = wrapperspb.Int32(*resp.LookMemberInfo)
}
if resp.ApplyMemberFriend != nil {
req.GroupInfoForSet.ApplyMemberFriend = wrapperspb.Int32(*resp.ApplyMemberFriend)
}
utils.StructFieldNotNilReplace(req, resp)
return nil
}
func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error {
if !config.Config.Callback.CallbackAfterSetGroupInfo.Enable {
return nil
}
callbackReq := &callbackstruct.CallbackAfterSetGroupInfoReq{
CallbackCommand: callbackstruct.CallbackAfterSetGroupInfoCommand,
GroupID: req.GroupInfoForSet.GroupID,
Notification: req.GroupInfoForSet.Notification,
Introduction: req.GroupInfoForSet.Introduction,
FaceURL: req.GroupInfoForSet.FaceURL,
GroupName: req.GroupInfoForSet.GroupName,
}
if req.GroupInfoForSet.Ex != nil {
callbackReq.Ex = &req.GroupInfoForSet.Ex.Value
}
if req.GroupInfoForSet.NeedVerification != nil {
callbackReq.NeedVerification = &req.GroupInfoForSet.NeedVerification.Value
}
if req.GroupInfoForSet.LookMemberInfo != nil {
callbackReq.LookMemberInfo = &req.GroupInfoForSet.LookMemberInfo.Value
}
if req.GroupInfoForSet.ApplyMemberFriend != nil {
callbackReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value
}
resp := &callbackstruct.CallbackAfterSetGroupInfoResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err
}
utils.StructFieldNotNilReplace(req, resp)
return nil
}

@ -26,6 +26,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -225,6 +227,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
if len(userMap) != len(userIDs) { if len(userMap) != len(userIDs) {
return nil, errs.ErrUserIDNotFound.Wrap("user not found") return nil, errs.ErrUserIDNotFound.Wrap("user not found")
} }
// Callback Before create Group
if err := CallbackBeforeCreateGroup(ctx, req); err != nil { if err := CallbackBeforeCreateGroup(ctx, req); err != nil {
return nil, err return nil, err
} }
@ -298,6 +301,17 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
} }
s.Notification.GroupCreatedNotification(ctx, tips) s.Notification.GroupCreatedNotification(ctx, tips)
} }
reqCallBackAfter := &pbgroup.CreateGroupReq{
MemberUserIDs: userIDs,
GroupInfo: resp.GroupInfo,
OwnerUserID: req.OwnerUserID,
AdminUserIDs: req.AdminUserIDs,
}
if err := CallbackAfterCreateGroup(ctx, reqCallBackAfter); err != nil {
return nil, err
}
return resp, nil return resp, nil
} }
@ -613,6 +627,10 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil { if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil {
return nil, err return nil, err
} }
if err := CallbackKillGroupMember(ctx, req); err != nil {
return nil, err
}
return resp, nil return resp, nil
} }
@ -824,6 +842,17 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
if group.Status == constant.GroupStatusDismissed { if group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.Wrap() return nil, errs.ErrDismissedAlready.Wrap()
} }
reqCall := &callbackstruct.CallbackJoinGroupReq{
GroupID: req.GroupID,
GroupType: string(group.GroupType),
ApplyID: req.InviterUserID,
ReqMessage: req.ReqMessage,
}
if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil {
return nil, err
}
_, err = s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.InviterUserID) _, err = s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.InviterUserID)
if err == nil { if err == nil {
return nil, errs.ErrArgs.Wrap("already in group") return nil, errs.ErrArgs.Wrap("already in group")
@ -912,6 +941,10 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)
return nil, err return nil, err
} }
// callback
if err := CallbackQuitGroup(ctx, req); err != nil {
return nil, err
}
return resp, nil return resp, nil
} }
@ -1049,6 +1082,10 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
if err := s.GroupDatabase.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil { if err := s.GroupDatabase.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil {
return nil, err return nil, err
} }
if err := CallbackTransferGroupOwnerAfter(ctx, req); err != nil {
return nil, err
}
s.Notification.GroupOwnerTransferredNotification(ctx, req) s.Notification.GroupOwnerTransferredNotification(ctx, req)
return resp, nil return resp, nil
} }
@ -1219,6 +1256,20 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou
s.Notification.GroupDismissedNotification(ctx, tips) s.Notification.GroupDismissedNotification(ctx, tips)
} }
} }
membersID, err := s.GroupDatabase.FindGroupMemberUserID(ctx, group.GroupID)
if err != nil {
return nil, err
}
reqCall := &callbackstruct.CallbackDisMissGroupReq{
GroupID: req.GroupID,
OwnerID: owner.UserID,
MembersID: membersID,
GroupType: string(group.GroupType),
}
if err := CallbackDismissGroup(ctx, reqCall); err != nil {
return nil, err
}
return resp, nil return resp, nil
} }
@ -1457,6 +1508,12 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
} }
} }
} }
for i := 0; i < len(req.Members); i++ {
if err := CallbackAfterSetGroupMemberInfo(ctx, req.Members[i]); err != nil {
return nil, err
}
}
return resp, nil return resp, nil
} }

@ -16,10 +16,14 @@ package msg
import ( import (
"context" "context"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
utils2 "github.com/OpenIMSDK/tools/utils"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
@ -88,10 +92,7 @@ func (m *msgServer) SetConversationHasReadSeq(
return &msg.SetConversationHasReadSeqResp{}, nil return &msg.SetConversationHasReadSeqResp{}, nil
} }
func (m *msgServer) MarkMsgsAsRead( func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) {
ctx context.Context,
req *msg.MarkMsgsAsReadReq,
) (resp *msg.MarkMsgsAsReadResp, err error) {
if len(req.Seqs) < 1 { if len(req.Seqs) < 1 {
return nil, errs.ErrArgs.Wrap("seqs must not be empty") return nil, errs.ErrArgs.Wrap("seqs must not be empty")
} }
@ -127,10 +128,7 @@ func (m *msgServer) MarkMsgsAsRead(
return &msg.MarkMsgsAsReadResp{}, nil return &msg.MarkMsgsAsReadResp{}, nil
} }
func (m *msgServer) MarkConversationAsRead( func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (resp *msg.MarkConversationAsReadResp, err error) {
ctx context.Context,
req *msg.MarkConversationAsReadReq,
) (resp *msg.MarkConversationAsReadResp, err error) {
conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -139,49 +137,56 @@ func (m *msgServer) MarkConversationAsRead(
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err return nil, err
} }
var seqs []int64 seqs := generateSeqs(hasReadSeq, req)
log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq, if len(seqs) > 0 || req.HasReadSeq > hasReadSeq {
"req.HasReadSeq", req.HasReadSeq) err = m.updateReadStatus(ctx, req, conversation, seqs, hasReadSeq)
if conversation.ConversationType == constant.SingleChatType { if err != nil {
for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { return nil, err
seqs = append(seqs, i)
} }
}
return &msg.MarkConversationAsReadResp{}, nil
}
if len(seqs) > 0 { func generateSeqs(hasReadSeq int64, req *msg.MarkConversationAsReadReq) []int64 {
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) var seqs []int64
if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { for _, val := range req.Seqs {
return nil, err if val > hasReadSeq && !utils2.Contain(val, seqs...) {
seqs = append(seqs, val)
} }
} }
if req.HasReadSeq > hasReadSeq { return seqs
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) }
if err != nil {
return nil, err func (m *msgServer) updateReadStatus(ctx context.Context, req *msg.MarkConversationAsReadReq, conversation *conversation.Conversation, seqs []int64, hasReadSeq int64) error {
if conversation.ConversationType == constant.SingleChatType && len(seqs) > 0 {
log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
if err := m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
return err
} }
hasReadSeq = req.HasReadSeq
} }
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, reqCall := &cbapi.CallbackGroupMsgReadReq{
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { SendID: conversation.OwnerUserID,
return nil, err ReceiveID: req.UserID,
UnreadMsgNum: req.HasReadSeq,
ContentType: int64(conversation.ConversationType),
} }
} else if conversation.ConversationType == constant.SuperGroupChatType || if err := CallbackGroupMsgRead(ctx, reqCall); err != nil {
conversation.ConversationType == constant.NotificationChatType { return err
if req.HasReadSeq > hasReadSeq {
err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
if err != nil {
return nil, err
} }
hasReadSeq = req.HasReadSeq
if req.HasReadSeq > hasReadSeq {
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return err
} }
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
req.UserID, seqs, hasReadSeq); err != nil {
return nil, err
} }
recvID := m.conversationAndGetRecvID(conversation, req.UserID)
if conversation.ConversationType == constant.SuperGroupChatType || conversation.ConversationType == constant.NotificationChatType {
recvID = req.UserID
} }
return &msg.MarkConversationAsReadResp{}, nil return m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, recvID, seqs, req.HasReadSeq)
} }
func (m *msgServer) sendMarkAsReadNotification( func (m *msgServer) sendMarkAsReadNotification(

@ -16,9 +16,11 @@ package msg
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/sdkws"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
pbchat "github.com/OpenIMSDK/protocol/msg" pbchat "github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
@ -73,14 +75,11 @@ func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) er
return nil return nil
} }
req := &cbapi.CallbackBeforeSendSingleMsgReq{ req := &cbapi.CallbackBeforeSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendSingleMsgCommand),
RecvID: msg.MsgData.RecvID, RecvID: msg.MsgData.RecvID,
} }
resp := &cbapi.CallbackBeforeSendSingleMsgResp{} resp := &cbapi.CallbackBeforeSendSingleMsgResp{}
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg); err != nil { if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
return nil return nil
@ -91,14 +90,11 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) err
return nil return nil
} }
req := &cbapi.CallbackAfterSendSingleMsgReq{ req := &cbapi.CallbackAfterSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackAfterSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
RecvID: msg.MsgData.RecvID, RecvID: msg.MsgData.RecvID,
} }
resp := &cbapi.CallbackAfterSendSingleMsgResp{} resp := &cbapi.CallbackAfterSendSingleMsgResp{}
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg); err != nil { if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
return nil return nil
@ -109,14 +105,11 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) err
return nil return nil
} }
req := &cbapi.CallbackAfterSendGroupMsgReq{ req := &cbapi.CallbackAfterSendGroupMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendGroupMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendGroupMsgCommand),
GroupID: msg.MsgData.GroupID, GroupID: msg.MsgData.GroupID,
} }
resp := &cbapi.CallbackBeforeSendGroupMsgResp{} resp := &cbapi.CallbackBeforeSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg); err != nil { if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
return nil return nil
@ -127,14 +120,11 @@ func callbackAfterSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) erro
return nil return nil
} }
req := &cbapi.CallbackAfterSendGroupMsgReq{ req := &cbapi.CallbackAfterSendGroupMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackAfterSendGroupMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
GroupID: msg.MsgData.GroupID, GroupID: msg.MsgData.GroupID,
} }
resp := &cbapi.CallbackAfterSendGroupMsgResp{} resp := &cbapi.CallbackAfterSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil { if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
return nil return nil
@ -145,13 +135,10 @@ func callbackMsgModify(ctx context.Context, msg *pbchat.SendMsgReq) error {
return nil return nil
} }
req := &cbapi.CallbackMsgModifyCommandReq{ req := &cbapi.CallbackMsgModifyCommandReq{
CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackMsgModifyCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand),
} }
resp := &cbapi.CallbackMsgModifyCommandResp{} resp := &cbapi.CallbackMsgModifyCommandResp{}
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
if resp.Content != nil { if resp.Content != nil {
@ -176,24 +163,3 @@ func callbackMsgModify(ctx context.Context, msg *pbchat.SendMsgReq) error {
log.ZDebug(ctx, "callbackMsgModify", "msg", msg.MsgData) log.ZDebug(ctx, "callbackMsgModify", "msg", msg.MsgData)
return nil return nil
} }
func CallbackAfterRevokeMsg(ctx context.Context, req *pbchat.RevokeMsgReq) error {
if !config.Config.Callback.CallbackAfterRevokeMsg.Enable {
return nil
}
callbackReq := &cbapi.CallbackAfterRevokeMsgReq{
CallbackCommand: cbapi.CallbackAfterRevokeMsgCommand,
ConversationID: req.ConversationID,
Seq: req.Seq,
UserID: req.UserID,
}
resp := &cbapi.CallbackAfterSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err
}
utils.StructFieldNotNilReplace(req, resp)
return nil
}

@ -16,11 +16,7 @@ package user
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/constant"
pbuser "github.com/OpenIMSDK/protocol/user" pbuser "github.com/OpenIMSDK/protocol/user"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
@ -33,17 +29,13 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf
return nil return nil
} }
cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{ cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{
CallbackCommand: constant.CallbackBeforeUpdateUserInfoCommand, CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoCommand,
OperationID: mcontext.GetOperationID(ctx),
UserID: req.UserInfo.UserID, UserID: req.UserInfo.UserID,
FaceURL: &req.UserInfo.FaceURL, FaceURL: &req.UserInfo.FaceURL,
Nickname: &req.UserInfo.Nickname, Nickname: &req.UserInfo.Nickname,
} }
resp := &cbapi.CallbackBeforeUpdateUserInfoResp{} resp := &cbapi.CallbackBeforeUpdateUserInfoResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil {
if err == errs.ErrCallbackContinue {
return nil
}
return err return err
} }
utils.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL) utils.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL)
@ -51,3 +43,57 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf
utils.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname) utils.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname)
return nil return nil
} }
func CallbackAfterUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) error {
if !config.Config.Callback.CallbackAfterUpdateUserInfo.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{
CallbackCommand: cbapi.CallbackAfterUpdateUserInfoCommand,
UserID: req.UserInfo.UserID,
FaceURL: req.UserInfo.FaceURL,
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackAfterUpdateUserInfoResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil {
return err
}
return nil
}
func CallbackBeforeUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error {
if !config.Config.Callback.CallbackBeforeUserRegister.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeUserRegisterReq{
CallbackCommand: cbapi.CallbackBeforeUserRegisterCommand,
Secret: req.Secret,
Users: req.Users,
}
resp := &cbapi.CallbackBeforeUserRegisterResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil {
return err
}
if len(resp.Users) != 0 {
req.Users = resp.Users
}
return nil
}
func CallbackAfterUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error {
if !config.Config.Callback.CallbackAfterUserRegister.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterUserRegisterReq{
CallbackCommand: cbapi.CallbackAfterUserRegisterCommand,
Secret: req.Secret,
Users: req.Users,
}
resp := &cbapi.CallbackBeforeUserRegisterResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil {
return err
}
return nil
}

@ -139,6 +139,9 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
for _, friendID := range friends { for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
} }
if err := CallbackAfterUpdateUserInfo(ctx, req); err != nil {
return nil, err
}
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
log.ZError(ctx, "NotificationUserInfoUpdate", err, "userID", req.UserInfo.UserID) log.ZError(ctx, "NotificationUserInfoUpdate", err, "userID", req.UserInfo.UserID)
} }
@ -230,6 +233,9 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if exist { if exist {
return nil, errs.ErrRegisteredAlready.Wrap("userID registered already") return nil, errs.ErrRegisteredAlready.Wrap("userID registered already")
} }
if err := CallbackBeforeUserRegister(ctx, req); err != nil {
return nil, err
}
now := time.Now() now := time.Now()
users := make([]*tablerelation.UserModel, 0, len(req.Users)) users := make([]*tablerelation.UserModel, 0, len(req.Users))
for _, user := range req.Users { for _, user := range req.Users {
@ -246,6 +252,10 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if err := s.Create(ctx, users); err != nil { if err := s.Create(ctx, users); err != nil {
return nil, err return nil, err
} }
if err := CallbackAfterUserRegister(ctx, req); err != nil {
return nil, err
}
return resp, nil return resp, nil
} }

@ -14,8 +14,10 @@
package callbackstruct package callbackstruct
import ( import "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/errs"
const (
Next = 1
) )
type CommonCallbackReq struct { type CommonCallbackReq struct {
@ -51,15 +53,15 @@ type CallbackResp interface {
} }
type CommonCallbackResp struct { type CommonCallbackResp struct {
ActionCode int `json:"actionCode"` ActionCode int32 `json:"actionCode"`
ErrCode int32 `json:"errCode"` ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"` ErrMsg string `json:"errMsg"`
ErrDlt string `json:"errDlt"` ErrDlt string `json:"errDlt"`
NextCode string `json:"nextCode"` NextCode int32 `json:"nextCode"`
} }
func (c CommonCallbackResp) Parse() error { func (c CommonCallbackResp) Parse() error {
if c.ActionCode != errs.NoError || c.ErrCode != errs.NoError { if c.ActionCode != errs.NoError || c.NextCode == Next {
return errs.NewCodeError(int(c.ErrCode), c.ErrMsg).WithDetail(c.ErrDlt) return errs.NewCodeError(int(c.ErrCode), c.ErrMsg).WithDetail(c.ErrDlt)
} }
return nil return nil

@ -14,3 +14,35 @@ const CallbackAfterDeleteFriendCommand = "CallbackAfterDeleteFriendCommand"
const CallbackBeforeImportFriendsCommand = "CallbackBeforeImportFriendsCommand" const CallbackBeforeImportFriendsCommand = "CallbackBeforeImportFriendsCommand"
const CallbackAfterImportFriendsCommand = "CallbackAfterImportFriendsCommand" const CallbackAfterImportFriendsCommand = "CallbackAfterImportFriendsCommand"
const CallbackAfterRemoveBlackCommand = "CallbackAfterRemoveBlackCommand" const CallbackAfterRemoveBlackCommand = "CallbackAfterRemoveBlackCommand"
const (
CallbackQuitGroupCommand = "callbackQuitGroupCommand"
CallbackKillGroupCommand = "callbackKillGroupCommand"
CallbackDisMissGroupCommand = "callbackDisMissGroupCommand"
CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand"
CallbackGroupMsgReadCommand = "callbackGroupMsgReadCommand"
CallbackMsgModifyCommand = "callbackMsgModifyCommand"
CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand"
CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand"
CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand"
CallbackTransferGroupOwnerAfter = "callbackTransferGroupOwnerAfter"
CallbackBeforeSetFriendRemark = "callbackBeforeSetFriendRemark"
CallbackAfterSetFriendRemark = "callbackAfterSetFriendRemark"
CallbackSingleMsgRead = "callbackSingleMsgRead"
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
CallbackUserOnlineCommand = "callbackUserOnlineCommand"
CallbackUserOfflineCommand = "callbackUserOfflineCommand"
CallbackUserKickOffCommand = "callbackUserKickOffCommand"
CallbackOfflinePushCommand = "callbackOfflinePushCommand"
CallbackOnlinePushCommand = "callbackOnlinePushCommand"
CallbackSuperGroupOnlinePushCommand = "callbackSuperGroupOnlinePushCommand"
CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand"
CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand"
CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand"
CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand"
CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand"
CallbackBeforeSetGroupMemberInfoCommand = "CallbackBeforeSetGroupMemberInfoCommand"
)

@ -24,6 +24,50 @@ type CallbackBeforeAddFriendReq struct {
type CallbackBeforeAddFriendResp struct { type CallbackBeforeAddFriendResp struct {
CommonCallbackResp CommonCallbackResp
} }
type CallBackAddFriendReplyBeforeReq struct {
CallbackCommand `json:"callbackCommand"`
FromUserID string `json:"fromUserID" `
ToUserID string `json:"toUserID"`
}
type CallBackAddFriendReplyBeforeResp struct {
CommonCallbackResp
}
type CallbackAfterAddFriendReq struct {
CallbackCommand `json:"callbackCommand"`
FromUserID string `json:"fromUserID" `
ToUserID string `json:"toUserID"`
ReqMsg string `json:"reqMsg"`
}
type CallbackAfterAddFriendResp struct {
CommonCallbackResp
}
type CallbackBeforeSetFriendRemarkReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"ownerUserID"`
FriendUserID string `json:"friendUserID"`
Remark string `json:"remark"`
}
type CallbackBeforeSetFriendRemarkResp struct {
CommonCallbackResp
Remark string `json:"remark"`
}
type CallbackAfterSetFriendRemarkReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"ownerUserID"`
FriendUserID string `json:"friendUserID"`
Remark string `json:"remark"`
}
type CallbackAfterSetFriendRemarkResp struct {
CommonCallbackResp
}
type CallbackAfterAddFriendReq struct { type CallbackAfterAddFriendReq struct {
CallbackCommand `json:"callbackCommand"` CallbackCommand `json:"callbackCommand"`
FromUserID string `json:"fromUserID" ` FromUserID string `json:"fromUserID" `

@ -50,9 +50,18 @@ type CallbackBeforeCreateGroupResp struct {
ApplyMemberFriend *int32 `json:"applyMemberFriend"` ApplyMemberFriend *int32 `json:"applyMemberFriend"`
} }
type CallbackAfterCreateGroupReq struct {
CallbackCommand `json:"callbackCommand"`
*common.GroupInfo
InitMemberList []*apistruct.GroupAddMemberInfo `json:"initMemberList"`
}
type CallbackAfterCreateGroupResp struct {
CommonCallbackResp
}
type CallbackBeforeMemberJoinGroupReq struct { type CallbackBeforeMemberJoinGroupReq struct {
CallbackCommand `json:"callbackCommand"` CallbackCommand `json:"callbackCommand"`
OperationID string `json:"operationID"`
GroupID string `json:"groupID"` GroupID string `json:"groupID"`
UserID string `json:"userID"` UserID string `json:"userID"`
Ex string `json:"ex"` Ex string `json:"ex"`
@ -70,7 +79,6 @@ type CallbackBeforeMemberJoinGroupResp struct {
type CallbackBeforeSetGroupMemberInfoReq struct { type CallbackBeforeSetGroupMemberInfoReq struct {
CallbackCommand `json:"callbackCommand"` CallbackCommand `json:"callbackCommand"`
OperationID string `json:"operationID"`
GroupID string `json:"groupID"` GroupID string `json:"groupID"`
UserID string `json:"userID"` UserID string `json:"userID"`
Nickname *string `json:"nickName"` Nickname *string `json:"nickName"`
@ -87,6 +95,126 @@ type CallbackBeforeSetGroupMemberInfoResp struct {
RoleLevel *int32 `json:"roleLevel"` RoleLevel *int32 `json:"roleLevel"`
} }
type CallbackAfterSetGroupMemberInfoReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
UserID string `json:"userID"`
Nickname *string `json:"nickName"`
FaceURL *string `json:"faceURL"`
RoleLevel *int32 `json:"roleLevel"`
Ex *string `json:"ex"`
}
type CallbackAfterSetGroupMemberInfoResp struct {
CommonCallbackResp
}
type CallbackAfterGroupMemberExitReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
UserID string `json:"userID"`
GroupType *int32 `json:"groupType"`
ExitType string `json:"exitType"`
}
type CallbackAfterGroupMemberExitResp struct {
CommonCallbackResp
}
type CallbackAfterUngroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
GroupType *int32 `json:"groupType"`
OwnerID string `json:"ownerID"`
MemberList []string `json:"memberList"`
}
type CallbackAfterUngroupResp struct {
CommonCallbackResp
}
type CallbackAfterSetGroupInfoReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
GroupType *int32 `json:"groupType"`
UserID string `json:"userID"`
Name string `json:"name"`
Notification string `json:"notification"`
GroupUrl string `json:"groupUrl"`
}
type CallbackAfterSetGroupInfoResp struct {
CommonCallbackResp
}
type CallbackAfterRevokeMsgReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
GroupType *int32 `json:"groupType"`
UserID string `json:"userID"`
Content string `json:"content"`
}
type CallbackAfterRevokeMsgResp struct {
CommonCallbackResp
}
type CallbackQuitGroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
UserID string `json:"userID"`
}
type CallbackQuitGroupResp struct {
CommonCallbackResp
}
type CallbackKillGroupMemberReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
KickedUserIDs []string `json:"kickedUserIDs"`
Reason string `json:"reason"`
}
type CallbackKillGroupMemberResp struct {
CommonCallbackResp
}
type CallbackDisMissGroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
OwnerID string `json:"ownerID"`
GroupType string `json:"groupType"`
MembersID []string `json:"membersID"`
}
type CallbackDisMissGroupResp struct {
CommonCallbackResp
}
type CallbackJoinGroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
GroupType string `json:"groupType"`
ApplyID string `json:"applyID"`
ReqMessage string `json:"reqMessage"`
}
type CallbackJoinGroupResp struct {
CommonCallbackResp
}
type CallbackTransferGroupOwnerReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
OldOwnerUserID string `json:"oldOwnerUserID"`
NewOwnerUserID string `json:"newOwnerUserID"`
}
type CallbackTransferGroupOwnerResp struct {
CommonCallbackResp
}
type CallbackBeforeInviteUserToGroupReq struct { type CallbackBeforeInviteUserToGroupReq struct {
CallbackCommand `json:"callbackCommand"` CallbackCommand `json:"callbackCommand"`
OperationID string `json:"operationID"` OperationID string `json:"operationID"`

@ -79,3 +79,46 @@ type CallbackMsgModifyCommandResp struct {
AttachedInfo *string `json:"attachedInfo"` AttachedInfo *string `json:"attachedInfo"`
Ex *string `json:"ex"` Ex *string `json:"ex"`
} }
type CallbackSendGroupMsgErrorReq struct {
CommonCallbackReq
GroupID string `json:"groupID"`
}
type CallbackSendGroupMsgErrorResp struct {
CommonCallbackResp
}
type CallbackSingleMsgRevokeReq struct {
CallbackCommand `json:"callbackCommand"`
SendID string `json:"sendID"`
ReceiveID string `json:"receiveID"`
Content string `json:"content"`
}
type CallbackSingleMsgRevokeResp struct {
CommonCallbackResp
}
type CallbackGroupMsgReadReq struct {
CallbackCommand `json:"callbackCommand"`
SendID string `json:"sendID"`
ReceiveID string `json:"receiveID"`
UnreadMsgNum int64 `json:"unreadMsgNum"`
ContentType int64 `json:"contentType"`
}
type CallbackGroupMsgReadResp struct {
CommonCallbackResp
}
type CallbackSingleMsgReadReq struct {
CallbackCommand `json:"callbackCommand"`
SendID string `json:"sendID"`
ReceiveID string `json:"receiveID"`
ContentType int64 `json:"contentType"`
}
type CallbackSingleMsgReadResp struct {
CommonCallbackResp
}

@ -14,9 +14,10 @@
package callbackstruct package callbackstruct
import "github.com/OpenIMSDK/protocol/sdkws"
type CallbackBeforeUpdateUserInfoReq struct { type CallbackBeforeUpdateUserInfoReq struct {
CallbackCommand `json:"callbackCommand"` CallbackCommand `json:"callbackCommand"`
OperationID string `json:"operationID"`
UserID string `json:"userID"` UserID string `json:"userID"`
Nickname *string `json:"nickName"` Nickname *string `json:"nickName"`
FaceURL *string `json:"faceURL"` FaceURL *string `json:"faceURL"`
@ -28,3 +29,35 @@ type CallbackBeforeUpdateUserInfoResp struct {
FaceURL *string `json:"faceURL"` FaceURL *string `json:"faceURL"`
Ex *string `json:"ex"` Ex *string `json:"ex"`
} }
type CallbackAfterUpdateUserInfoReq struct {
CallbackCommand `json:"callbackCommand"`
UserID string `json:"userID"`
Nickname string `json:"nickName"`
FaceURL string `json:"faceURL"`
Ex string `json:"ex"`
}
type CallbackAfterUpdateUserInfoResp struct {
CommonCallbackResp
}
type CallbackBeforeUserRegisterReq struct {
CallbackCommand `json:"callbackCommand"`
Secret string `json:"secret"`
Users []*sdkws.UserInfo `json:"users"`
}
type CallbackBeforeUserRegisterResp struct {
CommonCallbackResp
Users []*sdkws.UserInfo `json:"users"`
}
type CallbackAfterUserRegisterReq struct {
CallbackCommand `json:"callbackCommand"`
Secret string `json:"secret"`
Users []*sdkws.UserInfo `json:"users"`
}
type CallbackAfterUserRegisterResp struct {
CommonCallbackResp
}

@ -195,6 +195,7 @@ type configStruct struct {
WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"`
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"` WebsocketTimeout int `yaml:"websocketTimeout"`
WebsocketWriteBufferSize int `yaml:"websocketWriteBufferSize"`
} `yaml:"longConnSvr"` } `yaml:"longConnSvr"`
Push struct { Push struct {
@ -252,6 +253,8 @@ type configStruct struct {
CallbackBeforeSendGroupMsg CallBackConfig `yaml:"beforeSendGroupMsg"` CallbackBeforeSendGroupMsg CallBackConfig `yaml:"beforeSendGroupMsg"`
CallbackAfterSendGroupMsg CallBackConfig `yaml:"afterSendGroupMsg"` CallbackAfterSendGroupMsg CallBackConfig `yaml:"afterSendGroupMsg"`
CallbackMsgModify CallBackConfig `yaml:"msgModify"` CallbackMsgModify CallBackConfig `yaml:"msgModify"`
CallbackSingleMsgRead CallBackConfig `yaml:"singleMsgRead"`
CallbackGroupMsgRead CallBackConfig `yaml:"groupMsgRead"`
CallbackUserOnline CallBackConfig `yaml:"userOnline"` CallbackUserOnline CallBackConfig `yaml:"userOnline"`
CallbackUserOffline CallBackConfig `yaml:"userOffline"` CallbackUserOffline CallBackConfig `yaml:"userOffline"`
CallbackUserKickOff CallBackConfig `yaml:"userKickOff"` CallbackUserKickOff CallBackConfig `yaml:"userKickOff"`
@ -259,10 +262,21 @@ type configStruct struct {
CallbackOnlinePush CallBackConfig `yaml:"onlinePush"` CallbackOnlinePush CallBackConfig `yaml:"onlinePush"`
CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"superGroupOnlinePush"` CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"superGroupOnlinePush"`
CallbackBeforeAddFriend CallBackConfig `yaml:"beforeAddFriend"` CallbackBeforeAddFriend CallBackConfig `yaml:"beforeAddFriend"`
CallbackBeforeSetFriendRemark CallBackConfig `yaml:"callbackBeforeSetFriendRemark"`
CallbackAfterSetFriendRemark CallBackConfig `yaml:"callbackAfterSetFriendRemark"`
CallbackBeforeUpdateUserInfo CallBackConfig `yaml:"beforeUpdateUserInfo"` CallbackBeforeUpdateUserInfo CallBackConfig `yaml:"beforeUpdateUserInfo"`
CallbackBeforeUserRegister CallBackConfig `yaml:"beforeUserRegister"`
CallbackAfterUpdateUserInfo CallBackConfig `yaml:"updateUserInfo"`
CallbackAfterUserRegister CallBackConfig `yaml:"afterUserRegister"`
CallbackBeforeCreateGroup CallBackConfig `yaml:"beforeCreateGroup"` CallbackBeforeCreateGroup CallBackConfig `yaml:"beforeCreateGroup"`
CallbackAfterCreateGroup CallBackConfig `yaml:"afterCreateGroup"`
CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"beforeMemberJoinGroup"` CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"beforeMemberJoinGroup"`
CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"beforeSetGroupMemberInfo"` CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"beforeSetGroupMemberInfo"`
CallbackQuitGroup CallBackConfig `yaml:"quitGroup"`
CallbackKillGroupMember CallBackConfig `yaml:"killGroupMember"`
CallbackDismissGroup CallBackConfig `yaml:"dismissGroup"`
CallbackBeforeJoinGroup CallBackConfig `yaml:"joinGroup"`
CallbackTransferGroupOwnerAfter CallBackConfig `yaml:"transferGroupOwner"`
CallbackBeforeInviteUserToGroup CallBackConfig `yaml:"beforeInviteUserToGroup"` CallbackBeforeInviteUserToGroup CallBackConfig `yaml:"beforeInviteUserToGroup"`
CallbackAfterJoinGroup CallBackConfig `yaml:"joinGroupAfter"` CallbackAfterJoinGroup CallBackConfig `yaml:"joinGroupAfter"`
CallbackAfterSetGroupInfo CallBackConfig `yaml:"setGroupInfoAfter"` CallbackAfterSetGroupInfo CallBackConfig `yaml:"setGroupInfoAfter"`

@ -15,8 +15,6 @@
package convert package convert
import ( import (
"time"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
@ -43,7 +41,6 @@ func UserPb2DB(user *sdkws.UserInfo) *relationtb.UserModel {
userDB.Nickname = user.Nickname userDB.Nickname = user.Nickname
userDB.FaceURL = user.FaceURL userDB.FaceURL = user.FaceURL
userDB.Ex = user.Ex userDB.Ex = user.Ex
userDB.CreateTime = time.UnixMilli(user.CreateTime)
userDB.AppMangerLevel = user.AppMangerLevel userDB.AppMangerLevel = user.AppMangerLevel
userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt
return &userDB return &userDB

@ -645,19 +645,35 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string
} }
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() var (
if errors.Is(err, redis.Nil) { cursor uint64
return nil keys []string
} err error
key = c.allMessageCacheKey(conversationID)
)
for {
// scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server.
// if the count is too small, needs to be run scan on redis frequently.
var limit int64 = 10000
keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result()
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
for _, v := range vals {
if err := c.rdb.Del(ctx, v).Err(); err != nil { for _, key := range keys {
err := c.rdb.Del(ctx, key).Err()
if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
// scan end
if cursor == 0 {
return nil return nil
}
}
} }
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {

@ -385,3 +385,50 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input
assert.EqualValues(t, 1, val) // exists assert.EqualValues(t, 1, val) // exists
} }
} }
func TestCleanUpOneConversationAllMsg(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
cacher := msgCache{rdb: rdb}
count := 1000
prefix := fmt.Sprintf("%v", rand.Int63())
ids := []string{}
for i := 0; i < count; i++ {
id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63())
ids = append(ids, id)
key := cacher.allMessageCacheKey(id)
rdb.Set(context.Background(), key, "openim", 0)
}
// delete 100 keys with scan.
for i := 0; i < 100; i++ {
pickedKey := ids[i]
err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey)
assert.Nil(t, err)
ls, err := rdb.Keys(context.Background(), pickedKey).Result()
assert.Nil(t, err)
assert.Equal(t, 0, len(ls))
rcode, err := rdb.Exists(context.Background(), pickedKey).Result()
assert.Nil(t, err)
assert.EqualValues(t, 0, rcode) // non-exists
}
sid := fmt.Sprintf("%v-cid-*", prefix)
ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
assert.Nil(t, err)
assert.Equal(t, count-100, len(ls))
// delete fuzzy matching keys.
err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid)
assert.Nil(t, err)
// don't contains keys matched `{prefix}-cid-{random}` on redis
ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
assert.Nil(t, err)
assert.Equal(t, 0, len(ls))
}

@ -20,7 +20,6 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"net/http" "net/http"
urllib "net/url"
"time" "time"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
@ -107,17 +106,18 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input
} }
func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error {
defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "callbackConfig", callbackConfig) defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "output", output, "callbackConfig", callbackConfig)
//
v := urllib.Values{} //v := urllib.Values{}
v.Set(constant.CallbackCommand, command) //v.Set(constant.CallbackCommand, command)
url = url + "?" + v.Encode() //url = url + "/" + v.Encode()
url = url + "/" + command
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
if err != nil { if err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url) log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return errs.ErrCallbackContinue return nil
} }
return errs.ErrNetwork.Wrap(err.Error()) return errs.ErrNetwork.Wrap(err.Error())
} }
@ -125,7 +125,7 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
if err = json.Unmarshal(b, output); err != nil { if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url) log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return errs.ErrCallbackContinue return nil
} }
return errs.ErrData.Wrap(err.Error()) return errs.ErrData.Wrap(err.Error())
} }

@ -282,7 +282,7 @@ readonly ALERTMANAGER_SMTP_REQUIRE_TLS=${ALERTMANAGER_SMTP_REQUIRE_TLS:-"false"}
# SMTP HELO/EHLO标识符 # SMTP HELO/EHLO标识符
readonly ALERTMANAGER_SMTP_HELLO=${ALERTMANAGER_SMTP_HELLO:-"xxx监控告警"} readonly ALERTMANAGER_SMTP_HELLO=${ALERTMANAGER_SMTP_HELLO:-"xxx监控告警"}
# 邮箱接收人 # 邮箱接收人
readonly ALERTMANAGER_EMAIL_TO=${ALERTMANAGER_EMAIL_TO:-"{EMAIL_TO:-'alert@example.com'}"} readonly ALERTMANAGER_EMAIL_TO=${ALERTMANAGER_EMAIL_TO:-"alert@example.com"}
# 邮箱主题 # 邮箱主题
readonly ALERTMANAGER_EMAIL_SUBJECT=${ALERTMANAGER_EMAIL_SUBJECT:-"{EMAIL_SUBJECT:-'[Alert] Notification'}"} readonly ALERTMANAGER_EMAIL_SUBJECT=${ALERTMANAGER_EMAIL_SUBJECT:-"{EMAIL_SUBJECT:-'[Alert] Notification'}"}
# 是否发送已解决的告警 # 是否发送已解决的告警
@ -376,6 +376,10 @@ def "FRIEND_VERIFY" "false" # 朋友验证
def "IOS_PUSH_SOUND" "xxx" # IOS推送声音 def "IOS_PUSH_SOUND" "xxx" # IOS推送声音
def "IOS_BADGE_COUNT" "true" # IOS徽章计数 def "IOS_BADGE_COUNT" "true" # IOS徽章计数
def "IOS_PRODUCTION" "false" # IOS生产 def "IOS_PRODUCTION" "false" # IOS生产
# callback 配置
def "CALLBACK_ENABLE" "true" # 是否开启 Callback
def "CALLBACK_TIMEOUT" "5" # 最长超时时间
def "CALLBACK_FAILED_CONTINUE" "true" # 失败后是否继续
###################### Prometheus 配置信息 ###################### ###################### Prometheus 配置信息 ######################
# 是否启用 Prometheus # 是否启用 Prometheus

@ -1168,6 +1168,40 @@ EOF
openim::test::check_error "$response" openim::test::check_error "$response"
} }
# Searches for messages.
openim::test::search_msg() {
local sendID="${1}"
local recvID="${2}"
local msgType="${3}"
local sendTime="${4}"
local sessionType="${5}"
local pageNumber="${6}"
local showNumber="${7}"
# Construct the request body
local request_body=$(cat <<EOF
{
"sendID": "${sendID}",
"recvID": "${recvID}",
"msgType": ${msgType},
"sendTime": "${sendTime}",
"sessionType": ${sessionType},
"pagination": {
"pageNumber": ${pageNumber},
"showNumber": ${showNumber}
}
}
EOF
)
echo "$request_body"
# Send the request
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/msg/search_msg" -d "${request_body}")
# Check the response for errors
openim::test::check_error "$response"
}
# Revokes a message. # Revokes a message.
openim::test::revoke_msg() { openim::test::revoke_msg() {
local userID="${1}" local userID="${1}"
@ -1221,14 +1255,22 @@ function openim::test::msg()
# 0. Send a message. # 0. Send a message.
openim::test::send_msg "${SEND_USER_ID}" "${RECV_USER_ID}" "${GROUP_ID}" openim::test::send_msg "${SEND_USER_ID}" "${RECV_USER_ID}" "${GROUP_ID}"
# Assuming message sending was successful and returned a sequence number. # Wait for a short duration to ensure message is sent
local SEQ_NUMBER=1 # This should be the actual sequence number of the message sent. # 1. Search for the message
local SEARCH_TIME="2023-01-01T00:00:00Z" # You may need to adjust this
local MSG_TYPE=101
local SESSION_TYPE=1
local PAGE_NUMBER=1
local SHOW_NUMBER=20
echo "Searching for messages between ${SEND_USER_ID} and ${RECV_USER_ID}..."
openim::test::search_msg "${MANAGER_USERID_1}" "${RECV_USER_ID}" "${MSG_TYPE}" "${SEARCH_TIME}" "${SESSION_TYPE}" "${PAGE_NUMBER}" "${SHOW_NUMBER}"
# 1. Revoke a message. # 2. Revoke a message.
# TODO # TODO
# openim::test::revoke_msg "${RECV_USER_ID}" "si_${SEND_USER_ID}_${RECV_USER_ID}" "${SEQ_NUMBER}" # openim::test::revoke_msg "${RECV_USER_ID}" "si_${SEND_USER_ID}_${RECV_USER_ID}" "${SEQ_NUMBER}"
# 2. Clear all messages for a user. # 3. Clear all messages for a user.
openim::test::user_clear_all_msg "${RECV_USER_ID}" openim::test::user_clear_all_msg "${RECV_USER_ID}"
# Log the completion of the message test suite. # Log the completion of the message test suite.

Loading…
Cancel
Save