diff --git a/.cursorignore b/.cursorignore index 95694ea0e..777a278ee 100644 --- a/.cursorignore +++ b/.cursorignore @@ -1,14 +1,14 @@ # Cursor 代码索引忽略(语法与 .gitignore 相同) -# 与根目录 .gitignore 对齐;未列出的规则仍以 .gitignore 为准(Git 不索引的路径 Cursor 通常也不关心) +# 与根目录 .gitignore 对齐;以下为补充规则,减少生成物/文档噪音,保留业务源码与 .proto ### OpenIM(与 .gitignore 一致)### -logs -.devcontainer -components -out-test +logs/ +.devcontainer/ +components/ +out-test/ Dockerfile.cross -### macOS / 本地工具(不入索引)### +### macOS / 本地工具 ### .DS_Store .playwright-mcp/ @@ -17,26 +17,51 @@ tmp/ bin/ output/ _output/ +build/ +dist/ deployments/charts/generated-configs/ ### 配置与密钥(勿入索引)### .env config/config.yaml config/notification.yaml +start-config.yml ### 部署生成物 ### -deployments/openim-server/charts +deployments/openim-server/charts/ ### 本地笔记 ### .idea.md .todo.md .note.md +### 生成代码(以 .proto 为准,勿重复索引)### +protocol/**/*.pb.go +protocol/**/*_grpc.pb.go + +### 文档与资源(保留 docs/contrib、根 README;忽略多语言 readme 与静态资源)### +docs/readme/ +docs/.generated_docs +docs/contributing/ +assets/ +virgil_chat_server_design.md +docs/virgil-e2ee-*.md + +### 测试与脚本输出 ### +test/e2e/output/ +scripts/**/*.log + ### 通用备份与临时文件 ### *.bak +*.gho +*.ori +*.orig *.tmp *~ -dist/ +*.BACKUP.* +*.BASE.* +*.LOCAL.* +*.REMOTE.* ### VS Code(除团队共享配置外)### .vscode/* @@ -44,6 +69,7 @@ dist/ !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json +*.code-workspace ### Go ### *.exe @@ -54,13 +80,19 @@ dist/ *.test *.out vendor/ +go.work go.work.sum +go.sum ### JetBrains / IDE ### .idea/ out/ -### Tags ### +### Git / CI(低价值索引)### +.git/ +.github/ + +### Tags / 索引工具 ### TAGS tags gtags.files @@ -70,3 +102,5 @@ GPATH GSYMS cscope.files cscope.out +cscope.in.out +cscope.po.out diff --git a/cmd/openim-rpc/openim-rpc-redpacket/client-integration-guide.md b/cmd/openim-rpc/openim-rpc-redpacket/client-integration-guide.md index aaf3b50d0..19ad75d16 100644 --- a/cmd/openim-rpc/openim-rpc-redpacket/client-integration-guide.md +++ b/cmd/openim-rpc/openim-rpc-redpacket/client-integration-guide.md @@ -1,8 +1,9 @@ # RedPacket 前端对接文档 -本文档面向前端 / 网关 / App 对接方,说明红包领取和钱包绑定的真实接入方式,重点覆盖: +本文档面向前端 / 网关 / App 对接方,说明红包创建、领取和钱包绑定的真实接入方式,重点覆盖: - 如何把当前登录用户传递给红包服务 +- 如何创建红包(业务单 + 链上创建 + 回写激活) - 如何绑定钱包 - 如何申请领取签名 - 前端何时发链、何时回写后端 @@ -97,7 +98,111 @@ Content-Type: application/json 已经在后端建立了有效绑定关系。 -## 3. 领取签名流程 +## 3. 创建红包流程 + +### 4.1 流程图 + +```text +前端 -> 红包服务: POST /api/redpacket/create-order +红包服务 -> 前端: biz_id (状态 PENDING) +前端 -> 钱包/链上: createFixedPacket/createRandomPacket/createTransfer +链上 -> 前端: tx_hash + packet_id(从事件或回执解析) +前端 -> 红包服务: POST /api/redpacket/created-callback +红包服务 -> 红包服务: 校验创建参数并激活红包 +红包服务 -> 前端: 回写成功 (状态 ACTIVE) +前端 -> 红包服务: POST /api/redpacket/detail (可选) +``` + +### 3.2 创建业务单(发链前必调) + +请求: + +```http +POST /api/redpacket/create-order +token: +Content-Type: application/json +``` + +```json +{ + "chain_type": "EVM", + "chain_id": 1, + "contract_address": "0xA1f42567559aBA5Ff0aac84cdE1AaF1F9DbB888F", + "creator_wallet": "0x1111111111111111111111111111111111111111", + "group_id": "g001", + "scope_type": "GROUP", + "receiver_user_id": "", + "receiver_user_ids": [], + "packet_type": 1, + "token": "0x2222222222222222222222222222222222222222", + "total_amount": "1000000000000000000", + "total_shares": 10, + "expiry_at": 0, + "remark": "happy new year" +} +``` + +关键说明: + +- 不需要传 `user_id`,创建人从上下文 `opUserID` 取 +- `total_amount` 必须是链上最小单位十进制字符串(例如 wei) +- `packet_type`: `0` 固定红包,`1` 拼手气红包,`2` 转账 +- `scope_type=GROUP` 时必须传 `group_id` +- `scope_type=DIRECT` 时必须传 `receiver_user_id` 或 `receiver_user_ids` + +成功响应里最关键的是: + +- `biz_id`: 业务红包单号(后续回写必须带上) + +### 3.3 链上创建红包 + +前端拿到 `biz_id` 后,再调用链上创建方法: + +- 固定红包:`createFixedPacket(...)` +- 拼手气红包:`createRandomPacket(...)` +- 转账红包:`createTransfer(...)` + +链上交易成功后,前端需要得到: + +- `tx_hash` +- `packet_id`(优先从 `PacketCreated` 事件解析) + +### 3.4 创建回写(激活红包) + +请求: + +```http +POST /api/redpacket/created-callback +token: +Content-Type: application/json +``` + +```json +{ + "biz_id": "f8a0f87e-d9cb-4d4a-8350-7bd43ab2e9a4", + "tx_hash": "0xabc123...", + "packet_id": "10001", + "group_id": "g001", + "scope_type": "GROUP", + "receiver_user_id": "", + "receiver_user_ids": [] +} +``` + +说明: + +- `biz_id`、`tx_hash` 必填 +- 推荐传 `packet_id`(可减少后端 fallback 分支) +- 回写成功后红包状态从 `PENDING` 变为 `ACTIVE` +- 回写后可调 `/api/redpacket/detail` 刷新页面状态 + +### 3.5 创建流程常见坑 + +- 先发链再 `create_order`:会导致回写阶段缺少有效 `biz_id` +- `create_order` 的 `creator_wallet` 与实际发链钱包不一致:可能被后端校验拦截 +- 未调用 `created_callback`:红包会一直停留在 `PENDING`,领取侧会失败 + +## 4. 领取签名流程 ### 3.1 流程图 @@ -111,7 +216,7 @@ Content-Type: application/json 链监听器 -> 红包服务: 最终确认领取结果 ``` -### 3.2 申请领取签名 +### 4.2 申请领取签名 请求: @@ -159,7 +264,7 @@ Content-Type: application/json } ``` -### 3.3 前端拿到响应后要做什么 +### 4.3 前端拿到响应后要做什么 前端必须原样把这些参数传给链上: @@ -182,7 +287,7 @@ claim(packetId, authNonce, randomSeed, deadline, signature) - 不要对摘要再次做 `signMessage` - 后端返回的 `signature` 已经是最终可上链签名 -## 4. 领取结果回写 +## 5. 领取结果回写 `claim-result` 是可选的,主要作用是让业务侧尽快看到一条 `PENDING` 领取记录。 @@ -210,29 +315,38 @@ Content-Type: application/json - 如果不能,会先记成 `PENDING` - 最终仍以链监听器为准 -## 5. 前端推荐调用顺序 +## 6. 前端推荐调用顺序 + +### 6.1 创建红包 + +1. 用户登录业务系统 +2. 前端请求 `/api/redpacket/create-order` +3. 拿到 `biz_id` 后,钱包调用链上创建红包方法 +4. 从交易回执/事件拿到 `tx_hash`、`packet_id` +5. 前端请求 `/api/redpacket/created-callback` +6. 前端请求 `/api/redpacket/detail` 刷新状态(确认 `ACTIVE`) -### 5.1 首次使用钱包领取 +### 6.2 首次使用钱包领取 1. 用户登录业务系统 -2. 前端请求 `/wallet-bind/challenge` +2. 前端请求 `/api/redpacket/wallet-bind/challenge` 3. 钱包对 `message` 签名 -4. 前端请求 `/wallet-bind/confirm` +4. 前端请求 `/api/redpacket/wallet-bind/confirm` 5. 绑定成功后再进入领取流程 -### 5.2 正常领取 +### 6.3 正常领取 1. 前端拿到红包 `packet_id` 2. 用户连接钱包,得到本次 `claimer` 地址 -3. 前端请求 `/claim-sign` +3. 前端请求 `/api/redpacket/claim-sign` 4. 拿到 `auth_nonce + random_seed + deadline + signature` 5. 前端调用链上 `claim(...)` -6. 前端可选请求 `/claim-result` +6. 前端可选请求 `/api/redpacket/claim-result` 7. 页面轮询详情页或等待业务侧状态同步 -## 6. 常见错误和排查 +## 7. 常见错误和排查 -### 6.1 `op user id missing in context` +### 7.1 `op user id missing in context` 原因: @@ -240,7 +354,7 @@ Content-Type: application/json - 网关没有把 `opUserID` 注入上下文 - 直接绕过网关调用了红包服务 -### 6.2 `wallet is not bound to user` +### 7.2 `wallet is not bound to user` 原因: @@ -248,20 +362,20 @@ Content-Type: application/json - 当前钱包绑定的是别的业务用户 - 链类型不一致 -### 6.3 `already claimed` +### 7.3 `already claimed` 原因: - 同一个钱包地址已经领过该红包 -### 6.4 `user already claimed` +### 7.4 `user already claimed` 原因: - 同一个业务用户已经领取过该红包 - 即使换钱包地址,也会被后端拦截 -## 7. 后端接口与代码位置 +## 8. 后端接口与代码位置 - 接口契约文档: [backend-api.md](/Users/panda/aiCode/red_packet/open-im-server-origin/cmd/openim-rpc/openim-rpc-redpacket/backend-api.md) diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index ccce6cfd7..0cf66780c 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,6 +1,9 @@ cronExecuteTime: 0 2 * * * +burnCronExecuteTime: "*/1 * * * *" # 可选:仅阅后即焚清理使用;不配置则与 cronExecuteTime 相同 +burnClearLimit: 100 # 可选:单次 RPC 批大小,默认 100 +burnClearMaxLoop: 100 # 可选:单次定时内最大循环轮数,默认 10000 retainChatRecords: 365 fileExpireTime: 180 deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"] chatAPI: - address: http://127.0.0.1:10008 \ No newline at end of file + address: http://127.0.0.1:10008 diff --git a/config/openim-rpc-redpacket.yml b/config/openim-rpc-redpacket.yml index 58bed2edf..b9e69b305 100644 --- a/config/openim-rpc-redpacket.yml +++ b/config/openim-rpc-redpacket.yml @@ -12,20 +12,23 @@ prometheus: # Leave rpcURL empty to disable the EVM client; the RPC service will then # only expose TRON-related functionality (or the offchain code paths). chain: - rpcURL: "" - contractAddress: "" - chainID: 0 - signerPrivateKey: "" - configAdminPrivateKey: "" + rpcURL: "https://data-seed-prebsc-1-s1.bnbchain.org:8545" + contractAddress: "0x9f2e22F5D0cf8d8127E319D38b3EDDaE43bb4DC0" + chainID: 97 + signerPrivateKey: "e9f6a5f3a3c3a97099ca31e7f44151e529c0a4f8a91d5d4232c7282f2b798df4" + configAdminPrivateKey: "e9f6a5f3a3c3a97099ca31e7f44151e529c0a4f8a91d5d4232c7282f2b798df4" # TRON full-node configuration. Leave fullNodeURL empty to disable TRON. tron: - fullNodeURL: "" - contractBase58: "" - ownerBase58: "" - privateKeyHex: "" - feeLimit: 100000000 + fullNodeURL: "https://nile.trongrid.io" + contractBase58: "TTLFvSEyH6BAUp3cVGygWyTVGPBqdLw81M" + ownerBase58: "TJG6ZbEUZNVAQzFQTZehfQJniVJtTBdZ2e" + privateKeyHex: "e9f6a5f3a3c3a97099ca31e7f44151e529c0a4f8a91d5d4232c7282f2b798df4" + feeLimit: 150000000 # Indexer polling interval (in seconds). Used by both EVM and TRON event indexers. +# Set to 0 or negative to disable block scanning completely and rely on tx-hash parsing paths. indexer: - pollInterval: 5 + pollInterval: 0 + # EVM only: max block span per eth_getLogs request (0 = default 2000). Increase if your node allows larger ranges. + maxBlocksPerPoll: 2000 diff --git a/internal/api/conversation.go b/internal/api/conversation.go index f7dbc133c..23024e14b 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -71,3 +71,11 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) } + +func (o *ConversationApi) SetMute(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.SetConversationMute, o.Client) +} + +func (o *ConversationApi) SetBurn(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.SetConversationBurn, o.Client) +} diff --git a/internal/api/friend.go b/internal/api/friend.go index 14bce2388..6932c495b 100644 --- a/internal/api/friend.go +++ b/internal/api/friend.go @@ -41,6 +41,10 @@ func (o *FriendApi) DeleteFriend(c *gin.Context) { a2r.Call(c, relation.FriendClient.DeleteFriend, o.Client) } +func (o *FriendApi) DeleteFriendOneway(c *gin.Context) { + a2r.Call(c, relation.FriendClient.DeleteFriendOneway, o.Client) +} + func (o *FriendApi) GetFriendApplyList(c *gin.Context) { a2r.Call(c, relation.FriendClient.GetPaginationFriendsApplyTo, o.Client) } @@ -127,13 +131,6 @@ func (o *FriendApi) AddOnewayFriend(c *gin.Context) { a2r.Call(c, relation.FriendClient.AddOnewayFriend, o.Client) } -func (o *FriendApi) SetMute(c *gin.Context) { - a2r.Call(c, relation.FriendClient.SetMute, o.Client) -} - -func (o *FriendApi) GetMute(c *gin.Context) { - a2r.Call(c, relation.FriendClient.GetMute, o.Client) -} func (o *FriendApi) PinFriend(c *gin.Context) { a2r.Call(c, relation.FriendClient.PinFriend, o.Client) diff --git a/internal/api/router.go b/internal/api/router.go index 1f2b98eb5..ed0dc0bde 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -198,6 +198,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co f := NewFriendApi(relation.NewFriendClient(friendConn)) friendRouterGroup := r.Group("/friend") friendRouterGroup.POST("/delete_friend", f.DeleteFriend) + friendRouterGroup.POST("/delete_friend_oneway", f.DeleteFriendOneway) friendRouterGroup.POST("/get_friend_apply_list", f.GetFriendApplyList) friendRouterGroup.POST("/get_designated_friend_apply", f.GetDesignatedFriendsApply) friendRouterGroup.POST("/get_self_friend_apply_list", f.GetSelfApplyList) @@ -221,8 +222,6 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co friendRouterGroup.POST("/get_self_unhandled_apply_count", f.GetSelfUnhandledApplyCount) friendRouterGroup.POST("/get_pinned_friend_ids", f.GetPinnedFriendIDs) friendRouterGroup.POST("/add_oneway_friend", f.AddOnewayFriend) - friendRouterGroup.POST("/set_mute", f.SetMute) - friendRouterGroup.POST("/get_mute", f.GetMute) friendRouterGroup.POST("/pin", f.PinFriend) friendRouterGroup.POST("/unpin", f.UnpinFriend) } @@ -358,6 +357,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/set_mute", c.SetMute) + conversationGroup.POST("/set_burn", c.SetBurn) } { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 31a67a4d5..26dd34175 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -36,6 +36,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/discovery" @@ -127,6 +128,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers } resp := &pbconversation.GetConversationResp{Conversation: &pbconversation.Conversation{}} resp.Conversation = convert.ConversationDB2Pb(conversations[0]) + c.fillConversationUserMute(ctx, resp.Conversation) return resp, nil } @@ -201,6 +203,15 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req } conversation_notPinTime[time] = conversationID } + for _, v := range conversations { + elem, ok := conversationMsg[v.ConversationID] + if !ok { + continue + } + elem.MuteDuration = v.MuteDuration + elem.MuteEndTime = v.MuteEndTime + elem.IsMuted = computeIsMuted(v.MuteDuration, v.MuteEndTime) + } resp = &pbconversation.GetSortedConversationListResp{ ConversationTotal: int64(len(chatLogs)), ConversationElems: []*pbconversation.ConversationElem{}, @@ -221,6 +232,7 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon } resp := &pbconversation.GetAllConversationsResp{Conversations: []*pbconversation.Conversation{}} resp.Conversations = convert.ConversationsDB2Pb(conversations) + c.fillConversationsUserMute(ctx, resp.Conversations) return resp, nil } @@ -239,9 +251,9 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s if err != nil { return nil, err } - resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}} - resp.Conversations = convert.ConversationsDB2Pb(conversations) - return convert.ConversationsDB2Pb(conversations), nil + list := convert.ConversationsDB2Pb(conversations) + c.fillConversationsUserMute(ctx, list) + return list, nil } // Deprecated @@ -529,7 +541,9 @@ func (c *conversationServer) GetConversationsByConversationID( if err != nil { return nil, err } - return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil + list := convert.ConversationsDB2Pb(conversations) + c.fillConversationsUserMute(ctx, list) + return &pbconversation.GetConversationsByConversationIDResp{Conversations: list}, nil } func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) { @@ -717,9 +731,11 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco if err != nil { return nil, err } + list := convert.ConversationsDB2Pb(conversations) + c.fillConversationsUserMute(ctx, list) return &pbconversation.GetOwnerConversationResp{ Total: total, - Conversations: convert.ConversationsDB2Pb(conversations), + Conversations: list, }, nil } @@ -770,7 +786,9 @@ func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ } } - return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil + list := convert.ConversationsDB2Pb(temp) + c.fillConversationsUserMute(ctx, list) + return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: list}, nil } func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { @@ -839,9 +857,11 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c // ClearBurnExpiredMsgs 处理「阅后即焚」过期消息: // 1. 从 msg_burn_deadline 中拉取一批过期分组(按 user/conversation 聚合,含每组最大 seq)。 -// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1。 -// 3. 同步更新 conversation 文档的 min_seq 字段并下发会话变更通知。 -// 4. 删除已处理的 deadline 记录。 +// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1,更新 conversation 文档。 +// 3. 向阅读方发 ConversationChangeNotification + DeleteMsgsNotification(含精确 seqs)。 +// 4. 单聊场景(si_ 前缀):利用 deadline 记录中的 PeerID 直接推进对端 min_seq, +// 并向对端也发两条通知,保证双方客户端都删本地消息。 +// 5. 物理删除 msg 存储中的焚毁消息;清理 deadline 记录。 // // 单次最多处理 req.Limit 个分组;若返回的 count == limit,cron 可继续触发。 func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearBurnExpiredMsgsReq) (*pbconversation.ClearBurnExpiredMsgsResp, error) { @@ -861,30 +881,46 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 { continue } - newMinSeq := g.MaxSeq + 1 - if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil { - log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err, - "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) - continue - } - if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID, - map[string]any{"min_seq": newMinSeq}); err != nil { - log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err, - "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) - continue + //newMinSeq := g.MaxSeq + 1 + + // 推进阅读方 min_seq。 + //if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil { + // log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err, + // "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) + // continue + //} + //if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID, + // map[string]any{"min_seq": newMinSeq}); err != nil { + // log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err, + // "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq) + // continue + //} + // 通知 g.UserID 客户端:会话变更 + 精确删除指定 seqs。 + // 对端用户在 msg_burn_deadline 中有独立记录,cron 处理其分组时会自行通知, + // 无需在此重复推进对端 min_seq 或发送额外通知。 + //c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID}) + + // 删除焚毁消息并同步通知阅读方客户端(best-effort,失败不中断流程)。 + if err := c.msgClient.DeleteMsgs(ctx, g.UserID, g.ConversationID, g.Seqs, &msg.DeleteSyncOpt{ + IsSyncSelf: true, + }); err != nil { + log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgs failed", err, + "userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs) } - c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID}) + if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil { log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err, "userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs) } + log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID, - "conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs) + "conversationID", g.ConversationID, "seqs", g.Seqs) processed++ } return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil } + // ClearGroupBurnExpiredMsgs 处理群消息「阅后即焚」到期记录: // 1. 查询满足 read_count >= member_count 且 burn_end_time 过期的记录(按 group_id 聚合)。 // 2. 对每个群,获取所有成员 ID,批量推进他们在群会话上的 min_seq。 @@ -951,3 +987,49 @@ func (c *conversationServer) ClearGroupBurnExpiredMsgs(ctx context.Context, req } return &pbconversation.ClearGroupBurnExpiredMsgsResp{Count: processed}, nil } + +func (c *conversationServer) SetConversationBurn(ctx context.Context, req *pbconversation.SetConversationBurnReq) (*pbconversation.SetConversationBurnResp, error) { + if err := c.conversationDatabase.UpdateUsersConversationField( + ctx, + []string{req.OwnerUserID}, + req.ConversationID, + map[string]any{ + "burn_duration": req.BurnDuration, + }, + ); err != nil { + return nil, err + } + c.conversationNotificationSender.ConversationChangeNotification(ctx, req.OwnerUserID, []string{req.ConversationID}) + return &pbconversation.SetConversationBurnResp{}, nil +} + +func (c *conversationServer) SetConversationMute(ctx context.Context, req *pbconversation.SetConversationMuteReq) (*pbconversation.SetConversationMuteResp, error) { + var ( + muteDuration int32 + muteEndTime int64 + ) + switch { + case req.Duration == 0: + // 取消静音:清零所有静音字段 + case req.Duration == -1: + // 永久静音 + muteDuration = -1 + default: + // 定时静音 + muteDuration = req.Duration + muteEndTime = time.Now().Unix() + int64(req.Duration) + } + if err := c.conversationDatabase.UpdateUsersConversationField( + ctx, + []string{req.OwnerUserID}, + req.ConversationID, + map[string]any{ + "mute_duration": muteDuration, + "mute_end_time": muteEndTime, + }, + ); err != nil { + return nil, err + } + c.conversationNotificationSender.ConversationChangeNotification(ctx, req.OwnerUserID, []string{req.ConversationID}) + return &pbconversation.SetConversationMuteResp{}, nil +} diff --git a/internal/rpc/conversation/mute_fill.go b/internal/rpc/conversation/mute_fill.go new file mode 100644 index 000000000..771b7f36c --- /dev/null +++ b/internal/rpc/conversation/mute_fill.go @@ -0,0 +1,40 @@ +// Copyright © 2023 OpenIM. All rights reserved. + +package conversation + +import ( + "context" + "time" + + pbconversation "github.com/openimsdk/protocol/conversation" +) + +// computeIsMuted 根据会话模型中存储的 mute_duration 和 mute_end_time 计算当前是否处于静音状态: +// - duration == 0 且 end == 0:未静音 +// - duration == -1 且 end == 0:永久静音 +// - end > 0 且 end > now:定时静音仍有效 +// - end > 0 且 end <= now:定时静音已过期,视为未静音 +func computeIsMuted(muteDuration int32, muteEndTime int64) bool { + if muteDuration == 0 && muteEndTime == 0 { + return false + } + if muteDuration == -1 && muteEndTime == 0 { + return true + } + return muteEndTime > time.Now().Unix() +} + +// fillConversationUserMute 根据会话模型字段(已由 ConversationDB2Pb 通过 CopyStructFields 填入 +// conv.MuteDuration / conv.MuteEndTime)计算并设置 conv.IsMuted,无需额外数据库查询。 +func (c *conversationServer) fillConversationUserMute(_ context.Context, conv *pbconversation.Conversation) { + if conv == nil { + return + } + conv.IsMuted = computeIsMuted(conv.MuteDuration, conv.MuteEndTime) +} + +func (c *conversationServer) fillConversationsUserMute(ctx context.Context, list []*pbconversation.Conversation) { + for _, conv := range list { + c.fillConversationUserMute(ctx, conv) + } +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 370865c1a..84c91a267 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -73,3 +73,19 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +// BurnMsgsDeleteNotification 通知 recvUserID 按 seqs 删除本地消息。 +// sendUserID 为触发焚烧的一方(阅读方),recvUserID 为需要执行删除的一方。 +// 双方各调用一次,保证单聊两端客户端都删除本地缓存。 +func (c *ConversationNotificationSender) BurnMsgsDeleteNotification( + ctx context.Context, + sendUserID, recvUserID, conversationID string, + seqs []int64, +) { + tips := &sdkws.DeleteMsgsTips{ + UserID: sendUserID, + ConversationID: conversationID, + Seqs: seqs, + } + c.Notification(ctx, sendUserID, recvUserID, constant.DeleteMsgsNotification, tips) +} diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index a22343e6f..d830bd608 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -215,16 +215,18 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon return &msg.MarkConversationAsReadResp{}, nil } -// recordBurnDeadlines 在「单聊」场景下,根据对端(发送者)的 MsgBurnDuration -// 为本次已读的每条消息同时给接收者和发送者各记录一份「阅后即焚」截止时间。 -// cron 到期后会分别推进两人各自的 min_seq,双方都看不到该消息。 +// recordBurnDeadlines 在「单聊」场景下,为本次已读的每条消息同时给接收者和发送者 +// 各记录一份「阅后即焚」截止时间。cron 到期后会分别删除双方该消息,双方都看不到。 +// +// 销毁时长优先级: +// 1. 会话级 BurnDuration(通过 /conversation/set_burn 设置); +// 2. 对端(发送者)全局 MsgBurnDuration。 // // 设计要点: // 1. 仅单聊。 -// 2. 仅当发送者 MsgBurnDuration > 0 时才记录;0 表示未开启。 -// 3. $setOnInsert 确保同一 (UserID, ConversationID, Seq) 已存在时不覆盖, +// 2. $setOnInsert 确保同一 (UserID, ConversationID, Seq) 已存在时不覆盖, // 以「首次阅读时刻」为 deadline 基准,多端重复 MarkAsRead 不会往后推。 -// 4. 失败仅记录日志,不影响已读主流程。 +// 3. 失败仅记录日志,不影响已读主流程。 func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.Conversation, readerUserID string, seqs []int64) { if len(seqs) == 0 { return @@ -236,16 +238,23 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation. if peerID == "" || peerID == readerUserID { return } - peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID) - if err != nil { - log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID) - return - } - if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 { - return + + // 优先使用会话级 BurnDuration(双方协商后保存到会话),否则回退到发送者全局设置。 + burnSeconds := conv.BurnDuration + if burnSeconds <= 0 { + peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID) + if err != nil { + log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID) + return + } + if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 { + return + } + burnSeconds = peerInfo.MsgBurnDuration } + now := time.Now().UnixMilli() - deadline := now + int64(peerInfo.MsgBurnDuration)*1000 + deadline := now + int64(burnSeconds)*1000 // 每条消息同时为接收者和发送者各写一条 deadline,双方消息同步焚毁。 items := make([]*model.MsgBurnDeadline, 0, len(seqs)*2) for _, seq := range seqs { @@ -254,6 +263,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation. UserID: readerUserID, ConversationID: conv.ConversationID, Seq: seq, + PeerID: peerID, DeadlineMs: deadline, CreateTime: now, }, @@ -261,6 +271,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation. UserID: peerID, ConversationID: conv.ConversationID, Seq: seq, + PeerID: readerUserID, DeadlineMs: deadline, CreateTime: now, }, diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index e96f0d935..99c4e908f 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -164,18 +164,16 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq isNotification := msgprocessor.IsNotificationByMsg(req.MsgData) log.ZInfo(ctx, "sendMsgSingleChat", "isNotification", isNotification, "msgdata", req.MsgData) - if !isNotification { - // 非通知类消息:执行发送权限校验 + 接收偏好校验(含 blacklist / MsgReceiveSetting / webhook / FriendVerify / globalOpt / convOpt) - isSend, err = m.modifyMessageByUserMessageReceiveOpt( - ctx, - req.MsgData.RecvID, - conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID), - constant.SingleChatType, - req, - ) - if err != nil { - return nil, err - } + // 单聊一律校验接收方消息设置(含「仅好友可发」)、黑名单、会话接收偏好等;不再因通知类 Options 跳过 + isSend, err = m.modifyMessageByUserMessageReceiveOpt( + ctx, + req.MsgData.RecvID, + conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID), + constant.SingleChatType, + req, + ) + if err != nil { + return nil, err } if !isSend { prommetrics.SingleChatMsgProcessFailedCounter.Inc() diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 7a79a43b3..990024a21 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -72,7 +72,6 @@ type msgServer struct { conversationClient *rpcli.ConversationClient spamReportDB database.SpamReport globalBlackDB controller.UserGlobalBlackDatabase - userMuteDB controller.UserMuteDatabase msgBurnDeadlineDB database.MsgBurnDeadline groupMsgBurnRecordDB database.GroupMsgBurnRecord } @@ -139,6 +138,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + groupMsgBurnRecordDB, err := mgo.NewGroupMsgBurnRecordMongo(mgocli.GetDB()) if err != nil { return err @@ -147,6 +147,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + s := &msgServer{ MsgDatabase: msgDatabase, RegisterCenter: client, @@ -159,7 +160,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationClient: conversationClient, spamReportDB: spamReportDB, globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo), - userMuteDB: controller.NewUserMuteDatabase(userMuteMgo), msgBurnDeadlineDB: msgBurnDeadlineDB, groupMsgBurnRecordDB: groupMsgBurnRecordDB, } diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index e0028e10b..ff7ff1051 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -226,7 +226,7 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us } // 第二优先级:单聊发送权限校验(从 messageVerification 迁移) - // 仅对非通知类消息生效(调用方已通过 !isNotification 做过前置过滤) + // 单聊路径下由 sendMsgSingleChat 始终调用本函数(含通知类),以校验接收方 MsgReceiveSetting 等 if sessionType == constant.SingleChatType { // 管理员跳过发送权限拦截,直接进入接收偏好校验 if !datautil.Contain(pb.MsgData.SendID, m.config.Share.IMAdminUserID...) { @@ -255,10 +255,12 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us // skipFriendVerify: MsgReceiveSetting=1 已确认好友关系,无需再做 FriendVerify 重复查询 skipFriendVerify := false switch recvUserInfo.MsgReceiveSetting { - case 2: // MsgReceiveSettingNobody + case model.MsgReceiveSettingNobody: return false, servererrs.ErrMsgReceiveNotAllowed.Wrap() - case 1: // MsgReceiveSettingFriends - isFriend, err := m.FriendLocalCache.IsFriend(ctx, pb.MsgData.RecvID, pb.MsgData.SendID) + case model.MsgReceiveSettingFriends: + // FriendLocalCache.IsFriend(possibleFriendUserID, userID) 对应「userID 的好友列表里是否有 possibleFriendUserID」 + // 此处须判断:接收方 recv 的好友列表里是否有发送方 send + isFriend, err := m.FriendLocalCache.IsFriend(ctx, pb.MsgData.SendID, pb.MsgData.RecvID) if err != nil { log.ZError(ctx, "modifyMessageByUserMessageReceiveOpt: IsFriend failed (MsgReceiveSetting)", err, "sendID", pb.MsgData.SendID, "recvID", pb.MsgData.RecvID, @@ -323,14 +325,20 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us } } - // 第四优先级:用户静音设置(user_mute 集合,支持好友与非好友) - // 无论会话记录是否存在均检查,以支持对非好友的静音 - if m.userMuteDB != nil { - muted, err := m.userMuteDB.IsMuted(ctx, userID, pb.MsgData.SendID) - if err != nil { - return false, err + // 第四优先级:会话静音设置(存储于 conversations 集合的 mute_duration/mute_end_time) + conv, convErr := m.ConversationLocalCache.GetConversation(ctx, userID, conversationID) + if convErr != nil && !errs.ErrRecordNotFound.Is(convErr) { + return false, convErr + } + if convErr == nil && conv != nil { + var isMuted bool + switch { + case conv.MuteDuration == -1 && conv.MuteEndTime == 0: + isMuted = true + case conv.MuteEndTime > 0: + isMuted = conv.MuteEndTime > time.Now().Unix() } - if muted { + if isMuted { if pb.MsgData.Options == nil { pb.MsgData.Options = make(map[string]bool, 10) } diff --git a/internal/rpc/redpacket/admin.go b/internal/rpc/redpacket/admin.go index e2802d7cb..ec4890bf8 100644 --- a/internal/rpc/redpacket/admin.go +++ b/internal/rpc/redpacket/admin.go @@ -173,7 +173,7 @@ func (s *redPacketServer) ParseTxEvents(ctx context.Context, req *pbredpacket.Pa if s.tronClient == nil { return nil, errs.ErrInternalServer.WrapMsg("TRON client not configured") } - events, err := s.tronClient.ParseTransactionReceipt(ctx, req.TxHash) + success, events, err := s.tronClient.ParseTransactionReceiptWithStatus(ctx, req.TxHash) if err != nil { return nil, errs.ErrInternalServer.WrapMsg("parse TRON tx receipt failed: " + err.Error()) } @@ -185,12 +185,16 @@ func (s *redPacketServer) ParseTxEvents(ctx context.Context, req *pbredpacket.Pa } out = append(out, &pbredpacket.ParsedEvent{Name: e.Name, Data: data}) } - return &pbredpacket.ParseTxEventsResp{Chain: "tron", TxHash: req.TxHash, Events: out}, nil + note := "tx_status=FAILED" + if success { + note = "tx_status=SUCCESS" + } + return &pbredpacket.ParseTxEventsResp{Chain: "tron", TxHash: req.TxHash, Events: out, Note: note}, nil } if s.chainClient != nil { txHashBytes := common.HexToHash(req.TxHash) - events, err := s.chainClient.ParseTransactionReceipt(ctx, txHashBytes) + success, events, err := s.chainClient.ParseTransactionReceiptWithStatus(ctx, txHashBytes) if err != nil { return nil, errs.ErrInternalServer.WrapMsg("parse tx receipt failed: " + err.Error()) } @@ -206,10 +210,15 @@ func (s *redPacketServer) ParseTxEvents(ctx context.Context, req *pbredpacket.Pa Data: data, }) } + note := "tx_status=FAILED" + if success { + note = "tx_status=SUCCESS" + } return &pbredpacket.ParseTxEventsResp{ Chain: "eth", TxHash: req.TxHash, Events: out, + Note: note, }, nil } diff --git a/internal/rpc/redpacket/chain/client.go b/internal/rpc/redpacket/chain/client.go index 896e8c903..0db953f56 100644 --- a/internal/rpc/redpacket/chain/client.go +++ b/internal/rpc/redpacket/chain/client.go @@ -113,12 +113,29 @@ func (c *ChainClient) SignClaim(digest [32]byte) ([]byte, error) { } func (c *ChainClient) ParseTransactionReceipt(ctx context.Context, txHash common.Hash) ([]*ParsedEvent, error) { + _, events, err := c.ParseTransactionReceiptWithStatus(ctx, txHash) + return events, err +} + +// ParseTransactionReceiptWithStatus fetches tx receipt once and returns both +// execution status and decoded contract events. +func (c *ChainClient) ParseTransactionReceiptWithStatus(ctx context.Context, txHash common.Hash) (bool, []*ParsedEvent, error) { receipt, err := c.client.TransactionReceipt(ctx, txHash) if err != nil { - return nil, fmt.Errorf("get receipt failed: %w", err) + return false, nil, fmt.Errorf("get receipt failed: %w", err) } + events, err := ParseEventsFromLogs(receipt.Logs, c.contractABI) + if err != nil { + return false, nil, err + } + return receipt.Status == types.ReceiptStatusSuccessful, events, nil +} - return ParseEventsFromLogs(receipt.Logs, c.contractABI) +// IsTransactionSuccessful reports whether the EVM transaction executed +// successfully according to receipt.status (1=success, 0=failure). +func (c *ChainClient) IsTransactionSuccessful(ctx context.Context, txHash common.Hash) (bool, error) { + success, _, err := c.ParseTransactionReceiptWithStatus(ctx, txHash) + return success, err } func (c *ChainClient) ContractAddress() common.Address { diff --git a/internal/rpc/redpacket/chain/indexer.go b/internal/rpc/redpacket/chain/indexer.go index 590b6049d..9cd7a3981 100644 --- a/internal/rpc/redpacket/chain/indexer.go +++ b/internal/rpc/redpacket/chain/indexer.go @@ -14,27 +14,50 @@ import ( "github.com/openimsdk/tools/log" ) +const defaultIndexerMaxBlocksPerPoll uint64 = 2000 + type Indexer struct { - client *ChainClient - db controller.RedPacketDatabase - pollInterval time.Duration - lastBlock uint64 - contractAddr common.Address + client *ChainClient + db controller.RedPacketDatabase + pollInterval time.Duration + lastBlock uint64 + contractAddr common.Address + maxBlocksPerPoll uint64 // 0 => defaultIndexerMaxBlocksPerPoll } -func NewIndexer(client *ChainClient, db controller.RedPacketDatabase, pollInterval int, startBlock uint64) *Indexer { +func NewIndexer(client *ChainClient, db controller.RedPacketDatabase, pollInterval int, startBlock uint64, maxBlocksPerPoll int) *Indexer { if pollInterval <= 0 { pollInterval = 5 } + var maxB uint64 + if maxBlocksPerPoll > 0 { + maxB = uint64(maxBlocksPerPoll) + } return &Indexer{ - client: client, - db: db, - pollInterval: time.Duration(pollInterval) * time.Second, - lastBlock: startBlock, - contractAddr: client.contractAddr, + client: client, + db: db, + pollInterval: time.Duration(pollInterval) * time.Second, + lastBlock: startBlock, + contractAddr: client.contractAddr, + maxBlocksPerPoll: maxB, } } +func (i *Indexer) chunkEndBlock(chainTip uint64) uint64 { + maxSpan := i.maxBlocksPerPoll + if maxSpan == 0 { + maxSpan = defaultIndexerMaxBlocksPerPoll + } + if chainTip <= i.lastBlock { + return i.lastBlock + } + span := chainTip - i.lastBlock + if span > maxSpan { + return i.lastBlock + maxSpan + } + return chainTip +} + func (i *Indexer) Start(ctx context.Context) { log.ZInfo(ctx, "starting RedPacket ETH event indexer") @@ -105,20 +128,25 @@ func (i *Indexer) poll(ctx context.Context) error { return fmt.Errorf("get header failed: %w", err) } - currentBlock := header.Number.Uint64() - if currentBlock <= i.lastBlock { + chainTip := header.Number.Uint64() + if chainTip <= i.lastBlock { + return nil + } + + toBlock := i.chunkEndBlock(chainTip) + if toBlock <= i.lastBlock { return nil } query := ethereum.FilterQuery{ FromBlock: big.NewInt(int64(i.lastBlock + 1)), - ToBlock: big.NewInt(int64(currentBlock)), + ToBlock: big.NewInt(int64(toBlock)), Addresses: []common.Address{i.contractAddr}, } logs, err := i.client.client.FilterLogs(ctx, query) if err != nil { - return fmt.Errorf("filter logs failed: %w", err) + return fmt.Errorf("filter logs failed (blocks %d-%d): %w", i.lastBlock+1, toBlock, err) } logPtrs := make([]*types.Log, len(logs)) @@ -137,8 +165,12 @@ func (i *Indexer) poll(ctx context.Context) error { } } - i.lastBlock = currentBlock - log.ZInfo(ctx, "redpacket eth indexed", "block", currentBlock, "events", len(events)) + i.lastBlock = toBlock + if toBlock < chainTip { + log.ZDebug(ctx, "redpacket eth indexer chunk done, catching up", "indexedTo", toBlock, "chainTip", chainTip, "events", len(events)) + } else { + log.ZInfo(ctx, "redpacket eth indexed", "block", toBlock, "events", len(events)) + } return nil } diff --git a/internal/rpc/redpacket/chain/tron.go b/internal/rpc/redpacket/chain/tron.go index 08ff077da..0792fb64e 100644 --- a/internal/rpc/redpacket/chain/tron.go +++ b/internal/rpc/redpacket/chain/tron.go @@ -63,17 +63,28 @@ func (t *TronClient) FullNodeURL() string { } func (t *TronClient) ParseTransactionReceipt(ctx context.Context, txID string) ([]*ParsedEvent, error) { + _, events, err := t.ParseTransactionReceiptWithStatus(ctx, txID) + return events, err +} + +// ParseTransactionReceiptWithStatus fetches tx info once and returns both +// execution status and decoded contract events. +func (t *TronClient) ParseTransactionReceiptWithStatus(ctx context.Context, txID string) (bool, []*ParsedEvent, error) { info, err := t.getTransactionInfo(ctx, txID) if err != nil { - return nil, err + return false, nil, err } logs, err := tronLogsToEVMLogs(info, txID) if err != nil { - return nil, err + return false, nil, err } - - return ParseEventsFromLogs(logs, t.parsedABI) + events, err := ParseEventsFromLogs(logs, t.parsedABI) + if err != nil { + return false, nil, err + } + success := strings.EqualFold(info.Receipt.Result, "SUCCESS") + return success, events, nil } func (t *TronClient) SendAdminTransaction(ctx context.Context, methodName string, args ...interface{}) (string, error) { @@ -111,13 +122,23 @@ func (t *TronClient) GetSignMessageForTron(ctx context.Context, packetID *big.In type tronTxInfoResp struct { ID string `json:"id"` BlockNumber uint64 `json:"blockNumber"` - Log []struct { + Receipt struct { + Result string `json:"result"` + } `json:"receipt"` + Log []struct { Address string `json:"address"` Topics []string `json:"topics"` Data string `json:"data"` } `json:"log"` } +// IsTransactionSuccessful reports whether the TRON transaction execution +// succeeded based on receipt.result == "SUCCESS". +func (t *TronClient) IsTransactionSuccessful(ctx context.Context, txID string) (bool, error) { + success, _, err := t.ParseTransactionReceiptWithStatus(ctx, txID) + return success, err +} + func getParamTypes(args []interface{}) string { types := make([]string, len(args)) for i, arg := range args { diff --git a/internal/rpc/redpacket/redpacket.go b/internal/rpc/redpacket/redpacket.go index 15b9b1139..4acc321a8 100644 --- a/internal/rpc/redpacket/redpacket.go +++ b/internal/rpc/redpacket/redpacket.go @@ -137,13 +137,17 @@ func Start(ctx context.Context, conf *Config, registry discovery.SvcDiscoveryReg pbredpacket.RegisterRedPacketServer(server, srv) - if chainClient != nil { - ethIndexer := chain.NewIndexer(chainClient, repo, conf.RpcConfig.Indexer.PollInterval, 0) - ethIndexer.Start(ctx) - } - if tronClient != nil { - tronIndexer := chain.NewTronIndexer(tronClient, repo, conf.RpcConfig.Indexer.PollInterval, 0) - tronIndexer.Start(ctx) + if conf.RpcConfig.Indexer.PollInterval > 0 { + if chainClient != nil { + ethIndexer := chain.NewIndexer(chainClient, repo, conf.RpcConfig.Indexer.PollInterval, 0, conf.RpcConfig.Indexer.MaxBlocksPerPoll) + ethIndexer.Start(ctx) + } + if tronClient != nil { + tronIndexer := chain.NewTronIndexer(tronClient, repo, conf.RpcConfig.Indexer.PollInterval, 0) + tronIndexer.Start(ctx) + } + } else { + log.ZInfo(ctx, "redpacket indexer disabled by config", "pollInterval", conf.RpcConfig.Indexer.PollInterval) } return nil diff --git a/internal/rpc/redpacket/service.go b/internal/rpc/redpacket/service.go index 71aad905a..185a1bb40 100644 --- a/internal/rpc/redpacket/service.go +++ b/internal/rpc/redpacket/service.go @@ -124,6 +124,12 @@ func (s *redPacketServer) CreatedCallback(ctx context.Context, req *pbredpacket. PacketID: createdPacket.PacketID, ChainID: createdPacket.ChainID, ContractAddress: createdPacket.ContractAddress, + CreatorWallet: createdPacket.CreatorWallet, + PacketType: createdPacket.PacketType, + Token: createdPacket.Token, + TotalAmount: createdPacket.TotalAmount, + TotalShares: createdPacket.TotalShares, + ExpiryAt: createdPacket.ExpiryAt, TxHash: req.TxHash, GroupID: groupID, ScopeType: scopeType, @@ -268,15 +274,36 @@ func (s *redPacketServer) ClaimResult(ctx context.Context, req *pbredpacket.Clai return nil, err } - claimedEvent, err := s.resolveClaimedEvent(ctx, rp, req.TxHash) + txSuccess, events, err := s.parseChainReceiptWithStatus(ctx, rp, req.TxHash) + if err != nil { + log.ZWarn(ctx, "parse claim receipt failed", err, "txHash", req.TxHash) + return &pbredpacket.ClaimResultResp{}, nil + } + if !txSuccess { + if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil { + log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash) + } + return &pbredpacket.ClaimResultResp{}, nil + } + + claimedEvent, err := resolveClaimedEventFromParsedEvents(rp, events) if err != nil { log.ZWarn(ctx, "resolve claim event failed", err, "txHash", req.TxHash) + if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil { + log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash) + } return &pbredpacket.ClaimResultResp{}, nil } if claimedEvent == nil { + if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil { + log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash) + } return &pbredpacket.ClaimResultResp{}, nil } if !strings.EqualFold(claimedEvent.ClaimerWallet, req.Claimer) { + if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil { + log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash) + } return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("claim event claimer mismatch: got %s want %s", claimedEvent.ClaimerWallet, req.Claimer)) } @@ -311,6 +338,34 @@ func (s *redPacketServer) ClaimResult(ctx context.Context, req *pbredpacket.Clai return &pbredpacket.ClaimResultResp{}, nil } +func (s *redPacketServer) parseChainReceiptWithStatus(ctx context.Context, rp *model.RedPacket, txHash string) (bool, []*chain.ParsedEvent, error) { + switch rp.ChainType { + case "EVM": + if s.chainClient == nil { + return false, nil, errs.ErrInternalServer.WrapMsg("evm client is unavailable") + } + return s.chainClient.ParseTransactionReceiptWithStatus(ctx, common.HexToHash(txHash)) + case "TRON": + if s.tronClient == nil { + return false, nil, errs.ErrInternalServer.WrapMsg("tron client is unavailable") + } + return s.tronClient.ParseTransactionReceiptWithStatus(ctx, txHash) + default: + return false, nil, errs.ErrArgs.WrapMsg("unsupported chain_type: " + rp.ChainType) + } +} + +func (s *redPacketServer) markClaimFailed(ctx context.Context, packetID, userID, claimer, txHash string) error { + return s.db.SaveClaim(ctx, &model.RedPacketClaim{ + PacketID: packetID, + UserID: userID, + ClaimerWallet: claimer, + ClaimTxHash: txHash, + Status: "FAILED", + UpdatedAt: time.Now(), + }) +} + // canClaim runs the claim-eligibility check (formerly RedPacketService.CanClaim). func (s *redPacketServer) canClaim(ctx context.Context, packetID, claimer, userID string) error { rp, err := s.db.GetRedPacketByPacketID(ctx, packetID) @@ -344,6 +399,12 @@ type claimedEventSnapshot struct { BlockNumber uint64 } +type refundedEventSnapshot struct { + RefundTo string + Amount string + BlockNumber uint64 +} + type createdPacketSnapshot struct { PacketID string ChainID int64 @@ -367,10 +428,13 @@ func (s *redPacketServer) resolveCreatedPacket(ctx context.Context, rp *model.Re return buildFallbackCreatedPacket(rp, fallbackPacketID), nil } - events, err := s.chainClient.ParseTransactionReceipt(ctx, common.HexToHash(txHashHex)) + success, events, err := s.chainClient.ParseTransactionReceiptWithStatus(ctx, common.HexToHash(txHashHex)) if err != nil { return nil, errs.ErrInternalServer.WrapMsg("parse created tx failed: " + err.Error()) } + if !success { + return nil, errs.ErrArgs.WrapMsg("created tx execution failed on chain") + } for _, event := range events { if event.Name != "PacketCreated" { @@ -396,10 +460,13 @@ func (s *redPacketServer) resolveCreatedPacket(ctx context.Context, rp *model.Re return buildFallbackCreatedPacket(rp, fallbackPacketID), nil } - events, err := s.tronClient.ParseTransactionReceipt(ctx, txHashHex) + success, events, err := s.tronClient.ParseTransactionReceiptWithStatus(ctx, txHashHex) if err != nil { return nil, errs.ErrInternalServer.WrapMsg("parse tron created tx failed: " + err.Error()) } + if !success { + return nil, errs.ErrArgs.WrapMsg("created tx execution failed on chain") + } for _, event := range events { if event.Name != "PacketCreated" { @@ -795,7 +862,10 @@ func (s *redPacketServer) resolveClaimedEvent(ctx context.Context, rp *model.Red if err != nil { return nil, err } + return resolveClaimedEventFromParsedEvents(rp, events) +} +func resolveClaimedEventFromParsedEvents(rp *model.RedPacket, events []*chain.ParsedEvent) (*claimedEventSnapshot, error) { for _, event := range events { if event.Name != "PacketClaimed" { continue @@ -816,6 +886,24 @@ func (s *redPacketServer) resolveClaimedEvent(ctx context.Context, rp *model.Red return nil, nil } +func resolveRefundedEventFromParsedEvents(rp *model.RedPacket, events []*chain.ParsedEvent) (*refundedEventSnapshot, error) { + for _, event := range events { + if event.Name != "PacketRefunded" { + continue + } + packetID := chain.GetPacketIDFromEvent(event).String() + if packetID != rp.PacketID { + return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("refund event packet mismatch: got %s want %s", packetID, rp.PacketID)) + } + return &refundedEventSnapshot{ + RefundTo: strings.ToLower(chain.GetAddressFromEvent(event, "refundTo").Hex()), + Amount: chain.GetAmountFromEvent(event).String(), + BlockNumber: event.BlockNumber, + }, nil + } + return nil, nil +} + // maxTotalShares caps the number of shares to prevent abuse. const maxTotalShares = 10_000 @@ -946,7 +1034,37 @@ func (s *redPacketServer) RequestRefund(ctx context.Context, req *pbredpacket.Re } log.ZInfo(ctx, "redpacket refund submitted", "packetID", rp.PacketID, "txHash", txHash) - return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil + txSuccess, events, parseErr := s.parseChainReceiptWithStatus(ctx, rp, txHash) + if parseErr != nil { + log.ZWarn(ctx, "parse refund receipt failed, fallback to async indexer", parseErr, "packetID", rp.PacketID, "txHash", txHash) + return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil + } + if !txSuccess { + return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "FAILED"}, nil + } + + refundedEvent, err := resolveRefundedEventFromParsedEvents(rp, events) + if err != nil { + log.ZWarn(ctx, "resolve refunded event failed, fallback to async indexer", err, "packetID", rp.PacketID, "txHash", txHash) + return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil + } + if refundedEvent == nil { + return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil + } + + if err := s.db.SaveRefund(ctx, &model.RedPacketRefund{ + PacketID: rp.PacketID, + RefundTo: refundedEvent.RefundTo, + TxHash: txHash, + Amount: refundedEvent.Amount, + CreatedAt: time.Now(), + }); err != nil { + return nil, err + } + if err := s.db.UpdateRedPacketStatus(ctx, rp.PacketID, "REFUNDED"); err != nil { + return nil, err + } + return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "REFUNDED"}, nil } func (s *redPacketServer) GetRefund(ctx context.Context, req *pbredpacket.GetRefundReq) (*pbredpacket.GetRefundResp, error) { diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index da6d360ea..737be75f2 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -286,6 +286,29 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *relation.DeleteFri return &relation.DeleteFriendResp{}, nil } +// DeleteFriendOneway 单向删除好友:只删除 ownerUserID 侧的 friend 文档; +// 对端 friend 文档保留;仅向 owner 下发 FriendsInfoUpdateNotification 以刷新本地好友列表, +// 不向对端发送 FriendDeletedNotification。 +func (s *friendServer) DeleteFriendOneway(ctx context.Context, req *relation.DeleteFriendReq) (resp *relation.DeleteFriendResp, err error) { + if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } + + _, err = s.db.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID}) + if err != nil { + return nil, err + } + + if err := s.db.Delete(ctx, req.OwnerUserID, []string{req.FriendUserID}); err != nil { + return nil, err + } + + s.notificationSender.FriendDeletedOnewayNotification(ctx, req.OwnerUserID, req.FriendUserID) + s.webhookAfterDeleteFriend(ctx, &s.config.WebhooksConfig.AfterDeleteFriend, req) + + return &relation.DeleteFriendResp{}, nil +} + // ok. func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFriendRemarkReq) (resp *relation.SetFriendRemarkResp, err error) { if err = s.webhookBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig.BeforeSetFriendRemark, req); err != nil && err != servererrs.ErrCallbackContinue { @@ -726,6 +749,7 @@ func (s *friendServer) AddOnewayFriend(ctx context.Context, req *relation.ApplyT return &relation.ApplyToAddFriendResp{}, nil } +// SetMute 设置用户对另一用户的静音:duration 为秒;0=取消静音;-1=永久静音;>0=从现在起持续 duration 秒后自动解除。 func (s *friendServer) SetMute(ctx context.Context, req *relation.SetMuteReq) (*relation.SetMuteResp, error) { if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err @@ -733,19 +757,23 @@ func (s *friendServer) SetMute(ctx context.Context, req *relation.SetMuteReq) (* if req.Duration == 0 { return &relation.SetMuteResp{}, s.userMuteDB.Delete(ctx, req.OwnerUserID, req.TargetUserID) } + if req.Duration < 0 && req.Duration != -1 { + return nil, errs.ErrArgs.WrapMsg("duration must be 0 (unmute), -1 (permanent), or positive seconds") + } var muteEndTime int64 if req.Duration != -1 { muteEndTime = time.Now().Unix() + req.Duration } return &relation.SetMuteResp{}, s.userMuteDB.Upsert(ctx, &model.UserMute{ - OwnerUserID: req.OwnerUserID, - MutedUserID: req.TargetUserID, - MuteEndTime: muteEndTime, - MuteDuration: req.Duration, - CreateTime: time.Now(), + OwnerUserID: req.OwnerUserID, + MutedUserID: req.TargetUserID, + MuteEndTime: muteEndTime, + MuteDuration: req.Duration, + CreateTime: time.Now(), }) } +// GetMute 查询静音状态:未静音或已过期时 muted=false、duration=0;永久静音为 duration=-1 且 muteEndTime=0。 func (s *friendServer) GetMute(ctx context.Context, req *relation.GetMuteReq) (*relation.GetMuteResp, error) { if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err @@ -759,13 +787,9 @@ func (s *friendServer) GetMute(ctx context.Context, req *relation.GetMuteReq) (* } now := time.Now().Unix() if rec.MuteEndTime != 0 && rec.MuteEndTime <= now { - return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: 0}, nil - } - duration := rec.MuteDuration - if duration == 0 && rec.MuteEndTime == 0 { - duration = -1 + return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: rec.MuteDuration}, nil } - return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime, Duration: duration}, nil + return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime, Duration: rec.MuteDuration}, nil } func (s *friendServer) getCommonUserMap(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error) { diff --git a/internal/rpc/relation/notification.go b/internal/rpc/relation/notification.go index d9d773c76..10f7e7736 100644 --- a/internal/rpc/relation/notification.go +++ b/internal/rpc/relation/notification.go @@ -241,6 +241,18 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips) } +// FriendDeletedOnewayNotification 单向删好友:仅通知操作方本人刷新好友列表, +// 不向对端发送 FriendDeletedNotification,对端仍保留「你是 TA 好友」的关系。 +func (f *FriendNotificationSender) FriendDeletedOnewayNotification(ctx context.Context, ownerUserID, friendUserID string) { + tips := sdkws.FriendsInfoUpdateTips{ + FromToUserID: &sdkws.FromToUserID{ToUserID: ownerUserID}, + FriendIDs: []string{friendUserID}, + } + f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID, + database.FriendVersionName, ownerUserID, &tips.FriendSortVersion) + f.Notification(ctx, ownerUserID, ownerUserID, constant.FriendsInfoUpdateNotification, &tips) +} + func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) { versions := versionctx.GetVersionLog(ctx).Get() for _, coll := range versions { diff --git a/internal/rpc/rtc/signal.go b/internal/rpc/rtc/signal.go index be4b8ea50..89dfeb52d 100644 --- a/internal/rpc/rtc/signal.go +++ b/internal/rpc/rtc/signal.go @@ -97,6 +97,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, inv.InviterUserID = req.UserID inv.InitiateTime = time.Now().UnixMilli() + if len(inv.InviteeUserIDList) == 0 { + return nil, errs.ErrArgs.WrapMsg("no invitees", "inviteeUserIDList", inv.InviteeUserIDList) + } + for _, inviteeID := range inv.InviteeUserIDList { allowed, err := s.isCallAllowed(ctx, req.UserID, inviteeID) if err != nil { @@ -119,6 +123,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, } inv.BusyLineUserIDList = busyUserIDs + if len(inv.InviteeUserIDList) == len(busyUserIDs) { + return nil, errs.ErrNoPermission.WrapMsg("all invitees are busy", "inviteeUserIDList", inv.InviteeUserIDList) + } + // 从主叫用户资料获取铃声 URL,注入到邀请信息中,被叫方收到后播放主叫方铃声 if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" { inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL @@ -167,7 +175,7 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq, continue } log.ZInfo(ctx, "sendSignalingNotification to invitee", "sendID", req.UserID, "recvID", inviteeID) - if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), req.OfflinePushInfo, content); err != nil { + if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), "", req.OfflinePushInfo, content); err != nil { log.ZError(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID) return nil, errs.WrapMsg(err, "failed to notify invitee", "inviteeID", inviteeID) } @@ -189,6 +197,9 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi if inv == nil { return nil, errs.ErrArgs.WrapMsg("invitation is nil") } + if inv.GroupID == "" { + return nil, errs.ErrArgs.WrapMsg("groupID is empty") + } inv.RoomID = newRoomID() inv.InviterUserID = req.UserID @@ -258,18 +269,22 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi log.ZInfo(ctx, "handleInviteInGroup: skip busy invitee", "inviteeID", inviteeID) continue } - if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.ReadGroupChatType), req.OfflinePushInfo, content); err != nil { - log.ZWarn(ctx, "sendSignalingNotification to group invitee failed", err, "inviteeID", inviteeID) + if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.ReadGroupChatType), inv.GroupID, req.OfflinePushInfo, content); err != nil { + log.ZWarn(ctx, "handleInviteInGroup to group invitee failed", err, "inviteeID", inviteeID) } } - return &rtc.SignalInviteInGroupResp{ + resp := &rtc.SignalInviteInGroupResp{ Token: token, RoomID: inv.RoomID, LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress, BusyLineUserIDList: busyUserIDs, CalleeRingtoneURL: calleeRingtoneURL, - }, nil + } + + log.ZDebug(ctx, "handleInviteInGroup", "req", req, "resp", resp) + + return resp, nil } // isCallAllowed 判断 inviterID 是否被允许向 inviteeID 发起音视频通话。 @@ -325,7 +340,7 @@ func (s *rtcServer) handleAccept(ctx context.Context, req *rtc.SignalAcceptReq, return nil, err } - if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil { + if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", dbInv.InviterUserID) } @@ -365,7 +380,7 @@ func (s *rtcServer) handleReject(ctx context.Context, req *rtc.SignalRejectReq, if err != nil { return nil, err } - if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil { + if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification reject to inviter failed", err, "inviterID", dbInv.InviterUserID) } @@ -405,7 +420,7 @@ func (s *rtcServer) handleCancel(ctx context.Context, req *rtc.SignalCancelReq, return nil, err } for _, inviteeID := range dbInv.InviteeUserIDList { - if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, sessionType, req.OfflinePushInfo, content); err != nil { + if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification cancel to invitee failed", err, "inviteeID", inviteeID) } } @@ -441,7 +456,7 @@ func (s *rtcServer) handleHungUp(ctx context.Context, req *rtc.SignalHungUpReq, } // 使用 DB 中的参与者列表,不信任客户端传入的 InviteeUserIDList for _, peerID := range hungUpPeerIDsFromDB(dbInv, req.UserID) { - if err := s.sendSignalingNotification(ctx, req.UserID, peerID, sessionType, req.OfflinePushInfo, content); err != nil { + if err := s.sendSignalingNotification(ctx, req.UserID, peerID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil { log.ZWarn(ctx, "sendSignalingNotification hungUp to peer failed", err, "peerID", peerID) } } @@ -670,12 +685,14 @@ func signalingMsgOptions() map[string]bool { } // sendSignalingNotification sends a SignalingNotification message to a user via the msg service. -func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvID string, sessionType int32, offlinePush *sdkws.OfflinePushInfo, content []byte) error { +// groupID 在 SessionType 为群类型(如 ReadGroupChatType)时必须非空,否则 msg 服务群聊校验会失败。 +func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvID string, sessionType int32, groupID string, offlinePush *sdkws.OfflinePushInfo, content []byte) error { now := time.Now().UnixMilli() msgData := &sdkws.MsgData{ SendID: sendID, RecvID: recvID, SessionType: sessionType, + GroupID: groupID, ContentType: int32(constant.SignalingNotification), MsgFrom: int32(constant.SysMsgType), Content: content, diff --git a/internal/tools/burn_msg.go b/internal/tools/burn_msg.go index 1285ca6e1..3adee966a 100644 --- a/internal/tools/burn_msg.go +++ b/internal/tools/burn_msg.go @@ -25,17 +25,15 @@ import ( ) // clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的 -// ClearBurnExpiredMsgs,每次至多处理 burnLimit 个 (user, conversation) 分组, -// 直至本轮没有新的过期分组或达到防御性的最大循环次数。 +// ClearBurnExpiredMsgs,每次至多处理 burnClearLimit 个 (user, conversation) 分组, +// 直至本轮没有新的过期分组或达到 burnClearMaxLoop 轮。 func (c *cronServer) clearBurnExpiredMsgs() { now := time.Now() operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli()) ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "clear burn expired msgs cron start") - const ( - maxLoop = 10000 - burnLimit = 100 - ) + burnLimit := int32(c.config.CronTask.BurnClearLimit) + maxLoop := c.config.CronTask.BurnClearMaxLoop var count int for i := 1; i <= maxLoop; i++ { resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{ diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 452ed6fd6..fc9a1809c 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -16,6 +16,7 @@ package tools import ( "context" + "strings" "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" @@ -44,36 +45,37 @@ type CronTaskConfig struct { MongodbConfig config.Mongo } -func Start(ctx context.Context, config *CronTaskConfig) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) - if config.CronTask.RetainChatRecords < 1 { +func Start(ctx context.Context, cfg *CronTaskConfig) error { + config.FillCronTaskDefaults(&cfg.CronTask) + log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", cfg.CronTask.CronExecuteTime, "msgDestructTime", cfg.CronTask.RetainChatRecords) + if cfg.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) + client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, nil) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) + ctx = mcontext.SetOpUserID(ctx, cfg.Share.IMAdminUserID[0]) - msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + msgConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Msg) if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + thirdConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) + conversationConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Conversation) if err != nil { return err } - authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) + authConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Auth) if err != nil { return err } - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + mgocli, err := mongoutil.NewMongoDB(ctx, cfg.MongodbConfig.Build()) if err != nil { return errs.WrapMsg(err, "crontask: connect mongodb failed") } @@ -86,14 +88,14 @@ func Start(ctx context.Context, config *CronTaskConfig) error { srv := &cronServer{ ctx: ctx, - config: config, + config: cfg, cron: cron.New(), msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), thirdClient: third.NewThirdClient(thirdConn), authClient: rpcli.NewAuthClient(authConn), userOfflineRecordDB: userOfflineRecordDB, - chatAPIAddress: config.CronTask.ChatAPI.Address, + chatAPIAddress: cfg.CronTask.ChatAPI.Address, } if err := srv.registerClearS3(); err != nil { @@ -114,7 +116,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerDeleteExpiredOfflineUsers(); err != nil { return err } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) + log.ZDebug(ctx, "start cron task", "CronExecuteTime", cfg.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() return nil @@ -156,7 +158,11 @@ func (c *cronServer) registerClearUserMsg() error { } func (c *cronServer) registerClearBurnExpiredMsgs() error { - _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs) + schedule := strings.TrimSpace(c.config.CronTask.BurnCronExecuteTime) + if schedule == "" { + schedule = c.config.CronTask.CronExecuteTime + } + _, err := c.cron.AddFunc(schedule, c.clearBurnExpiredMsgs) return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task") } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 40ce852b1..34673884a 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -115,10 +115,18 @@ type API struct { } type CronTask struct { + // CronExecuteTime 标准 5 段 cron 表达式,供聊天记录清理、S3 清理、用户消息清理及阅后即焚清理等任务共用(除非 burnCronExecuteTime 单独指定)。 + // 未配置时由 FillCronTaskDefaults 设为每天 02:00(与 config/openim-crontask.yml 一致)。 CronExecuteTime string `mapstructure:"cronExecuteTime"` RetainChatRecords int `mapstructure:"retainChatRecords"` FileExpireTime int `mapstructure:"fileExpireTime"` DeleteObjectType []string `mapstructure:"deleteObjectType"` + // BurnCronExecuteTime 仅「单聊阅后即焚」清理(ClearBurnExpiredMsgs)的 cron;留空则与 CronExecuteTime 相同。 + BurnCronExecuteTime string `mapstructure:"burnCronExecuteTime"` + // BurnClearLimit 单次 RPC 最多处理多少个 (user, conversation) 分组;<=0 时默认 100。 + BurnClearLimit int `mapstructure:"burnClearLimit"` + // BurnClearMaxLoop 单次定时触发内最多循环轮数;<=0 时默认 10000。 + BurnClearMaxLoop int `mapstructure:"burnClearMaxLoop"` // ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。 ChatAPI ChatAPI `mapstructure:"chatAPI"` } @@ -290,9 +298,9 @@ type Group struct { AutoSetPorts bool `mapstructure:"autoSetPorts"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` - CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"` + Prometheus Prometheus `mapstructure:"prometheus"` + EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` + CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"` } type Msg struct { @@ -483,7 +491,7 @@ type Crypto struct { AutoSetPorts bool `mapstructure:"autoSetPorts"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` Virgil VirgilConfig `mapstructure:"virgil"` } @@ -524,9 +532,11 @@ type RedPacketTron struct { type RedPacketIndexer struct { PollInterval int `mapstructure:"pollInterval"` + // MaxBlocksPerPoll limits each eth_getLogs range (lastBlock+1 .. toBlock). Many public RPCs + // reject large ranges; 0 means use default 2000. + MaxBlocksPerPoll int `mapstructure:"maxBlocksPerPoll"` } - // FullConfig stores all configurations for before and after events type Webhooks struct { URL string `mapstructure:"url"` @@ -780,6 +790,37 @@ func (ct *CronTask) GetConfigFileName() string { return OpenIMCronTaskCfgFileName } +// FillCronTaskDefaults applies defaults after YAML/env load. Only fills empty or invalid placeholders. +func FillCronTaskDefaults(ct *CronTask) { + if ct == nil { + return + } + if strings.TrimSpace(ct.CronExecuteTime) == "" { + ct.CronExecuteTime = "0 2 * * *" + } + if strings.TrimSpace(ct.BurnCronExecuteTime) == "" { + ct.BurnCronExecuteTime = "*/1 * * * *" + } + if ct.BurnClearLimit <= 0 { + ct.BurnClearLimit = 100 + } + if ct.BurnClearMaxLoop <= 0 { + ct.BurnClearMaxLoop = 100 + } + if ct.ChatAPI.Address == "" { + ct.ChatAPI.Address = "http://127.0.0.1:10008" + } + if ct.RetainChatRecords < 1 { + ct.RetainChatRecords = 365 + } + if ct.BurnClearLimit < 1 { + ct.BurnClearLimit = 100 + } + if ct.BurnClearMaxLoop < 1 { + ct.BurnClearMaxLoop = 10000 + } +} + func (mg *MsgGateway) GetConfigFileName() string { return OpenIMMsgGatewayCfgFileName } diff --git a/pkg/common/storage/database/mgo/msg_burn_deadline.go b/pkg/common/storage/database/mgo/msg_burn_deadline.go index cc9b6832f..3ae08a3c3 100644 --- a/pkg/common/storage/database/mgo/msg_burn_deadline.go +++ b/pkg/common/storage/database/mgo/msg_burn_deadline.go @@ -66,6 +66,7 @@ func (m *msgBurnDeadlineMgo) UpsertIfAbsent(ctx context.Context, items []*model. "user_id": item.UserID, "conversation_id": item.ConversationID, "seq": item.Seq, + "peer_id": item.PeerID, "deadline_ms": item.DeadlineMs, "create_time": item.CreateTime, } @@ -91,6 +92,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, "user_id": "$user_id", "conversation_id": "$conversation_id", }, + "peer_id": bson.M{"$first": "$peer_id"}, "max_seq": bson.M{"$max": "$seq"}, "seqs": bson.M{"$push": "$seq"}, }}}, @@ -101,6 +103,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, UserID string `bson:"user_id"` ConversationID string `bson:"conversation_id"` } `bson:"_id"` + PeerID string `bson:"peer_id"` MaxSeq int64 `bson:"max_seq"` Seqs []int64 `bson:"seqs"` } @@ -113,6 +116,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, res = append(res, &database.ExpiredBurnGroup{ UserID: r.ID.UserID, ConversationID: r.ID.ConversationID, + PeerID: r.PeerID, MaxSeq: r.MaxSeq, Seqs: r.Seqs, }) diff --git a/pkg/common/storage/database/mgo/redpacket.go b/pkg/common/storage/database/mgo/redpacket.go index 0cf51b4c5..536551439 100644 --- a/pkg/common/storage/database/mgo/redpacket.go +++ b/pkg/common/storage/database/mgo/redpacket.go @@ -75,6 +75,12 @@ func (m *RedPacketMgo) UpdateCreated(ctx context.Context, rp *model.RedPacket) e "tx_hash": rp.TxHash, "chain_id": rp.ChainID, "contract_address": rp.ContractAddress, + "creator_wallet": rp.CreatorWallet, + "packet_type": rp.PacketType, + "token": rp.Token, + "total_amount": rp.TotalAmount, + "total_shares": rp.TotalShares, + "expiry_at": rp.ExpiryAt, "group_id": rp.GroupID, "scope_type": rp.ScopeType, "receiver_user_id": rp.ReceiverUserID, diff --git a/pkg/common/storage/database/mgo/user.go b/pkg/common/storage/database/mgo/user.go index 25044cb1d..cf7e3479a 100644 --- a/pkg/common/storage/database/mgo/user.go +++ b/pkg/common/storage/database/mgo/user.go @@ -82,7 +82,6 @@ func (u *UserMgo) UpdateByMap(ctx context.Context, userID string, args map[strin "first_name", "last_name", "full_name", - "remark", "face_url", "phone_number", "area_code", diff --git a/pkg/common/storage/database/msg_burn_deadline.go b/pkg/common/storage/database/msg_burn_deadline.go index f5d7676ea..829d77b80 100644 --- a/pkg/common/storage/database/msg_burn_deadline.go +++ b/pkg/common/storage/database/msg_burn_deadline.go @@ -24,6 +24,8 @@ import ( type ExpiredBurnGroup struct { UserID string ConversationID string + // PeerID 单聊中的对端用户 ID,直接从 deadline 记录读取,无需额外查 conversation 表。 + PeerID string // MaxSeq 当前批次中最大的过期 seq;推进 min_seq 时使用 MaxSeq + 1。 MaxSeq int64 // Seqs 当前批次实际涉及的所有过期 seq,便于精确删除已处理的 deadline 记录。 diff --git a/pkg/common/storage/model/conversation.go b/pkg/common/storage/model/conversation.go index 590899b3f..d052b25f7 100644 --- a/pkg/common/storage/model/conversation.go +++ b/pkg/common/storage/model/conversation.go @@ -37,4 +37,6 @@ type Conversation struct { IsMsgDestruct bool `bson:"is_msg_destruct"` MsgDestructTime int64 `bson:"msg_destruct_time"` LatestMsgDestructTime time.Time `bson:"latest_msg_destruct_time"` + MuteDuration int32 `bson:"mute_duration"` + MuteEndTime int64 `bson:"mute_end_time"` } diff --git a/pkg/common/storage/model/msg_burn_deadline.go b/pkg/common/storage/model/msg_burn_deadline.go index ef1055662..d44849f46 100644 --- a/pkg/common/storage/model/msg_burn_deadline.go +++ b/pkg/common/storage/model/msg_burn_deadline.go @@ -24,6 +24,9 @@ type MsgBurnDeadline struct { UserID string `bson:"user_id"` ConversationID string `bson:"conversation_id"` Seq int64 `bson:"seq"` + // PeerID 单聊中的对端用户 ID。 + // cron 处理时可直接获取对端,无需额外查询 conversation 表。 + PeerID string `bson:"peer_id"` // DeadlineMs 截止时间戳(毫秒);超过即可被 cron 收走推进 min_seq。 DeadlineMs int64 `bson:"deadline_ms"` // CreateTime 写入时刻(毫秒);用于排查/审计。 diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index d439d0c12..b5c6ef4b6 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -113,3 +113,27 @@ func (x *MsgClient) SetUserConversationsMinSeq(ctx context.Context, conversation req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: userIDs, Seq: seq} return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req)) } + +// DeleteMsgs 按 seq 删除消息,行为与 msg RPC DeleteMsgs 一致。 +func (x *MsgClient) DeleteMsgs(ctx context.Context, userID, conversationID string, seqs []int64, deleteSyncOpt *msg.DeleteSyncOpt) error { + if len(seqs) == 0 { + return nil + } + req := &msg.DeleteMsgsReq{ + ConversationID: conversationID, + UserID: userID, + Seqs: seqs, + DeleteSyncOpt: deleteSyncOpt, + } + return ignoreResp(x.MsgClient.DeleteMsgs(ctx, req)) +} + +// DeleteMsgPhysicalBySeqs 按 seq 物理删除会话内的消息(无鉴权)。 +// 用于阅后即焚、系统级消息清理等场景。 +func (x *MsgClient) DeleteMsgPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error { + if len(seqs) == 0 { + return nil + } + req := &msg.DeleteMsgPhysicalBySeqReq{ConversationID: conversationID, Seqs: seqs} + return ignoreResp(x.MsgClient.DeleteMsgPhysicalBySeq(ctx, req)) +} diff --git a/protocol b/protocol index 2e1f23fe0..7eb773753 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 2e1f23fe06d15adcabf07ef72c1e8b08bca0f2c1 +Subproject commit 7eb7737531d6ff779571e77f5a49aa21ce9bcb43