diff --git a/.github/workflows/publish-docker-image.yml b/.github/workflows/publish-docker-image.yml index 998a11cf3..4cd3316dd 100644 --- a/.github/workflows/publish-docker-image.yml +++ b/.github/workflows/publish-docker-image.yml @@ -4,42 +4,80 @@ on: push: branches: - release-* - # tags: - # - 'v*' - release: types: [published] - workflow_dispatch: inputs: tag: description: "Tag version to be used for Docker image" required: true - default: "v3.8.0" + default: "v3.8.3" + +env: + GO_VERSION: "1.22" + IMAGE_NAME: "openim-server" + # IMAGE_NAME: ${{ github.event.repository.name }} + DOCKER_BUILDKIT: 1 jobs: - build-and-test: + publish-docker-images: runs-on: ubuntu-latest + if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.merged == false) }} steps: - - uses: actions/checkout@v4 + - name: Checkout main repository + uses: actions/checkout@v4 with: path: main-repo - # - name: Set up QEMU - # uses: docker/setup-qemu-action@v3.3.0 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3.3.0 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3.8.0 + id: buildx + uses: docker/setup-buildx-action@v3 + with: + driver-opts: network=host - - name: Build Docker image - id: build - uses: docker/build-push-action@v5 + - name: Extract metadata for Docker + id: meta + uses: docker/metadata-action@v5.6.0 with: - context: ./main-repo - load: true - tags: "openim/openim-server:local" - cache-from: type=gha,scope=build - cache-to: type=gha,mode=max,scope=build + images: | + ${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }} + ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }} + registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=tag + type=schedule + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern=v{{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=sha + + - name: Install skopeo + run: | + sudo apt-get update && sudo apt-get install -y skopeo + + - name: Build multi-arch images as OCI + run: | + mkdir -p /tmp/oci-image /tmp/docker-cache + + # Build multi-architecture image and save in OCI format + docker buildx build \ + --platform linux/amd64,linux/arm64 \ + --output type=oci,dest=/tmp/oci-image/multi-arch.tar \ + --cache-to type=local,dest=/tmp/docker-cache \ + --cache-from type=gha \ + ./main-repo + + # Use skopeo to convert the amd64 image from OCI format to Docker format and load it + skopeo copy --override-arch amd64 oci-archive:/tmp/oci-image/multi-arch.tar docker-daemon:${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local + + # check image + docker image ls | grep openim - name: Checkout compose repository uses: actions/checkout@v4 @@ -52,11 +90,11 @@ jobs: run: | IP=$(hostname -I | awk '{print $1}') echo "The IP Address is: $IP" - echo "::set-output name=ip::$IP" + echo "ip=$IP" >> $GITHUB_OUTPUT - name: Update .env to use the local image run: | - sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=openim/openim-server:local|' ${{ github.workspace }}/compose-repo/.env + sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local|' ${{ github.workspace }}/compose-repo/.env sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env - name: Start services using Docker Compose @@ -66,23 +104,34 @@ jobs: docker compose ps - - name: Extract metadata for Docker (tags, labels) - id: meta - uses: docker/metadata-action@v5.6.0 - with: - images: | - openim/openim-server - ghcr.io/openimsdk/openim-server - registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server - tags: | - type=ref,event=tag - type=schedule - type=ref,event=branch - # type=semver,pattern={{version}} - type=semver,pattern=v{{version}} - type=semver,pattern=release-{{raw}} - type=sha - type=raw,value=${{ github.event.inputs.tag }} + # - name: Check openim-server health + # run: | + # timeout=300 + # interval=30 + # elapsed=0 + # while [[ $elapsed -le $timeout ]]; do + # if ! docker exec openim-server mage check; then + # echo "openim-server is not ready, waiting..." + # sleep $interval + # elapsed=$(($elapsed + $interval)) + # else + # echo "Health check successful" + # exit 0 + # fi + # done + # echo "Health check failed after 5 minutes" + # exit 1 + + # - name: Check openim-chat health + # if: success() + # run: | + # if ! docker exec openim-chat mage check; then + # echo "openim-chat check failed" + # exit 1 + # else + # echo "Health check successful" + # exit 0 + # fi - name: Log in to Docker Hub uses: docker/login-action@v3.3.0 @@ -104,22 +153,27 @@ jobs: username: ${{ secrets.ALIREGISTRY_USERNAME }} password: ${{ secrets.ALIREGISTRY_TOKEN }} - - name: Push Docker images - uses: docker/build-push-action@v5 - with: - context: ./main-repo - push: true - platforms: linux/amd64,linux/arm64 - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha,scope=build - cache-to: type=gha,mode=max,scope=build + - name: Push multi-architecture images + if: success() + run: | + docker buildx build \ + --platform linux/amd64,linux/arm64 \ + $(echo "${{ steps.meta.outputs.tags }}" | sed 's/,/ --tag /g' | sed 's/^/--tag /') \ + --cache-from type=local,src=/tmp/docker-cache \ + --push \ + ./main-repo - name: Verify multi-platform support run: | - images=("openim/openim-server" "ghcr.io/openimsdk/openim-server" "registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server") + images=( + "${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}" + "ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}" + "registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}" + ) + for image in "${images[@]}"; do - for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n'); do + for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n' | cut -d':' -f2); do + echo "Verifying multi-arch support for $image:$tag" manifest=$(docker manifest inspect "$image:$tag" || echo "error") if [[ "$manifest" == "error" ]]; then echo "Manifest not found for $image:$tag" @@ -135,5 +189,6 @@ jobs: echo "Multi-platform support check failed for $image:$tag - missing arm64" exit 1 fi + echo "✅ $image:$tag supports both amd64 and arm64 architectures" done done diff --git a/config/webhooks.yml b/config/webhooks.yml index 9fd3eb339..c1645e7c8 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -41,6 +41,9 @@ afterSendGroupMsg: attentionIds: [] # See beforeSendSingleMsg comment. deniedTypes: [] +afterMsgSaveDB: + enable: false + timeout: 5 afterUserOnline: enable: false timeout: 5 diff --git a/go.mod b/go.mod index 134b0c9fc..de5c4f92f 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.14 - github.com/openimsdk/tools v0.0.50-alpha.104 + github.com/openimsdk/protocol v0.0.73-alpha.17 + github.com/openimsdk/tools v0.0.50-alpha.105 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.10.0 diff --git a/go.sum b/go.sum index 24f913f26..ea874f8bc 100644 --- a/go.sum +++ b/go.sum @@ -349,10 +349,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= -github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.104 h1:fmDMqMC+SasN8noGKCT++0BQDqEF2O4ek9bLkiLMeHw= -github.com/openimsdk/tools v0.0.50-alpha.104/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4= +github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE= +github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs= +github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 5a191c0ec..78affee44 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) } + +func (o *ConversationApi) DeleteConversations(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index 7e1192f21..b3bad136e 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -289,6 +289,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/delete_conversations", c.DeleteConversations) } { diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index bdb62aece..7b9f4bc0e 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -70,6 +70,7 @@ type Client struct { UserID string `json:"userID"` IsBackground bool `json:"isBackground"` SDKType string `json:"sdkType"` + SDKVersion string `json:"sdkVersion"` Encoder Encoder ctx *UserConnContext longConnServer LongConnServer @@ -97,6 +98,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closedErr = nil c.token = ctx.GetToken() c.SDKType = ctx.GetSDKType() + c.SDKVersion = ctx.GetSDKVersion() c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.subLock = new(sync.Mutex) if c.subUserIDs != nil { diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 1e7ab3bb7..3959e1138 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -28,6 +28,7 @@ const ( BackgroundStatus = "isBackground" SendResponse = "isMsgResp" SDKType = "sdkType" + SDKVersion = "sdkVersion" ) const ( diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index d73a96df4..37b5a7cdc 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -15,12 +15,13 @@ package msggateway import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "net/http" "net/url" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/utils/encrypt" "github.com/openimsdk/tools/utils/stringutil" @@ -140,6 +141,10 @@ func (c *UserConnContext) GetToken() string { return c.Req.URL.Query().Get(Token) } +func (c *UserConnContext) GetSDKVersion() string { + return c.Req.URL.Query().Get(SDKVersion) +} + func (c *UserConnContext) GetCompression() bool { compression, exists := c.Query(Compression) if exists && compression == GzipCompressionProtocol { diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index bc7a2fa5f..d490cc8b9 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -13,6 +13,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" pbAuth "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mcontext" "github.com/go-playground/validator/v10" @@ -64,6 +65,8 @@ type WsServer struct { webhookClient *webhook.Client userClient *rpcli.UserClient authClient *rpcli.AuthClient + + ready atomic.Bool } type kickHandler struct { @@ -93,6 +96,8 @@ func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.C ws.authClient = rpcli.NewAuthClient(authConn) ws.MessageHandler = NewGrpcHandler(ws.validate, rpcli.NewMsgClient(msgConn), rpcli.NewPushMsgServiceClient(pushConn)) ws.disCov = disCov + + ws.ready.Store(true) return nil } @@ -254,6 +259,10 @@ func (ws *WsServer) registerClient(client *Client) { oldClients []*Client ) oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID) + + log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID, + "sdkVersion", client.SDKVersion) + if !userOK { ws.clients.Set(client.UserID, client) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) @@ -453,6 +462,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { // Create a new connection context connContext := newContext(w, r) + if !ws.ready.Load() { + httpError(connContext, errs.New("ws server not ready")) + return + } + // Check if the current number of online user connections exceeds the maximum limit if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum { // If it exceeds the maximum connection number, return an error via HTTP and stop processing @@ -469,6 +483,11 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { return } + if ws.authClient == nil { + httpError(connContext, errs.New("auth client is not initialized")) + return + } + // Call the authentication client to parse the Token obtained from the context resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken()) if err != nil { diff --git a/internal/msgtransfer/callback.go b/internal/msgtransfer/callback.go index f0d439779..575efb4ae 100644 --- a/internal/msgtransfer/callback.go +++ b/internal/msgtransfer/callback.go @@ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string { } } -func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { - if msg.ContentType == constant.Typing { - return - } - +func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { if !filterAfterMsg(msg, after) { return } - cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), - RecvID: msg.RecvID, - } - mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg)) -} - -func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) { - if msg.ContentType == constant.Typing { - return - } - - if !filterAfterMsg(msg, after) { - return + cbReq := &cbapi.CallbackAfterMsgSaveDBReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand), } - cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ - CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), - GroupID: msg.GroupID, + switch msg.SessionType { + case constant.SingleChatType, constant.NotificationChatType: + cbReq.RecvID = msg.RecvID + case constant.ReadGroupChatType: + cbReq.GroupID = msg.GroupID + default: } - mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg)) + mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg)) } func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8b212774a..8af585ce3 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 8611af7ea..147bd37b0 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -15,7 +15,6 @@ package msgtransfer import ( - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/mq" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) if err != nil { - log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) + log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) prommetrics.MsgInsertMongoFailedCounter.Inc() } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() @@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) } for _, msgData := range msgFromMQ.MsgData { - switch msgData.SessionType { - case constant.SingleChatType: - mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData) - case constant.ReadGroupChatType: - mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData) - } + mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData) } //var seqs []int64 diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index c9bfd3c56..562a76d82 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -37,6 +37,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * } latestMsgDestructTime := time.UnixMilli(req.Timestamp) for i, conversation := range conversations { - if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 { continue } seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000)) @@ -835,3 +836,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } + if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set") + } + + if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set") + } + + var needDeleteConversationIDs []string + + if len(req.ConversationIDs) == 0 { + deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) + if err != nil { + return nil, err + } + latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: req.OwnerUserID, + ConversationIDs: conversationIDs, + }) + if err != nil { + return nil, err + } + + for conversationID, msg := range latestMsgs.Msgs { + if msg.SendTime < deleteTimeThreshold { + needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + } + } + + if len(needDeleteConversationIDs) == 0 { + return &pbconversation.DeleteConversationsResp{}, nil + } + } else { + needDeleteConversationIDs = req.ConversationIDs + } + + if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil { + return nil, err + } + + // c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) + + return &pbconversation.DeleteConversationsResp{}, nil +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 6e865ba42..aa7b7d227 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) { + tips := &sdkws.ConversationDeleteTips{ + UserID: userID, + ConversationIDs: conversationIDs, + } + + c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips) +} diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 2c00efd43..49fb04477 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,8 +16,10 @@ package msg import ( "context" + "encoding/base64" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/tools/errs" @@ -28,6 +30,7 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" ) @@ -87,19 +90,19 @@ func (m *msgServer) webhookBeforeSendSingleMsg(ctx context.Context, before *conf } // Move to msgtransfer -// func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { -// if msg.MsgData.ContentType == constant.Typing { -// return -// } -// if !filterAfterMsg(msg, after) { -// return -// } -// cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ -// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), -// RecvID: msg.MsgData.RecvID, -// } -// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -// } +func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), + RecvID: msg.MsgData.RecvID, + } + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +} func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -121,21 +124,20 @@ func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *confi }) } -// Move to msgtransfer -// func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { -// if msg.MsgData.ContentType == constant.Typing { -// return -// } -// if !filterAfterMsg(msg, after) { -// return -// } -// cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ -// CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), -// GroupID: msg.MsgData.GroupID, -// } - -// m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) -// } +func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *pbchat.SendMsgReq) { + if msg.MsgData.ContentType == constant.Typing { + return + } + if !filterAfterMsg(msg, after) { + return + } + cbReq := &cbapi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), + GroupID: msg.MsgData.GroupID, + } + + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) +} func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq, beforeMsgData **sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { @@ -204,14 +206,14 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } -// func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { -// keyMsgData := apistruct.KeyMsgData{ -// SendID: msg.SendID, -// RecvID: msg.RecvID, -// GroupID: msg.GroupID, -// } - -// return map[string]string{ -// webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), -// } -// } +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index d97905bff..18ad9cc56 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -86,7 +86,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq, go m.setConversationAtInfo(ctx, req.MsgData) } - // m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) + m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req) prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} @@ -194,7 +194,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq return nil, err } - // m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) + m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req) prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return &pbmsg.SendMsgResp{ ServerMsgID: req.MsgData.ServerMsgID, diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index bbc34e71f..ef7cb502f 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -66,4 +66,5 @@ const ( CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand" CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand" CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand" + CallbackAfterMsgSaveDBCommand = "callbackAfterMsgSaveDBCommand" ) diff --git a/pkg/callbackstruct/conversation.go b/pkg/callbackstruct/conversation.go index 14e78094c..d40e914d3 100644 --- a/pkg/callbackstruct/conversation.go +++ b/pkg/callbackstruct/conversation.go @@ -2,42 +2,42 @@ package callbackstruct type CallbackBeforeCreateSingleChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - UserID string `json:"user_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + UserID string `json:"userId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } type CallbackBeforeCreateSingleChatConversationsResp struct { CommonCallbackResp - RecvMsgOpt *int32 `json:"recv_msg_opt"` - IsPinned *bool `json:"is_pinned"` - IsPrivateChat *bool `json:"is_private_chat"` - BurnDuration *int32 `json:"burn_duration"` - GroupAtType *int32 `json:"group_at_type"` - AttachedInfo *string `json:"attached_info"` + RecvMsgOpt *int32 `json:"recvMsgOpt"` + IsPinned *bool `json:"isPinned"` + IsPrivateChat *bool `json:"isPrivateChat"` + BurnDuration *int32 `json:"burnDuration"` + GroupAtType *int32 `json:"groupAtType"` + AttachedInfo *string `json:"attachedInfo"` Ex *string `json:"ex"` } type CallbackAfterCreateSingleChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - UserID string `json:"user_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + UserID string `json:"userId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } @@ -47,42 +47,42 @@ type CallbackAfterCreateSingleChatConversationsResp struct { type CallbackBeforeCreateGroupChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - GroupID string `json:"group_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + GroupID string `json:"groupId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } type CallbackBeforeCreateGroupChatConversationsResp struct { CommonCallbackResp - RecvMsgOpt *int32 `json:"recv_msg_opt"` - IsPinned *bool `json:"is_pinned"` - IsPrivateChat *bool `json:"is_private_chat"` - BurnDuration *int32 `json:"burn_duration"` - GroupAtType *int32 `json:"group_at_type"` - AttachedInfo *string `json:"attached_info"` + RecvMsgOpt *int32 `json:"recvMsgOpt"` + IsPinned *bool `json:"isPinned"` + IsPrivateChat *bool `json:"isPrivateChat"` + BurnDuration *int32 `json:"burnDuration"` + GroupAtType *int32 `json:"groupAtType"` + AttachedInfo *string `json:"attachedInfo"` Ex *string `json:"ex"` } type CallbackAfterCreateGroupChatConversationsReq struct { CallbackCommand `json:"callbackCommand"` - OwnerUserID string `json:"owner_user_id"` - ConversationID string `json:"conversation_id"` - ConversationType int32 `json:"conversation_type"` - GroupID string `json:"group_id"` - RecvMsgOpt int32 `json:"recv_msg_opt"` - IsPinned bool `json:"is_pinned"` - IsPrivateChat bool `json:"is_private_chat"` - BurnDuration int32 `json:"burn_duration"` - GroupAtType int32 `json:"group_at_type"` - AttachedInfo string `json:"attached_info"` + OwnerUserID string `json:"ownerUserId"` + ConversationID string `json:"conversationId"` + ConversationType int32 `json:"conversationType"` + GroupID string `json:"groupId"` + RecvMsgOpt int32 `json:"recvMsgOpt"` + IsPinned bool `json:"isPinned"` + IsPrivateChat bool `json:"isPrivateChat"` + BurnDuration int32 `json:"burnDuration"` + GroupAtType int32 `json:"groupAtType"` + AttachedInfo string `json:"attachedInfo"` Ex string `json:"ex"` } diff --git a/pkg/callbackstruct/message.go b/pkg/callbackstruct/message.go index 902fa6110..ef8a8bb28 100644 --- a/pkg/callbackstruct/message.go +++ b/pkg/callbackstruct/message.go @@ -103,3 +103,13 @@ type CallbackSingleMsgReadReq struct { type CallbackSingleMsgReadResp struct { CommonCallbackResp } + +type CallbackAfterMsgSaveDBReq struct { + CommonCallbackReq + RecvID string `json:"recvID"` + GroupID string `json:"groupID"` +} + +type CallbackAfterMsgSaveDBResp struct { + CommonCallbackResp +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index e645c9356..695684d15 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -473,6 +473,7 @@ type Webhooks struct { BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` + AfterMsgSaveDB AfterConfig `yaml:"afterMsgSaveDB"` AfterUserOnline AfterConfig `yaml:"afterUserOnline"` AfterUserOffline AfterConfig `yaml:"afterUserOffline"` AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index ac3011107..d5ab2264f 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -16,6 +16,7 @@ package cache import ( "context" + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) @@ -57,7 +58,7 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache - DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache + DelUserPinnedConversations(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 3453b570d..16d67322b 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs return cache } -func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { +func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { cache.AddKeys(c.getPinnedConversationIDsKey(userID)) diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 27442ca66..d085c7466 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -78,6 +78,8 @@ type ConversationDatabase interface { GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) // FindRandConversation finds random conversations based on the specified timestamp and limit. FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) + + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := fieldMap["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } @@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := args["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } return cache.ChainExecDel(ctx) } @@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat DelUserConversationIDsHash(userIDs...). DelConversationVersionUserIDs(userIDs...). DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). - DelConversationPinnedMessageUserIDs(pinnedUserIDs...). + DelUserPinnedConversations(pinnedUserIDs...). ChainExecDel(ctx) } @@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs cache := c.cache.CloneConversationCache() cache = cache.DelConversationVersionUserIDs(ownerUserID). DelConversationNotNotifyMessageUserIDs(ownerUserID). - DelConversationPinnedMessageUserIDs(ownerUserID) + DelUserPinnedConversations(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" @@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { return c.conversationDB.FindRandConversation(ctx, ts, limit) } + +func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + return c.tx.Transaction(ctx, func(ctx context.Context) error { + err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + cache = cache.DelConversations(userID, conversationIDs...). + DelConversationVersionUserIDs(userID). + DelConversationIDs(userID). + DelUserConversationIDsHash(userID). + DelConversationNotNotifyMessageUserIDs(userID). + DelUserPinnedConversations(userID) + + return cache.ChainExecDel(ctx) + }) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index d612dfc2d..562782346 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -44,4 +44,5 @@ type Conversation interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 89f13ea3d..d5ab046aa 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li } return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) } + +func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + if len(conversationIDs) == 0 { + return nil + } + return mongoutil.IncrVersion(func() error { + err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}) + return err + }, func() error { + for _, conversationID := range conversationIDs { + if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil { + return err + } + } + return nil + }) +} diff --git a/pkg/common/storage/database/mgo/user.go b/pkg/common/storage/database/mgo/user.go index a08765baf..04ecca1e8 100644 --- a/pkg/common/storage/database/mgo/user.go +++ b/pkg/common/storage/database/mgo/user.go @@ -73,7 +73,7 @@ func (u *UserMgo) TakeNotification(ctx context.Context, level int64) (user []*mo } func (u *UserMgo) TakeGTEAppManagerLevel(ctx context.Context, level int64) (user []*model.User, err error) { - return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manager_level": bson.M{"$gte": level}}) + return mongoutil.Find[*model.User](ctx, u.coll, bson.M{"app_manger_level": bson.M{"$gte": level}}) } func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*model.User, err error) { diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index 04d772d16..137eb4457 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -109,7 +109,7 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { case constant.ReadGroupChatType: return "sg_" + ids[0] // super group chat case constant.NotificationChatType: - return "sn_" + ids[0] // server notification chat + return "sn_" + strings.Join(ids, "_") // server notification chat } return "" }