diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 1375655ba..cb9b09802 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -18,11 +18,13 @@ import ( "context" "fmt" "net" + "net/http" _ "net/http/pprof" + "os" + "os/signal" "strconv" - - ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "syscall" + "time" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/discoveryregistry" @@ -33,6 +35,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" ) func main() { @@ -51,13 +55,12 @@ func run(port int, proPort int) error { if port == 0 || proPort == 0 { err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) log.ZError(context.Background(), err, nil) - return fmt.Errorf(err) } + rdb, err := cache.NewRedis() if err != nil { log.ZError(context.Background(), "Failed to initialize Redis", err) - return err } log.ZInfo(context.Background(), "api start init discov client") @@ -68,30 +71,29 @@ func run(port int, proPort int) error { client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) if err != nil { log.ZError(context.Background(), "Failed to initialize discovery register", err) - return err } + if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { log.ZError(context.Background(), "Failed to create RPC root nodes", err) - return err } + log.ZInfo(context.Background(), "api register public config to discov") if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil { log.ZError(context.Background(), "Failed to register public config to discov", err) - return err } + log.ZInfo(context.Background(), "api register public config to discov success") router := api.NewGinRouter(client, rdb) - ////////////////////////////// if config.Config.Prometheus.Enable { p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", proPort)) p.Use(router) } - ///////////////////////////////// log.ZInfo(context.Background(), "api init router success") + var address string if config.Config.Api.ListenIP != "" { address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port)) @@ -100,10 +102,25 @@ func run(port int, proPort int) error { } log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version) - err = router.Run(address) - if err != nil { - log.ZError(context.Background(), "api run failed", err, "address", address) - + server := http.Server{Addr: address, Handler: router} + go func() { + err = server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + log.ZError(context.Background(), "api run failed", err, "address", address) + os.Exit(1) + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-sigs + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // graceful shutdown operation. + if err := server.Shutdown(ctx); err != nil { + log.ZError(context.Background(), "failed to api-server shutdown", err) return err } diff --git a/deployments/README.md b/deployments/README.md index 0f73a553e..b24babb31 100644 --- a/deployments/README.md +++ b/deployments/README.md @@ -84,8 +84,8 @@ $ sudo sealos run labring/kubernetes:v1.25.0 labring/helm:v3.8.2 labring/calico: If you are local, you can also use Kind and Minikube to test, for example, using Kind: ```bash -$ sGO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1 -$ skind create cluster +$ GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1 +$ kind create cluster ``` ### Installing helm @@ -174,4 +174,4 @@ cp ../config/notification.yaml ./charts/generated-configs/notification.yaml ```bash helmfile apply -``` \ No newline at end of file +``` diff --git a/deployments/templates/alertmanager.yml b/deployments/templates/alertmanager.yml index a6cd02fc7..ea99a9286 100644 --- a/deployments/templates/alertmanager.yml +++ b/deployments/templates/alertmanager.yml @@ -19,6 +19,7 @@ templates: - /etc/alertmanager/email.tmpl route: + group_by: ['alertname'] group_wait: 5s group_interval: 5s repeat_interval: 5m diff --git a/deployments/templates/openim.yaml b/deployments/templates/openim.yaml index 9788df2ba..70ec5843b 100644 --- a/deployments/templates/openim.yaml +++ b/deployments/templates/openim.yaml @@ -164,7 +164,7 @@ object: # These ports are passed into the program by the script and are not recommended to modify # For launching multiple programs, just fill in multiple ports separated by commas # For example, [10110, 10111] -rpcPort: +rpcPort: openImUserPort: [ ${OPENIM_USER_PORT} ] openImFriendPort: [ ${OPENIM_FRIEND_PORT} ] openImMessagePort: [ ${OPENIM_MESSAGE_PORT} ] @@ -314,68 +314,125 @@ iosPush: callback: url: "" beforeSendSingleMsg: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} afterSendSingleMsg: - enable: false - timeout: 5 + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} beforeSendGroupMsg: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} afterSendGroupMsg: - enable: false - timeout: 5 + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} msgModify: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} userOnline: - enable: false - timeout: 5 + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} userOffline: - enable: false - timeout: 5 + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} userKickOff: - enable: false - timeout: 5 + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} offlinePush: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} onlinePush: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} superGroupOnlinePush: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} beforeAddFriend: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} beforeUpdateUserInfo: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} beforeCreateGroup: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + afterCreateGroup: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} beforeMemberJoinGroup: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} beforeSetGroupMemberInfo: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} setMessageReactionExtensions: - enable: false - timeout: 5 - failedContinue: true + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + quitGroup: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + killGroupMember: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + dismissGroup: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + joinGroup: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + groupMsgRead: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + singleMsgRead: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + updateUserInfo: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + beforeUserRegister: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + afterUserRegister: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + transferGroupOwner: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + beforeSetFriendRemark: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} + afterSetFriendRemark: + enable: ${CALLBACK_ENABLE} + timeout: ${CALLBACK_TIMEOUT} + failedContinue: ${CALLBACK_FAILED_CONTINUE} afterGroupMsgRead: enable: false timeout: 5 diff --git a/docker-compose.yml b/docker-compose.yml index 1b9b391b2..1a6626add 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -100,7 +100,6 @@ services: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9093 - - KAFKA_HEAP_OPTS:"-Xmx256m -Xms256m" - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://${DOCKER_BRIDGE_GATEWAY}:${KAFKA_PORT} - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT diff --git a/docs/contrib/environment.md b/docs/contrib/environment.md index 221efdf03..8e0cf2572 100644 --- a/docs/contrib/environment.md +++ b/docs/contrib/environment.md @@ -466,7 +466,7 @@ This section involves configuring the log settings, including storage location, This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat. | Parameter | Example Value | Description | -| ----------------------- | ----------------- | ---------------------------------- | +|-------------------------|-------------------|------------------------------------| | WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections | | WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length | | WEBSOCKET_TIMEOUT | "10" | Websocket timeout | @@ -500,9 +500,9 @@ This section involves setting up additional configuration variables for Websocke | TOKEN_EXPIRE | "90" | Token Expiry Time | | FRIEND_VERIFY | "false" | Friend Verification Enable | | IOS_PUSH_SOUND | "xxx" | iOS | - - - +| CALLBACK_ENABLE | "true" | Enable callback | | | | +| CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call | +| CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step | ### 2.20. Prometheus Configuration This section involves configuring Prometheus, including enabling/disabling it and setting up ports for various services. diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index d7320a304..7d5381754 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -37,7 +37,7 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp req := cbapi.CallbackUserOnlineReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ - CallbackCommand: constant.CallbackUserOnlineCommand, + CallbackCommand: cbapi.CallbackUserOnlineCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: platformID, Platform: constant.PlatformIDToName(platformID), @@ -49,7 +49,10 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp ConnID: connID, } resp := cbapi.CommonCallbackResp{} - return http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline) + if err := http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline); err != nil { + return err + } + return nil } func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error { @@ -59,7 +62,7 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con req := &cbapi.CallbackUserOfflineReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ - CallbackCommand: constant.CallbackUserOfflineCommand, + CallbackCommand: cbapi.CallbackUserOfflineCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: platformID, Platform: constant.PlatformIDToName(platformID), @@ -70,7 +73,10 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con ConnID: connID, } resp := &cbapi.CallbackUserOfflineResp{} - return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) + if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil { + return err + } + return nil } func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error { @@ -80,7 +86,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err req := &cbapi.CallbackUserKickOffReq{ UserStatusCallbackReq: cbapi.UserStatusCallbackReq{ UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ - CallbackCommand: constant.CallbackUserKickOffCommand, + CallbackCommand: cbapi.CallbackUserKickOffCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: platformID, Platform: constant.PlatformIDToName(platformID), @@ -90,7 +96,10 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err Seq: time.Now().UnixMilli(), } resp := &cbapi.CommonCallbackResp{} - return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) + if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil { + return err + } + return nil } // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 12a6d3770..14c320c42 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -37,7 +37,9 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { WithPort(wsPort), WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)), WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second), - WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen)) + WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen), + WithWriteBufferSize(config.Config.LongConnSvr.WebsocketWriteBufferSize), + ) if err != nil { return err } diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 604619eb5..93e5cc33f 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -50,10 +50,11 @@ type GWebSocket struct { protocolType int conn *websocket.Conn handshakeTimeout time.Duration + writeBufferSize int } -func newGWebSocket(protocolType int, handshakeTimeout time.Duration) *GWebSocket { - return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout} +func newGWebSocket(protocolType int, handshakeTimeout time.Duration, wbs int) *GWebSocket { + return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout, writeBufferSize: wbs} } func (d *GWebSocket) Close() error { @@ -65,6 +66,10 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er HandshakeTimeout: d.handshakeTimeout, CheckOrigin: func(r *http.Request) bool { return true }, } + if d.writeBufferSize > 0 { // default is 4kb. + upgrader.WriteBufferSize = d.writeBufferSize + } + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { return err diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 736894708..5d7d4c542 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -68,6 +68,7 @@ type WsServer struct { onlineUserNum atomic.Int64 onlineUserConnNum atomic.Int64 handshakeTimeout time.Duration + writeBufferSize int validate *validator.Validate cache cache.MsgModel userClient *rpcclient.UserRpcClient @@ -138,6 +139,7 @@ func NewWsServer(opts ...Option) (*WsServer, error) { return &WsServer{ port: config.port, wsMaxConnNum: config.maxConnNum, + writeBufferSize: config.writeBufferSize, handshakeTimeout: config.handshakeTimeout, clientPool: sync.Pool{ New: func() interface{} { @@ -431,7 +433,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { httpError(connContext, errs.ErrTokenNotExist.Wrap()) return } - wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout) + + wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize) err = wsLongConn.GenerateLongConn(w, r) if err != nil { httpError(connContext, err) diff --git a/internal/msggateway/options.go b/internal/msggateway/options.go index 24cbbe43f..6513ac5dc 100644 --- a/internal/msggateway/options.go +++ b/internal/msggateway/options.go @@ -27,6 +27,8 @@ type ( handshakeTimeout time.Duration // 允许消息最大长度 messageMaxMsgLength int + // websocket write buffer, default: 4096, 4kb. + writeBufferSize int } ) @@ -53,3 +55,9 @@ func WithMessageMaxMsgLength(length int) Option { opt.messageMaxMsgLength = length } } + +func WithWriteBufferSize(size int) Option { + return func(opt *configs) { + opt.writeBufferSize = size + } +} diff --git a/internal/push/callback.go b/internal/push/callback.go index c646df3e2..2085493c5 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -19,7 +19,6 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/sdkws" - "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" @@ -44,7 +43,7 @@ func callbackOfflinePush( req := &callbackstruct.CallbackBeforePushReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ - CallbackCommand: constant.CallbackOfflinePushCommand, + CallbackCommand: callbackstruct.CallbackOfflinePushCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), @@ -62,9 +61,6 @@ func callbackOfflinePush( } resp := &callbackstruct.CallbackBeforePushResp{} if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } if len(resp.UserIDs) != 0 { @@ -83,7 +79,7 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat req := callbackstruct.CallbackBeforePushReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ - CallbackCommand: constant.CallbackOnlinePushCommand, + CallbackCommand: callbackstruct.CallbackOnlinePushCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), @@ -99,7 +95,10 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOnlinePush) + if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOnlinePush); err != nil { + return err + } + return nil } func callbackBeforeSuperGroupOnlinePush( @@ -113,7 +112,7 @@ func callbackBeforeSuperGroupOnlinePush( } req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ - CallbackCommand: constant.CallbackSuperGroupOnlinePushCommand, + CallbackCommand: callbackstruct.CallbackSuperGroupOnlinePushCommand, OperationID: mcontext.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), @@ -129,11 +128,9 @@ func callbackBeforeSuperGroupOnlinePush( } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } + return nil if len(resp.UserIDs) != 0 { *pushToUserIDs = resp.UserIDs } diff --git a/internal/rpc/friend/callback.go b/internal/rpc/friend/callback.go index 886acbb2e..9cca90498 100644 --- a/internal/rpc/friend/callback.go +++ b/internal/rpc/friend/callback.go @@ -16,7 +16,7 @@ package friend import ( "context" - "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/tools/utils" pbfriend "github.com/OpenIMSDK/protocol/friend" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/utils" @@ -30,16 +30,48 @@ func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriend return nil } cbReq := &cbapi.CallbackBeforeAddFriendReq{ - CallbackCommand: constant.CallbackBeforeAddFriendCommand, + CallbackCommand: cbapi.CallbackBeforeAddFriendCommand, FromUserID: req.FromUserID, ToUserID: req.ToUserID, ReqMsg: req.ReqMsg, } resp := &cbapi.CallbackBeforeAddFriendResp{} if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } + return err + } + return nil +} + +func CallbackBeforeSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) error { + if !config.Config.Callback.CallbackBeforeSetFriendRemark.Enable { + return nil + } + cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{ + CallbackCommand: cbapi.CallbackBeforeSetFriendRemark, + OwnerUserID: req.OwnerUserID, + FriendUserID: req.FriendUserID, + Remark: req.Remark, + } + resp := &cbapi.CallbackBeforeSetFriendRemarkResp{} + if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { + return err + } + utils.NotNilReplace(&req.Remark, &resp.Remark) + return nil +} + +func CallbackAfterSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) error { + if !config.Config.Callback.CallbackAfterSetFriendRemark.Enable { + return nil + } + cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{ + CallbackCommand: cbapi.CallbackAfterSetFriendRemark, + OwnerUserID: req.OwnerUserID, + FriendUserID: req.FriendUserID, + Remark: req.Remark, + } + resp := &cbapi.CallbackAfterSetFriendRemarkResp{} + if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { return err } return nil diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 1ef363b8a..fa1ffc0b4 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -233,6 +233,10 @@ func (s *friendServer) SetFriendRemark( req *pbfriend.SetFriendRemarkReq, ) (resp *pbfriend.SetFriendRemarkResp, err error) { defer log.ZInfo(ctx, utils.GetFuncName()+" Return") + + if err = CallbackBeforeSetFriendRemark(ctx, req); err != nil && err != errs.ErrCallbackContinue { + return nil, err + } resp = &pbfriend.SetFriendRemarkResp{} if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil { return nil, err @@ -244,6 +248,9 @@ func (s *friendServer) SetFriendRemark( if err := s.friendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil { return nil, err } + if err := CallbackAfterSetFriendRemark(ctx, req); err != nil && err != errs.ErrCallbackContinue { + return nil, err + } s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID) return resp, nil } diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index 1e80a3f82..33b880aac 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -22,10 +22,11 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/group" "github.com/OpenIMSDK/protocol/wrapperspb" - "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" + pbgroup "github.com/OpenIMSDK/protocol/group" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -38,7 +39,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( return nil } cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{ - CallbackCommand: constant.CallbackBeforeCreateGroupCommand, + CallbackCommand: callbackstruct.CallbackBeforeCreateGroupCommand, OperationID: mcontext.GetOperationID(ctx), GroupInfo: req.GroupInfo, } @@ -59,17 +60,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( }) } resp := &callbackstruct.CallbackBeforeCreateGroupResp{} - err = http.CallBackPostReturn( - ctx, - config.Config.Callback.CallbackUrl, - cbReq, - resp, - config.Config.Callback.CallbackBeforeCreateGroup, - ) - if err != nil { - if err == errs.ErrCallbackContinue { - return nil - } + if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup); err != nil { return err } utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID) @@ -87,6 +78,37 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( return nil } +func CallbackAfterCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) { + if !config.Config.Callback.CallbackAfterCreateGroup.Enable { + return nil + } + cbReq := &callbackstruct.CallbackAfterCreateGroupReq{ + CallbackCommand: callbackstruct.CallbackAfterCreateGroupCommand, + GroupInfo: req.GroupInfo, + } + cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{ + UserID: req.OwnerUserID, + RoleLevel: constant.GroupOwner, + }) + for _, userID := range req.AdminUserIDs { + cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{ + UserID: userID, + RoleLevel: constant.GroupAdmin, + }) + } + for _, userID := range req.MemberUserIDs { + cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{ + UserID: userID, + RoleLevel: constant.GroupOrdinaryUsers, + }) + } + resp := &callbackstruct.CallbackAfterCreateGroupResp{} + if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterCreateGroup); err != nil { + return err + } + return nil +} + func CallbackBeforeMemberJoinGroup( ctx context.Context, groupMember *relation.GroupMemberModel, @@ -96,8 +118,7 @@ func CallbackBeforeMemberJoinGroup( return nil } callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ - CallbackCommand: constant.CallbackBeforeMemberJoinGroupCommand, - OperationID: mcontext.GetOperationID(ctx), + CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand, GroupID: groupMember.GroupID, UserID: groupMember.UserID, Ex: groupMember.Ex, @@ -129,8 +150,7 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe return nil } callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ - CallbackCommand: constant.CallbackBeforeSetGroupMemberInfoCommand, - OperationID: mcontext.GetOperationID(ctx), + CallbackCommand: callbackstruct.CallbackBeforeSetGroupMemberInfoCommand, GroupID: req.GroupID, UserID: req.UserID, } @@ -155,6 +175,9 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe config.Config.Callback.CallbackBeforeSetGroupMemberInfo, ) if err != nil { + if err == errs.ErrCallbackContinue { + return nil + } return err } if resp.FaceURL != nil { @@ -171,145 +194,3 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe } return nil } - -func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserToGroupReq) (err error) { - if !config.Config.Callback.CallbackBeforeInviteUserToGroup.Enable { - return nil - } - - callbackReq := &callbackstruct.CallbackBeforeInviteUserToGroupReq{ - CallbackCommand: callbackstruct.CallbackBeforeInviteJoinGroupCommand, - OperationID: mcontext.GetOperationID(ctx), - GroupID: req.GroupID, - Reason: req.Reason, - InvitedUserIDs: req.InvitedUserIDs, - } - - resp := &callbackstruct.CallbackBeforeInviteUserToGroupResp{} - err = http.CallBackPostReturn( - ctx, - config.Config.Callback.CallbackUrl, - callbackReq, - resp, - config.Config.Callback.CallbackBeforeInviteUserToGroup, - ) - - if err != nil { - return err - } - - if len(resp.RefusedMembersAccount) > 0 { - // Handle the scenario where certain members are refused - // You might want to update the req.Members list or handle it as per your business logic - } - utils.StructFieldNotNilReplace(req, resp) - - return nil -} - -func CallbackAfterJoinGroup(ctx context.Context, req *group.JoinGroupReq) error { - if !config.Config.Callback.CallbackAfterJoinGroup.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackAfterJoinGroupReq{ - CallbackCommand: callbackstruct.CallbackAfterJoinGroupCommand, - OperationID: mcontext.GetOperationID(ctx), - GroupID: req.GroupID, - ReqMessage: req.ReqMessage, - JoinSource: req.JoinSource, - InviterUserID: req.InviterUserID, - } - resp := &callbackstruct.CallbackAfterJoinGroupResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterJoinGroup); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } - return err - } - return nil -} - -func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error { - if !config.Config.Callback.CallbackBeforeSetGroupInfo.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{ - CallbackCommand: callbackstruct.CallbackBeforeSetGroupInfoCommand, - GroupID: req.GroupInfoForSet.GroupID, - Notification: req.GroupInfoForSet.Notification, - Introduction: req.GroupInfoForSet.Introduction, - FaceURL: req.GroupInfoForSet.FaceURL, - GroupName: req.GroupInfoForSet.GroupName, - } - - if req.GroupInfoForSet.Ex != nil { - callbackReq.Ex = req.GroupInfoForSet.Ex.Value - } - log.ZDebug(ctx, "debug CallbackBeforeSetGroupInfo", callbackReq.Ex) - if req.GroupInfoForSet.NeedVerification != nil { - callbackReq.NeedVerification = req.GroupInfoForSet.NeedVerification.Value - } - if req.GroupInfoForSet.LookMemberInfo != nil { - callbackReq.LookMemberInfo = req.GroupInfoForSet.LookMemberInfo.Value - } - if req.GroupInfoForSet.ApplyMemberFriend != nil { - callbackReq.ApplyMemberFriend = req.GroupInfoForSet.ApplyMemberFriend.Value - } - resp := &callbackstruct.CallbackBeforeSetGroupInfoResp{} - - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupInfo); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } - return err - } - - if resp.Ex != nil { - req.GroupInfoForSet.Ex = wrapperspb.String(*resp.Ex) - } - if resp.NeedVerification != nil { - req.GroupInfoForSet.NeedVerification = wrapperspb.Int32(*resp.NeedVerification) - } - if resp.LookMemberInfo != nil { - req.GroupInfoForSet.LookMemberInfo = wrapperspb.Int32(*resp.LookMemberInfo) - } - if resp.ApplyMemberFriend != nil { - req.GroupInfoForSet.ApplyMemberFriend = wrapperspb.Int32(*resp.ApplyMemberFriend) - } - utils.StructFieldNotNilReplace(req, resp) - return nil -} -func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error { - if !config.Config.Callback.CallbackAfterSetGroupInfo.Enable { - return nil - } - callbackReq := &callbackstruct.CallbackAfterSetGroupInfoReq{ - CallbackCommand: callbackstruct.CallbackAfterSetGroupInfoCommand, - GroupID: req.GroupInfoForSet.GroupID, - Notification: req.GroupInfoForSet.Notification, - Introduction: req.GroupInfoForSet.Introduction, - FaceURL: req.GroupInfoForSet.FaceURL, - GroupName: req.GroupInfoForSet.GroupName, - } - if req.GroupInfoForSet.Ex != nil { - callbackReq.Ex = &req.GroupInfoForSet.Ex.Value - } - if req.GroupInfoForSet.NeedVerification != nil { - callbackReq.NeedVerification = &req.GroupInfoForSet.NeedVerification.Value - } - if req.GroupInfoForSet.LookMemberInfo != nil { - callbackReq.LookMemberInfo = &req.GroupInfoForSet.LookMemberInfo.Value - } - if req.GroupInfoForSet.ApplyMemberFriend != nil { - callbackReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value - } - resp := &callbackstruct.CallbackAfterSetGroupInfoResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } - return err - } - utils.StructFieldNotNilReplace(req, resp) - return nil -} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 85a98f89b..67cd70817 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -225,6 +227,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR if len(userMap) != len(userIDs) { return nil, errs.ErrUserIDNotFound.Wrap("user not found") } + // Callback Before create Group if err := CallbackBeforeCreateGroup(ctx, req); err != nil { return nil, err } @@ -298,6 +301,17 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR } s.Notification.GroupCreatedNotification(ctx, tips) } + reqCallBackAfter := &pbgroup.CreateGroupReq{ + MemberUserIDs: userIDs, + GroupInfo: resp.GroupInfo, + OwnerUserID: req.OwnerUserID, + AdminUserIDs: req.AdminUserIDs, + } + + if err := CallbackAfterCreateGroup(ctx, reqCallBackAfter); err != nil { + return nil, err + } + return resp, nil } @@ -613,6 +627,10 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou if err := s.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil { return nil, err } + + if err := CallbackKillGroupMember(ctx, req); err != nil { + return nil, err + } return resp, nil } @@ -824,6 +842,17 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) if group.Status == constant.GroupStatusDismissed { return nil, errs.ErrDismissedAlready.Wrap() } + + reqCall := &callbackstruct.CallbackJoinGroupReq{ + GroupID: req.GroupID, + GroupType: string(group.GroupType), + ApplyID: req.InviterUserID, + ReqMessage: req.ReqMessage, + } + + if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil { + return nil, err + } _, err = s.GroupDatabase.TakeGroupMember(ctx, req.GroupID, req.InviterUserID) if err == nil { return nil, errs.ErrArgs.Wrap("already in group") @@ -912,6 +941,10 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq) return nil, err } + // callback + if err := CallbackQuitGroup(ctx, req); err != nil { + return nil, err + } return resp, nil } @@ -1049,6 +1082,10 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans if err := s.GroupDatabase.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil { return nil, err } + + if err := CallbackTransferGroupOwnerAfter(ctx, req); err != nil { + return nil, err + } s.Notification.GroupOwnerTransferredNotification(ctx, req) return resp, nil } @@ -1219,6 +1256,20 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou s.Notification.GroupDismissedNotification(ctx, tips) } } + membersID, err := s.GroupDatabase.FindGroupMemberUserID(ctx, group.GroupID) + if err != nil { + return nil, err + } + reqCall := &callbackstruct.CallbackDisMissGroupReq{ + GroupID: req.GroupID, + OwnerID: owner.UserID, + MembersID: membersID, + GroupType: string(group.GroupType), + } + if err := CallbackDismissGroup(ctx, reqCall); err != nil { + return nil, err + } + return resp, nil } @@ -1457,6 +1508,12 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr } } } + for i := 0; i < len(req.Members); i++ { + if err := CallbackAfterSetGroupMemberInfo(ctx, req.Members[i]); err != nil { + return nil, err + } + } + return resp, nil } diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 6e3bbe987..49113aa0b 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -16,10 +16,14 @@ package msg import ( "context" + cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + + utils2 "github.com/OpenIMSDK/tools/utils" "github.com/redis/go-redis/v9" "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" @@ -88,10 +92,7 @@ func (m *msgServer) SetConversationHasReadSeq( return &msg.SetConversationHasReadSeqResp{}, nil } -func (m *msgServer) MarkMsgsAsRead( - ctx context.Context, - req *msg.MarkMsgsAsReadReq, -) (resp *msg.MarkMsgsAsReadResp, err error) { +func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) { if len(req.Seqs) < 1 { return nil, errs.ErrArgs.Wrap("seqs must not be empty") } @@ -127,10 +128,7 @@ func (m *msgServer) MarkMsgsAsRead( return &msg.MarkMsgsAsReadResp{}, nil } -func (m *msgServer) MarkConversationAsRead( - ctx context.Context, - req *msg.MarkConversationAsReadReq, -) (resp *msg.MarkConversationAsReadResp, err error) { +func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (resp *msg.MarkConversationAsReadResp, err error) { conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return nil, err @@ -139,49 +137,56 @@ func (m *msgServer) MarkConversationAsRead( if err != nil && errs.Unwrap(err) != redis.Nil { return nil, err } - var seqs []int64 + seqs := generateSeqs(hasReadSeq, req) - log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq, - "req.HasReadSeq", req.HasReadSeq) - if conversation.ConversationType == constant.SingleChatType { - for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { - seqs = append(seqs, i) + if len(seqs) > 0 || req.HasReadSeq > hasReadSeq { + err = m.updateReadStatus(ctx, req, conversation, seqs, hasReadSeq) + if err != nil { + return nil, err } + } + return &msg.MarkConversationAsReadResp{}, nil +} - if len(seqs) > 0 { - log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) - if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { - return nil, err - } - } - if req.HasReadSeq > hasReadSeq { - err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) - if err != nil { - return nil, err - } - hasReadSeq = req.HasReadSeq - } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, - m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { - return nil, err +func generateSeqs(hasReadSeq int64, req *msg.MarkConversationAsReadReq) []int64 { + var seqs []int64 + for _, val := range req.Seqs { + if val > hasReadSeq && !utils2.Contain(val, seqs...) { + seqs = append(seqs, val) } - } else if conversation.ConversationType == constant.SuperGroupChatType || - conversation.ConversationType == constant.NotificationChatType { - if req.HasReadSeq > hasReadSeq { - err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) - if err != nil { - return nil, err - } - hasReadSeq = req.HasReadSeq + } + return seqs +} + +func (m *msgServer) updateReadStatus(ctx context.Context, req *msg.MarkConversationAsReadReq, conversation *conversation.Conversation, seqs []int64, hasReadSeq int64) error { + if conversation.ConversationType == constant.SingleChatType && len(seqs) > 0 { + log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) + if err := m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { + return err } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, - req.UserID, seqs, hasReadSeq); err != nil { - return nil, err + } + reqCall := &cbapi.CallbackGroupMsgReadReq{ + SendID: conversation.OwnerUserID, + ReceiveID: req.UserID, + UnreadMsgNum: req.HasReadSeq, + ContentType: int64(conversation.ConversationType), + } + if err := CallbackGroupMsgRead(ctx, reqCall); err != nil { + return err + } + + if req.HasReadSeq > hasReadSeq { + if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { + return err } + } + recvID := m.conversationAndGetRecvID(conversation, req.UserID) + if conversation.ConversationType == constant.SuperGroupChatType || conversation.ConversationType == constant.NotificationChatType { + recvID = req.UserID } - return &msg.MarkConversationAsReadResp{}, nil + return m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, recvID, seqs, req.HasReadSeq) } func (m *msgServer) sendMarkAsReadNotification( diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 63952f2af..b35aa0815 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,9 +16,11 @@ package msg import ( "context" + "github.com/OpenIMSDK/protocol/sdkws" + "google.golang.org/protobuf/proto" + "github.com/OpenIMSDK/protocol/constant" pbchat "github.com/OpenIMSDK/protocol/msg" - "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" @@ -73,14 +75,11 @@ func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) er return nil } req := &cbapi.CallbackBeforeSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendSingleMsgCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackBeforeSendSingleMsgResp{} if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } return nil @@ -91,14 +90,11 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) err return nil } req := &cbapi.CallbackAfterSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackAfterSendSingleMsgCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackAfterSendSingleMsgResp{} if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } return nil @@ -109,14 +105,11 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) err return nil } req := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackBeforeSendGroupMsgCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackBeforeSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } return nil @@ -127,14 +120,11 @@ func callbackAfterSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) erro return nil } req := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackAfterSendGroupMsgCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackAfterSendGroupMsgResp{} if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } return nil @@ -145,13 +135,10 @@ func callbackMsgModify(ctx context.Context, msg *pbchat.SendMsgReq) error { return nil } req := &cbapi.CallbackMsgModifyCommandReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, constant.CallbackMsgModifyCommand), + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand), } resp := &cbapi.CallbackMsgModifyCommandResp{} if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } if resp.Content != nil { @@ -176,24 +163,3 @@ func callbackMsgModify(ctx context.Context, msg *pbchat.SendMsgReq) error { log.ZDebug(ctx, "callbackMsgModify", "msg", msg.MsgData) return nil } - -func CallbackAfterRevokeMsg(ctx context.Context, req *pbchat.RevokeMsgReq) error { - if !config.Config.Callback.CallbackAfterRevokeMsg.Enable { - return nil - } - callbackReq := &cbapi.CallbackAfterRevokeMsgReq{ - CallbackCommand: cbapi.CallbackAfterRevokeMsgCommand, - ConversationID: req.ConversationID, - Seq: req.Seq, - UserID: req.UserID, - } - resp := &cbapi.CallbackAfterSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } - return err - } - utils.StructFieldNotNilReplace(req, resp) - return nil -} diff --git a/internal/rpc/user/callback.go b/internal/rpc/user/callback.go index 8f6ceef23..01de2734d 100644 --- a/internal/rpc/user/callback.go +++ b/internal/rpc/user/callback.go @@ -16,11 +16,7 @@ package user import ( "context" - - "github.com/OpenIMSDK/protocol/constant" pbuser "github.com/OpenIMSDK/protocol/user" - "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" @@ -33,17 +29,13 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf return nil } cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{ - CallbackCommand: constant.CallbackBeforeUpdateUserInfoCommand, - OperationID: mcontext.GetOperationID(ctx), + CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoCommand, UserID: req.UserInfo.UserID, FaceURL: &req.UserInfo.FaceURL, Nickname: &req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoResp{} if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { - if err == errs.ErrCallbackContinue { - return nil - } return err } utils.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL) @@ -51,3 +43,57 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf utils.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname) return nil } + +func CallbackAfterUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) error { + if !config.Config.Callback.CallbackAfterUpdateUserInfo.Enable { + return nil + } + cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{ + CallbackCommand: cbapi.CallbackAfterUpdateUserInfoCommand, + UserID: req.UserInfo.UserID, + FaceURL: req.UserInfo.FaceURL, + Nickname: req.UserInfo.Nickname, + } + resp := &cbapi.CallbackAfterUpdateUserInfoResp{} + if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { + return err + } + return nil +} + +func CallbackBeforeUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error { + if !config.Config.Callback.CallbackBeforeUserRegister.Enable { + return nil + } + cbReq := &cbapi.CallbackBeforeUserRegisterReq{ + CallbackCommand: cbapi.CallbackBeforeUserRegisterCommand, + Secret: req.Secret, + Users: req.Users, + } + + resp := &cbapi.CallbackBeforeUserRegisterResp{} + if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { + return err + } + if len(resp.Users) != 0 { + req.Users = resp.Users + } + return nil +} + +func CallbackAfterUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error { + if !config.Config.Callback.CallbackAfterUserRegister.Enable { + return nil + } + cbReq := &cbapi.CallbackAfterUserRegisterReq{ + CallbackCommand: cbapi.CallbackAfterUserRegisterCommand, + Secret: req.Secret, + Users: req.Users, + } + + resp := &cbapi.CallbackBeforeUserRegisterResp{} + if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { + return err + } + return nil +} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index f2ceb3beb..f4164dbf2 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -139,6 +139,9 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } + if err := CallbackAfterUpdateUserInfo(ctx, req); err != nil { + return nil, err + } if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { log.ZError(ctx, "NotificationUserInfoUpdate", err, "userID", req.UserInfo.UserID) } @@ -230,6 +233,9 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if exist { return nil, errs.ErrRegisteredAlready.Wrap("userID registered already") } + if err := CallbackBeforeUserRegister(ctx, req); err != nil { + return nil, err + } now := time.Now() users := make([]*tablerelation.UserModel, 0, len(req.Users)) for _, user := range req.Users { @@ -246,6 +252,10 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if err := s.Create(ctx, users); err != nil { return nil, err } + + if err := CallbackAfterUserRegister(ctx, req); err != nil { + return nil, err + } return resp, nil } diff --git a/pkg/callbackstruct/common.go b/pkg/callbackstruct/common.go index b32d7a0f7..c58b9e415 100644 --- a/pkg/callbackstruct/common.go +++ b/pkg/callbackstruct/common.go @@ -14,8 +14,10 @@ package callbackstruct -import ( - "github.com/OpenIMSDK/tools/errs" +import "github.com/OpenIMSDK/tools/errs" + +const ( + Next = 1 ) type CommonCallbackReq struct { @@ -51,15 +53,15 @@ type CallbackResp interface { } type CommonCallbackResp struct { - ActionCode int `json:"actionCode"` + ActionCode int32 `json:"actionCode"` ErrCode int32 `json:"errCode"` ErrMsg string `json:"errMsg"` ErrDlt string `json:"errDlt"` - NextCode string `json:"nextCode"` + NextCode int32 `json:"nextCode"` } func (c CommonCallbackResp) Parse() error { - if c.ActionCode != errs.NoError || c.ErrCode != errs.NoError { + if c.ActionCode != errs.NoError || c.NextCode == Next { return errs.NewCodeError(int(c.ErrCode), c.ErrMsg).WithDetail(c.ErrDlt) } return nil diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index 3f14c3bc0..b1f045fc5 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -14,3 +14,35 @@ const CallbackAfterDeleteFriendCommand = "CallbackAfterDeleteFriendCommand" const CallbackBeforeImportFriendsCommand = "CallbackBeforeImportFriendsCommand" const CallbackAfterImportFriendsCommand = "CallbackAfterImportFriendsCommand" const CallbackAfterRemoveBlackCommand = "CallbackAfterRemoveBlackCommand" + +const ( + CallbackQuitGroupCommand = "callbackQuitGroupCommand" + CallbackKillGroupCommand = "callbackKillGroupCommand" + CallbackDisMissGroupCommand = "callbackDisMissGroupCommand" + CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" + CallbackGroupMsgReadCommand = "callbackGroupMsgReadCommand" + CallbackMsgModifyCommand = "callbackMsgModifyCommand" + CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" + CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" + CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" + CallbackTransferGroupOwnerAfter = "callbackTransferGroupOwnerAfter" + CallbackBeforeSetFriendRemark = "callbackBeforeSetFriendRemark" + CallbackAfterSetFriendRemark = "callbackAfterSetFriendRemark" + CallbackSingleMsgRead = "callbackSingleMsgRead" + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" + CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackUserOnlineCommand = "callbackUserOnlineCommand" + CallbackUserOfflineCommand = "callbackUserOfflineCommand" + CallbackUserKickOffCommand = "callbackUserKickOffCommand" + CallbackOfflinePushCommand = "callbackOfflinePushCommand" + CallbackOnlinePushCommand = "callbackOnlinePushCommand" + CallbackSuperGroupOnlinePushCommand = "callbackSuperGroupOnlinePushCommand" + CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" + CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" + CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" + CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" + CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand" + CallbackBeforeSetGroupMemberInfoCommand = "CallbackBeforeSetGroupMemberInfoCommand" +) diff --git a/pkg/callbackstruct/friend.go b/pkg/callbackstruct/friend.go index 7a0cf3b1f..4c88d928a 100644 --- a/pkg/callbackstruct/friend.go +++ b/pkg/callbackstruct/friend.go @@ -24,6 +24,50 @@ type CallbackBeforeAddFriendReq struct { type CallbackBeforeAddFriendResp struct { CommonCallbackResp } + +type CallBackAddFriendReplyBeforeReq struct { + CallbackCommand `json:"callbackCommand"` + FromUserID string `json:"fromUserID" ` + ToUserID string `json:"toUserID"` +} + +type CallBackAddFriendReplyBeforeResp struct { + CommonCallbackResp +} + +type CallbackAfterAddFriendReq struct { + CallbackCommand `json:"callbackCommand"` + FromUserID string `json:"fromUserID" ` + ToUserID string `json:"toUserID"` + ReqMsg string `json:"reqMsg"` +} + +type CallbackAfterAddFriendResp struct { + CommonCallbackResp +} + +type CallbackBeforeSetFriendRemarkReq struct { + CallbackCommand `json:"callbackCommand"` + OwnerUserID string `json:"ownerUserID"` + FriendUserID string `json:"friendUserID"` + Remark string `json:"remark"` +} + +type CallbackBeforeSetFriendRemarkResp struct { + CommonCallbackResp + Remark string `json:"remark"` +} + +type CallbackAfterSetFriendRemarkReq struct { + CallbackCommand `json:"callbackCommand"` + OwnerUserID string `json:"ownerUserID"` + FriendUserID string `json:"friendUserID"` + Remark string `json:"remark"` +} + +type CallbackAfterSetFriendRemarkResp struct { + CommonCallbackResp +} type CallbackAfterAddFriendReq struct { CallbackCommand `json:"callbackCommand"` FromUserID string `json:"fromUserID" ` diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index b2464451a..65d9b62c6 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -50,9 +50,18 @@ type CallbackBeforeCreateGroupResp struct { ApplyMemberFriend *int32 `json:"applyMemberFriend"` } +type CallbackAfterCreateGroupReq struct { + CallbackCommand `json:"callbackCommand"` + *common.GroupInfo + InitMemberList []*apistruct.GroupAddMemberInfo `json:"initMemberList"` +} + +type CallbackAfterCreateGroupResp struct { + CommonCallbackResp +} + type CallbackBeforeMemberJoinGroupReq struct { CallbackCommand `json:"callbackCommand"` - OperationID string `json:"operationID"` GroupID string `json:"groupID"` UserID string `json:"userID"` Ex string `json:"ex"` @@ -70,7 +79,6 @@ type CallbackBeforeMemberJoinGroupResp struct { type CallbackBeforeSetGroupMemberInfoReq struct { CallbackCommand `json:"callbackCommand"` - OperationID string `json:"operationID"` GroupID string `json:"groupID"` UserID string `json:"userID"` Nickname *string `json:"nickName"` @@ -87,6 +95,126 @@ type CallbackBeforeSetGroupMemberInfoResp struct { RoleLevel *int32 `json:"roleLevel"` } +type CallbackAfterSetGroupMemberInfoReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + UserID string `json:"userID"` + Nickname *string `json:"nickName"` + FaceURL *string `json:"faceURL"` + RoleLevel *int32 `json:"roleLevel"` + Ex *string `json:"ex"` +} + +type CallbackAfterSetGroupMemberInfoResp struct { + CommonCallbackResp +} + +type CallbackAfterGroupMemberExitReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + UserID string `json:"userID"` + GroupType *int32 `json:"groupType"` + ExitType string `json:"exitType"` +} + +type CallbackAfterGroupMemberExitResp struct { + CommonCallbackResp +} + +type CallbackAfterUngroupReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + GroupType *int32 `json:"groupType"` + OwnerID string `json:"ownerID"` + MemberList []string `json:"memberList"` +} + +type CallbackAfterUngroupResp struct { + CommonCallbackResp +} + +type CallbackAfterSetGroupInfoReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + GroupType *int32 `json:"groupType"` + UserID string `json:"userID"` + Name string `json:"name"` + Notification string `json:"notification"` + GroupUrl string `json:"groupUrl"` +} + +type CallbackAfterSetGroupInfoResp struct { + CommonCallbackResp +} + +type CallbackAfterRevokeMsgReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + GroupType *int32 `json:"groupType"` + UserID string `json:"userID"` + Content string `json:"content"` +} + +type CallbackAfterRevokeMsgResp struct { + CommonCallbackResp +} + +type CallbackQuitGroupReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + UserID string `json:"userID"` +} + +type CallbackQuitGroupResp struct { + CommonCallbackResp +} + +type CallbackKillGroupMemberReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + KickedUserIDs []string `json:"kickedUserIDs"` + Reason string `json:"reason"` +} + +type CallbackKillGroupMemberResp struct { + CommonCallbackResp +} + +type CallbackDisMissGroupReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + OwnerID string `json:"ownerID"` + GroupType string `json:"groupType"` + MembersID []string `json:"membersID"` +} + +type CallbackDisMissGroupResp struct { + CommonCallbackResp +} + +type CallbackJoinGroupReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + GroupType string `json:"groupType"` + ApplyID string `json:"applyID"` + ReqMessage string `json:"reqMessage"` +} + +type CallbackJoinGroupResp struct { + CommonCallbackResp +} + +type CallbackTransferGroupOwnerReq struct { + CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` + OldOwnerUserID string `json:"oldOwnerUserID"` + NewOwnerUserID string `json:"newOwnerUserID"` +} + +type CallbackTransferGroupOwnerResp struct { + CommonCallbackResp +} + type CallbackBeforeInviteUserToGroupReq struct { CallbackCommand `json:"callbackCommand"` OperationID string `json:"operationID"` diff --git a/pkg/callbackstruct/message.go b/pkg/callbackstruct/message.go index f404088e8..3adee618b 100644 --- a/pkg/callbackstruct/message.go +++ b/pkg/callbackstruct/message.go @@ -79,3 +79,46 @@ type CallbackMsgModifyCommandResp struct { AttachedInfo *string `json:"attachedInfo"` Ex *string `json:"ex"` } + +type CallbackSendGroupMsgErrorReq struct { + CommonCallbackReq + GroupID string `json:"groupID"` +} + +type CallbackSendGroupMsgErrorResp struct { + CommonCallbackResp +} + +type CallbackSingleMsgRevokeReq struct { + CallbackCommand `json:"callbackCommand"` + SendID string `json:"sendID"` + ReceiveID string `json:"receiveID"` + Content string `json:"content"` +} + +type CallbackSingleMsgRevokeResp struct { + CommonCallbackResp +} + +type CallbackGroupMsgReadReq struct { + CallbackCommand `json:"callbackCommand"` + SendID string `json:"sendID"` + ReceiveID string `json:"receiveID"` + UnreadMsgNum int64 `json:"unreadMsgNum"` + ContentType int64 `json:"contentType"` +} + +type CallbackGroupMsgReadResp struct { + CommonCallbackResp +} + +type CallbackSingleMsgReadReq struct { + CallbackCommand `json:"callbackCommand"` + SendID string `json:"sendID"` + ReceiveID string `json:"receiveID"` + ContentType int64 `json:"contentType"` +} + +type CallbackSingleMsgReadResp struct { + CommonCallbackResp +} diff --git a/pkg/callbackstruct/user.go b/pkg/callbackstruct/user.go index f01f4ca8a..f35cff554 100644 --- a/pkg/callbackstruct/user.go +++ b/pkg/callbackstruct/user.go @@ -14,9 +14,10 @@ package callbackstruct +import "github.com/OpenIMSDK/protocol/sdkws" + type CallbackBeforeUpdateUserInfoReq struct { CallbackCommand `json:"callbackCommand"` - OperationID string `json:"operationID"` UserID string `json:"userID"` Nickname *string `json:"nickName"` FaceURL *string `json:"faceURL"` @@ -28,3 +29,35 @@ type CallbackBeforeUpdateUserInfoResp struct { FaceURL *string `json:"faceURL"` Ex *string `json:"ex"` } + +type CallbackAfterUpdateUserInfoReq struct { + CallbackCommand `json:"callbackCommand"` + UserID string `json:"userID"` + Nickname string `json:"nickName"` + FaceURL string `json:"faceURL"` + Ex string `json:"ex"` +} +type CallbackAfterUpdateUserInfoResp struct { + CommonCallbackResp +} + +type CallbackBeforeUserRegisterReq struct { + CallbackCommand `json:"callbackCommand"` + Secret string `json:"secret"` + Users []*sdkws.UserInfo `json:"users"` +} + +type CallbackBeforeUserRegisterResp struct { + CommonCallbackResp + Users []*sdkws.UserInfo `json:"users"` +} + +type CallbackAfterUserRegisterReq struct { + CallbackCommand `json:"callbackCommand"` + Secret string `json:"secret"` + Users []*sdkws.UserInfo `json:"users"` +} + +type CallbackAfterUserRegisterResp struct { + CommonCallbackResp +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index f748aadce..ba350bb1b 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -195,6 +195,7 @@ type configStruct struct { WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketTimeout int `yaml:"websocketTimeout"` + WebsocketWriteBufferSize int `yaml:"websocketWriteBufferSize"` } `yaml:"longConnSvr"` Push struct { @@ -252,6 +253,8 @@ type configStruct struct { CallbackBeforeSendGroupMsg CallBackConfig `yaml:"beforeSendGroupMsg"` CallbackAfterSendGroupMsg CallBackConfig `yaml:"afterSendGroupMsg"` CallbackMsgModify CallBackConfig `yaml:"msgModify"` + CallbackSingleMsgRead CallBackConfig `yaml:"singleMsgRead"` + CallbackGroupMsgRead CallBackConfig `yaml:"groupMsgRead"` CallbackUserOnline CallBackConfig `yaml:"userOnline"` CallbackUserOffline CallBackConfig `yaml:"userOffline"` CallbackUserKickOff CallBackConfig `yaml:"userKickOff"` @@ -259,10 +262,21 @@ type configStruct struct { CallbackOnlinePush CallBackConfig `yaml:"onlinePush"` CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"superGroupOnlinePush"` CallbackBeforeAddFriend CallBackConfig `yaml:"beforeAddFriend"` + CallbackBeforeSetFriendRemark CallBackConfig `yaml:"callbackBeforeSetFriendRemark"` + CallbackAfterSetFriendRemark CallBackConfig `yaml:"callbackAfterSetFriendRemark"` CallbackBeforeUpdateUserInfo CallBackConfig `yaml:"beforeUpdateUserInfo"` + CallbackBeforeUserRegister CallBackConfig `yaml:"beforeUserRegister"` + CallbackAfterUpdateUserInfo CallBackConfig `yaml:"updateUserInfo"` + CallbackAfterUserRegister CallBackConfig `yaml:"afterUserRegister"` CallbackBeforeCreateGroup CallBackConfig `yaml:"beforeCreateGroup"` + CallbackAfterCreateGroup CallBackConfig `yaml:"afterCreateGroup"` CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"beforeMemberJoinGroup"` CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"beforeSetGroupMemberInfo"` + CallbackQuitGroup CallBackConfig `yaml:"quitGroup"` + CallbackKillGroupMember CallBackConfig `yaml:"killGroupMember"` + CallbackDismissGroup CallBackConfig `yaml:"dismissGroup"` + CallbackBeforeJoinGroup CallBackConfig `yaml:"joinGroup"` + CallbackTransferGroupOwnerAfter CallBackConfig `yaml:"transferGroupOwner"` CallbackBeforeInviteUserToGroup CallBackConfig `yaml:"beforeInviteUserToGroup"` CallbackAfterJoinGroup CallBackConfig `yaml:"joinGroupAfter"` CallbackAfterSetGroupInfo CallBackConfig `yaml:"setGroupInfoAfter"` diff --git a/pkg/common/convert/user.go b/pkg/common/convert/user.go index abb3a2144..4ca1899be 100644 --- a/pkg/common/convert/user.go +++ b/pkg/common/convert/user.go @@ -15,8 +15,6 @@ package convert import ( - "time" - "github.com/OpenIMSDK/protocol/sdkws" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" @@ -43,7 +41,6 @@ func UserPb2DB(user *sdkws.UserInfo) *relationtb.UserModel { userDB.Nickname = user.Nickname userDB.FaceURL = user.FaceURL userDB.Ex = user.Ex - userDB.CreateTime = time.UnixMilli(user.CreateTime) userDB.AppMangerLevel = user.AppMangerLevel userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt return &userDB diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 6d0ee8c67..282d1d1c1 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -645,19 +645,35 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string } func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { - vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() - if errors.Is(err, redis.Nil) { - return nil - } - if err != nil { - return errs.Wrap(err) - } - for _, v := range vals { - if err := c.rdb.Del(ctx, v).Err(); err != nil { + var ( + cursor uint64 + keys []string + err error + + key = c.allMessageCacheKey(conversationID) + ) + + for { + // scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server. + // if the count is too small, needs to be run scan on redis frequently. + var limit int64 = 10000 + keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result() + if err != nil { return errs.Wrap(err) } + + for _, key := range keys { + err := c.rdb.Del(ctx, key).Err() + if err != nil { + return errs.Wrap(err) + } + } + + // scan end + if cursor == 0 { + return nil + } } - return nil } func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { diff --git a/pkg/common/db/cache/msg_test.go b/pkg/common/db/cache/msg_test.go index 3fddf5965..a5be018ed 100644 --- a/pkg/common/db/cache/msg_test.go +++ b/pkg/common/db/cache/msg_test.go @@ -385,3 +385,50 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input assert.EqualValues(t, 1, val) // exists } } + +func TestCleanUpOneConversationAllMsg(t *testing.T) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + count := 1000 + prefix := fmt.Sprintf("%v", rand.Int63()) + + ids := []string{} + for i := 0; i < count; i++ { + id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63()) + ids = append(ids, id) + + key := cacher.allMessageCacheKey(id) + rdb.Set(context.Background(), key, "openim", 0) + } + + // delete 100 keys with scan. + for i := 0; i < 100; i++ { + pickedKey := ids[i] + err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey) + assert.Nil(t, err) + + ls, err := rdb.Keys(context.Background(), pickedKey).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(ls)) + + rcode, err := rdb.Exists(context.Background(), pickedKey).Result() + assert.Nil(t, err) + assert.EqualValues(t, 0, rcode) // non-exists + } + + sid := fmt.Sprintf("%v-cid-*", prefix) + ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result() + assert.Nil(t, err) + assert.Equal(t, count-100, len(ls)) + + // delete fuzzy matching keys. + err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid) + assert.Nil(t, err) + + // don't contains keys matched `{prefix}-cid-{random}` on redis + ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(ls)) +} diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index e0cedf357..f0fde3099 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -20,7 +20,6 @@ import ( "encoding/json" "io" "net/http" - urllib "net/url" "time" "github.com/OpenIMSDK/protocol/constant" @@ -107,17 +106,18 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input } func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { - defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "callbackConfig", callbackConfig) - - v := urllib.Values{} - v.Set(constant.CallbackCommand, command) - url = url + "?" + v.Encode() + defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "output", output, "callbackConfig", callbackConfig) + // + //v := urllib.Values{} + //v.Set(constant.CallbackCommand, command) + //url = url + "/" + v.Encode() + url = url + "/" + command b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { log.ZWarn(ctx, "callback failed but continue", err, "url", url) - return errs.ErrCallbackContinue + return nil } return errs.ErrNetwork.Wrap(err.Error()) } @@ -125,7 +125,7 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac if err = json.Unmarshal(b, output); err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { log.ZWarn(ctx, "callback failed but continue", err, "url", url) - return errs.ErrCallbackContinue + return nil } return errs.ErrData.Wrap(err.Error()) } diff --git a/scripts/install/environment.sh b/scripts/install/environment.sh index 777f88e18..aa4141a7d 100755 --- a/scripts/install/environment.sh +++ b/scripts/install/environment.sh @@ -282,7 +282,7 @@ readonly ALERTMANAGER_SMTP_REQUIRE_TLS=${ALERTMANAGER_SMTP_REQUIRE_TLS:-"false"} # SMTP HELO/EHLO标识符 readonly ALERTMANAGER_SMTP_HELLO=${ALERTMANAGER_SMTP_HELLO:-"xxx监控告警"} # 邮箱接收人 -readonly ALERTMANAGER_EMAIL_TO=${ALERTMANAGER_EMAIL_TO:-"{EMAIL_TO:-'alert@example.com'}"} +readonly ALERTMANAGER_EMAIL_TO=${ALERTMANAGER_EMAIL_TO:-"alert@example.com"} # 邮箱主题 readonly ALERTMANAGER_EMAIL_SUBJECT=${ALERTMANAGER_EMAIL_SUBJECT:-"{EMAIL_SUBJECT:-'[Alert] Notification'}"} # 是否发送已解决的告警 @@ -376,6 +376,10 @@ def "FRIEND_VERIFY" "false" # 朋友验证 def "IOS_PUSH_SOUND" "xxx" # IOS推送声音 def "IOS_BADGE_COUNT" "true" # IOS徽章计数 def "IOS_PRODUCTION" "false" # IOS生产 +# callback 配置 +def "CALLBACK_ENABLE" "true" # 是否开启 Callback +def "CALLBACK_TIMEOUT" "5" # 最长超时时间 +def "CALLBACK_FAILED_CONTINUE" "true" # 失败后是否继续 ###################### Prometheus 配置信息 ###################### # 是否启用 Prometheus diff --git a/scripts/install/test.sh b/scripts/install/test.sh index 7bcea3b82..eb3f6a200 100755 --- a/scripts/install/test.sh +++ b/scripts/install/test.sh @@ -1168,6 +1168,40 @@ EOF openim::test::check_error "$response" } +# Searches for messages. +openim::test::search_msg() { + local sendID="${1}" + local recvID="${2}" + local msgType="${3}" + local sendTime="${4}" + local sessionType="${5}" + local pageNumber="${6}" + local showNumber="${7}" + + # Construct the request body + local request_body=$(cat <