Merge branch 'develop/tom' into feature/group_burn_msg

pull/3727/head
haoyunlt 2 weeks ago committed by GitHub
commit 29b5ac24c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,14 +1,14 @@
# Cursor 代码索引忽略(语法与 .gitignore 相同)
# 与根目录 .gitignore 对齐;未列出的规则仍以 .gitignore 为准Git 不索引的路径 Cursor 通常也不关心)
# 与根目录 .gitignore 对齐;以下为补充规则,减少生成物/文档噪音,保留业务源码与 .proto
### OpenIM与 .gitignore 一致)###
logs
.devcontainer
components
out-test
logs/
.devcontainer/
components/
out-test/
Dockerfile.cross
### macOS / 本地工具(不入索引)###
### macOS / 本地工具 ###
.DS_Store
.playwright-mcp/
@ -17,26 +17,51 @@ tmp/
bin/
output/
_output/
build/
dist/
deployments/charts/generated-configs/
### 配置与密钥(勿入索引)###
.env
config/config.yaml
config/notification.yaml
start-config.yml
### 部署生成物 ###
deployments/openim-server/charts
deployments/openim-server/charts/
### 本地笔记 ###
.idea.md
.todo.md
.note.md
### 生成代码(以 .proto 为准,勿重复索引)###
protocol/**/*.pb.go
protocol/**/*_grpc.pb.go
### 文档与资源(保留 docs/contrib、根 README忽略多语言 readme 与静态资源)###
docs/readme/
docs/.generated_docs
docs/contributing/
assets/
virgil_chat_server_design.md
docs/virgil-e2ee-*.md
### 测试与脚本输出 ###
test/e2e/output/
scripts/**/*.log
### 通用备份与临时文件 ###
*.bak
*.gho
*.ori
*.orig
*.tmp
*~
dist/
*.BACKUP.*
*.BASE.*
*.LOCAL.*
*.REMOTE.*
### VS Code除团队共享配置外###
.vscode/*
@ -44,6 +69,7 @@ dist/
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
### Go ###
*.exe
@ -54,13 +80,19 @@ dist/
*.test
*.out
vendor/
go.work
go.work.sum
go.sum
### JetBrains / IDE ###
.idea/
out/
### Tags ###
### Git / CI低价值索引###
.git/
.github/
### Tags / 索引工具 ###
TAGS
tags
gtags.files
@ -70,3 +102,5 @@ GPATH
GSYMS
cscope.files
cscope.out
cscope.in.out
cscope.po.out

@ -1,8 +1,9 @@
# RedPacket 前端对接文档
本文档面向前端 / 网关 / App 对接方,说明红包领取和钱包绑定的真实接入方式,重点覆盖:
本文档面向前端 / 网关 / App 对接方,说明红包创建、领取和钱包绑定的真实接入方式,重点覆盖:
- 如何把当前登录用户传递给红包服务
- 如何创建红包(业务单 + 链上创建 + 回写激活)
- 如何绑定钱包
- 如何申请领取签名
- 前端何时发链、何时回写后端
@ -97,7 +98,111 @@ Content-Type: application/json
已经在后端建立了有效绑定关系。
## 3. 领取签名流程
## 3. 创建红包流程
### 4.1 流程图
```text
前端 -> 红包服务: POST /api/redpacket/create-order
红包服务 -> 前端: biz_id (状态 PENDING)
前端 -> 钱包/链上: createFixedPacket/createRandomPacket/createTransfer
链上 -> 前端: tx_hash + packet_id(从事件或回执解析)
前端 -> 红包服务: POST /api/redpacket/created-callback
红包服务 -> 红包服务: 校验创建参数并激活红包
红包服务 -> 前端: 回写成功 (状态 ACTIVE)
前端 -> 红包服务: POST /api/redpacket/detail (可选)
```
### 3.2 创建业务单(发链前必调)
请求:
```http
POST /api/redpacket/create-order
token: <user token>
Content-Type: application/json
```
```json
{
"chain_type": "EVM",
"chain_id": 1,
"contract_address": "0xA1f42567559aBA5Ff0aac84cdE1AaF1F9DbB888F",
"creator_wallet": "0x1111111111111111111111111111111111111111",
"group_id": "g001",
"scope_type": "GROUP",
"receiver_user_id": "",
"receiver_user_ids": [],
"packet_type": 1,
"token": "0x2222222222222222222222222222222222222222",
"total_amount": "1000000000000000000",
"total_shares": 10,
"expiry_at": 0,
"remark": "happy new year"
}
```
关键说明:
- 不需要传 `user_id`,创建人从上下文 `opUserID`
- `total_amount` 必须是链上最小单位十进制字符串(例如 wei
- `packet_type`: `0` 固定红包,`1` 拼手气红包,`2` 转账
- `scope_type=GROUP` 时必须传 `group_id`
- `scope_type=DIRECT` 时必须传 `receiver_user_id``receiver_user_ids`
成功响应里最关键的是:
- `biz_id`: 业务红包单号(后续回写必须带上)
### 3.3 链上创建红包
前端拿到 `biz_id` 后,再调用链上创建方法:
- 固定红包:`createFixedPacket(...)`
- 拼手气红包:`createRandomPacket(...)`
- 转账红包:`createTransfer(...)`
链上交易成功后,前端需要得到:
- `tx_hash`
- `packet_id`(优先从 `PacketCreated` 事件解析)
### 3.4 创建回写(激活红包)
请求:
```http
POST /api/redpacket/created-callback
token: <user token>
Content-Type: application/json
```
```json
{
"biz_id": "f8a0f87e-d9cb-4d4a-8350-7bd43ab2e9a4",
"tx_hash": "0xabc123...",
"packet_id": "10001",
"group_id": "g001",
"scope_type": "GROUP",
"receiver_user_id": "",
"receiver_user_ids": []
}
```
说明:
- `biz_id`、`tx_hash` 必填
- 推荐传 `packet_id`(可减少后端 fallback 分支)
- 回写成功后红包状态从 `PENDING` 变为 `ACTIVE`
- 回写后可调 `/api/redpacket/detail` 刷新页面状态
### 3.5 创建流程常见坑
- 先发链再 `create_order`:会导致回写阶段缺少有效 `biz_id`
- `create_order``creator_wallet` 与实际发链钱包不一致:可能被后端校验拦截
- 未调用 `created_callback`:红包会一直停留在 `PENDING`,领取侧会失败
## 4. 领取签名流程
### 3.1 流程图
@ -111,7 +216,7 @@ Content-Type: application/json
链监听器 -> 红包服务: 最终确认领取结果
```
### 3.2 申请领取签名
### 4.2 申请领取签名
请求:
@ -159,7 +264,7 @@ Content-Type: application/json
}
```
### 3.3 前端拿到响应后要做什么
### 4.3 前端拿到响应后要做什么
前端必须原样把这些参数传给链上:
@ -182,7 +287,7 @@ claim(packetId, authNonce, randomSeed, deadline, signature)
- 不要对摘要再次做 `signMessage`
- 后端返回的 `signature` 已经是最终可上链签名
## 4. 领取结果回写
## 5. 领取结果回写
`claim-result` 是可选的,主要作用是让业务侧尽快看到一条 `PENDING` 领取记录。
@ -210,29 +315,38 @@ Content-Type: application/json
- 如果不能,会先记成 `PENDING`
- 最终仍以链监听器为准
## 5. 前端推荐调用顺序
## 6. 前端推荐调用顺序
### 6.1 创建红包
1. 用户登录业务系统
2. 前端请求 `/api/redpacket/create-order`
3. 拿到 `biz_id` 后,钱包调用链上创建红包方法
4. 从交易回执/事件拿到 `tx_hash`、`packet_id`
5. 前端请求 `/api/redpacket/created-callback`
6. 前端请求 `/api/redpacket/detail` 刷新状态(确认 `ACTIVE`
### 5.1 首次使用钱包领取
### 6.2 首次使用钱包领取
1. 用户登录业务系统
2. 前端请求 `/wallet-bind/challenge`
2. 前端请求 `/api/redpacket/wallet-bind/challenge`
3. 钱包对 `message` 签名
4. 前端请求 `/wallet-bind/confirm`
4. 前端请求 `/api/redpacket/wallet-bind/confirm`
5. 绑定成功后再进入领取流程
### 5.2 正常领取
### 6.3 正常领取
1. 前端拿到红包 `packet_id`
2. 用户连接钱包,得到本次 `claimer` 地址
3. 前端请求 `/claim-sign`
3. 前端请求 `/api/redpacket/claim-sign`
4. 拿到 `auth_nonce + random_seed + deadline + signature`
5. 前端调用链上 `claim(...)`
6. 前端可选请求 `/claim-result`
6. 前端可选请求 `/api/redpacket/claim-result`
7. 页面轮询详情页或等待业务侧状态同步
## 6. 常见错误和排查
## 7. 常见错误和排查
### 6.1 `op user id missing in context`
### 7.1 `op user id missing in context`
原因:
@ -240,7 +354,7 @@ Content-Type: application/json
- 网关没有把 `opUserID` 注入上下文
- 直接绕过网关调用了红包服务
### 6.2 `wallet is not bound to user`
### 7.2 `wallet is not bound to user`
原因:
@ -248,20 +362,20 @@ Content-Type: application/json
- 当前钱包绑定的是别的业务用户
- 链类型不一致
### 6.3 `already claimed`
### 7.3 `already claimed`
原因:
- 同一个钱包地址已经领过该红包
### 6.4 `user already claimed`
### 7.4 `user already claimed`
原因:
- 同一个业务用户已经领取过该红包
- 即使换钱包地址,也会被后端拦截
## 7. 后端接口与代码位置
## 8. 后端接口与代码位置
- 接口契约文档:
[backend-api.md](/Users/panda/aiCode/red_packet/open-im-server-origin/cmd/openim-rpc/openim-rpc-redpacket/backend-api.md)

@ -1,6 +1,9 @@
cronExecuteTime: 0 2 * * *
burnCronExecuteTime: "*/1 * * * *" # 可选:仅阅后即焚清理使用;不配置则与 cronExecuteTime 相同
burnClearLimit: 100 # 可选:单次 RPC 批大小,默认 100
burnClearMaxLoop: 100 # 可选:单次定时内最大循环轮数,默认 10000
retainChatRecords: 365
fileExpireTime: 180
deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"]
chatAPI:
address: http://127.0.0.1:10008
address: http://127.0.0.1:10008

@ -12,20 +12,23 @@ prometheus:
# Leave rpcURL empty to disable the EVM client; the RPC service will then
# only expose TRON-related functionality (or the offchain code paths).
chain:
rpcURL: ""
contractAddress: ""
chainID: 0
signerPrivateKey: ""
configAdminPrivateKey: ""
rpcURL: "https://data-seed-prebsc-1-s1.bnbchain.org:8545"
contractAddress: "0x9f2e22F5D0cf8d8127E319D38b3EDDaE43bb4DC0"
chainID: 97
signerPrivateKey: "e9f6a5f3a3c3a97099ca31e7f44151e529c0a4f8a91d5d4232c7282f2b798df4"
configAdminPrivateKey: "e9f6a5f3a3c3a97099ca31e7f44151e529c0a4f8a91d5d4232c7282f2b798df4"
# TRON full-node configuration. Leave fullNodeURL empty to disable TRON.
tron:
fullNodeURL: ""
contractBase58: ""
ownerBase58: ""
privateKeyHex: ""
feeLimit: 100000000
fullNodeURL: "https://nile.trongrid.io"
contractBase58: "TTLFvSEyH6BAUp3cVGygWyTVGPBqdLw81M"
ownerBase58: "TJG6ZbEUZNVAQzFQTZehfQJniVJtTBdZ2e"
privateKeyHex: "e9f6a5f3a3c3a97099ca31e7f44151e529c0a4f8a91d5d4232c7282f2b798df4"
feeLimit: 150000000
# Indexer polling interval (in seconds). Used by both EVM and TRON event indexers.
# Set to 0 or negative to disable block scanning completely and rely on tx-hash parsing paths.
indexer:
pollInterval: 5
pollInterval: 0
# EVM only: max block span per eth_getLogs request (0 = default 2000). Increase if your node allows larger ranges.
maxBlocksPerPoll: 2000

@ -71,3 +71,11 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client)
}
func (o *ConversationApi) SetMute(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.SetConversationMute, o.Client)
}
func (o *ConversationApi) SetBurn(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.SetConversationBurn, o.Client)
}

@ -41,6 +41,10 @@ func (o *FriendApi) DeleteFriend(c *gin.Context) {
a2r.Call(c, relation.FriendClient.DeleteFriend, o.Client)
}
func (o *FriendApi) DeleteFriendOneway(c *gin.Context) {
a2r.Call(c, relation.FriendClient.DeleteFriendOneway, o.Client)
}
func (o *FriendApi) GetFriendApplyList(c *gin.Context) {
a2r.Call(c, relation.FriendClient.GetPaginationFriendsApplyTo, o.Client)
}
@ -127,13 +131,6 @@ func (o *FriendApi) AddOnewayFriend(c *gin.Context) {
a2r.Call(c, relation.FriendClient.AddOnewayFriend, o.Client)
}
func (o *FriendApi) SetMute(c *gin.Context) {
a2r.Call(c, relation.FriendClient.SetMute, o.Client)
}
func (o *FriendApi) GetMute(c *gin.Context) {
a2r.Call(c, relation.FriendClient.GetMute, o.Client)
}
func (o *FriendApi) PinFriend(c *gin.Context) {
a2r.Call(c, relation.FriendClient.PinFriend, o.Client)

@ -198,6 +198,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
f := NewFriendApi(relation.NewFriendClient(friendConn))
friendRouterGroup := r.Group("/friend")
friendRouterGroup.POST("/delete_friend", f.DeleteFriend)
friendRouterGroup.POST("/delete_friend_oneway", f.DeleteFriendOneway)
friendRouterGroup.POST("/get_friend_apply_list", f.GetFriendApplyList)
friendRouterGroup.POST("/get_designated_friend_apply", f.GetDesignatedFriendsApply)
friendRouterGroup.POST("/get_self_friend_apply_list", f.GetSelfApplyList)
@ -221,8 +222,6 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
friendRouterGroup.POST("/get_self_unhandled_apply_count", f.GetSelfUnhandledApplyCount)
friendRouterGroup.POST("/get_pinned_friend_ids", f.GetPinnedFriendIDs)
friendRouterGroup.POST("/add_oneway_friend", f.AddOnewayFriend)
friendRouterGroup.POST("/set_mute", f.SetMute)
friendRouterGroup.POST("/get_mute", f.GetMute)
friendRouterGroup.POST("/pin", f.PinFriend)
friendRouterGroup.POST("/unpin", f.UnpinFriend)
}
@ -358,6 +357,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/set_mute", c.SetMute)
conversationGroup.POST("/set_burn", c.SetBurn)
}
{

@ -36,6 +36,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/discovery"
@ -127,6 +128,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
}
resp := &pbconversation.GetConversationResp{Conversation: &pbconversation.Conversation{}}
resp.Conversation = convert.ConversationDB2Pb(conversations[0])
c.fillConversationUserMute(ctx, resp.Conversation)
return resp, nil
}
@ -201,6 +203,15 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
}
conversation_notPinTime[time] = conversationID
}
for _, v := range conversations {
elem, ok := conversationMsg[v.ConversationID]
if !ok {
continue
}
elem.MuteDuration = v.MuteDuration
elem.MuteEndTime = v.MuteEndTime
elem.IsMuted = computeIsMuted(v.MuteDuration, v.MuteEndTime)
}
resp = &pbconversation.GetSortedConversationListResp{
ConversationTotal: int64(len(chatLogs)),
ConversationElems: []*pbconversation.ConversationElem{},
@ -221,6 +232,7 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon
}
resp := &pbconversation.GetAllConversationsResp{Conversations: []*pbconversation.Conversation{}}
resp.Conversations = convert.ConversationsDB2Pb(conversations)
c.fillConversationsUserMute(ctx, resp.Conversations)
return resp, nil
}
@ -239,9 +251,9 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
if err != nil {
return nil, err
}
resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}}
resp.Conversations = convert.ConversationsDB2Pb(conversations)
return convert.ConversationsDB2Pb(conversations), nil
list := convert.ConversationsDB2Pb(conversations)
c.fillConversationsUserMute(ctx, list)
return list, nil
}
// Deprecated
@ -529,7 +541,9 @@ func (c *conversationServer) GetConversationsByConversationID(
if err != nil {
return nil, err
}
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
list := convert.ConversationsDB2Pb(conversations)
c.fillConversationsUserMute(ctx, list)
return &pbconversation.GetConversationsByConversationIDResp{Conversations: list}, nil
}
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
@ -717,9 +731,11 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
if err != nil {
return nil, err
}
list := convert.ConversationsDB2Pb(conversations)
c.fillConversationsUserMute(ctx, list)
return &pbconversation.GetOwnerConversationResp{
Total: total,
Conversations: convert.ConversationsDB2Pb(conversations),
Conversations: list,
}, nil
}
@ -770,7 +786,9 @@ func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _
}
}
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
list := convert.ConversationsDB2Pb(temp)
c.fillConversationsUserMute(ctx, list)
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: list}, nil
}
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
@ -839,9 +857,11 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
// ClearBurnExpiredMsgs 处理「阅后即焚」过期消息:
// 1. 从 msg_burn_deadline 中拉取一批过期分组(按 user/conversation 聚合,含每组最大 seq
// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1。
// 3. 同步更新 conversation 文档的 min_seq 字段并下发会话变更通知。
// 4. 删除已处理的 deadline 记录。
// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1更新 conversation 文档。
// 3. 向阅读方发 ConversationChangeNotification + DeleteMsgsNotification含精确 seqs
// 4. 单聊场景si_ 前缀):利用 deadline 记录中的 PeerID 直接推进对端 min_seq
// 并向对端也发两条通知,保证双方客户端都删本地消息。
// 5. 物理删除 msg 存储中的焚毁消息;清理 deadline 记录。
//
// 单次最多处理 req.Limit 个分组;若返回的 count == limitcron 可继续触发。
func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearBurnExpiredMsgsReq) (*pbconversation.ClearBurnExpiredMsgsResp, error) {
@ -861,30 +881,46 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco
if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 {
continue
}
newMinSeq := g.MaxSeq + 1
if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
continue
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID,
map[string]any{"min_seq": newMinSeq}); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
continue
//newMinSeq := g.MaxSeq + 1
// 推进阅读方 min_seq。
//if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil {
// log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err,
// "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
// continue
//}
//if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID,
// map[string]any{"min_seq": newMinSeq}); err != nil {
// log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err,
// "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
// continue
//}
// 通知 g.UserID 客户端:会话变更 + 精确删除指定 seqs。
// 对端用户在 msg_burn_deadline 中有独立记录cron 处理其分组时会自行通知,
// 无需在此重复推进对端 min_seq 或发送额外通知。
//c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID})
// 删除焚毁消息并同步通知阅读方客户端best-effort失败不中断流程
if err := c.msgClient.DeleteMsgs(ctx, g.UserID, g.ConversationID, g.Seqs, &msg.DeleteSyncOpt{
IsSyncSelf: true,
}); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgs failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID})
if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
}
log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID,
"conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs)
"conversationID", g.ConversationID, "seqs", g.Seqs)
processed++
}
return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil
}
// ClearGroupBurnExpiredMsgs 处理群消息「阅后即焚」到期记录:
// 1. 查询满足 read_count >= member_count 且 burn_end_time 过期的记录(按 group_id 聚合)。
// 2. 对每个群,获取所有成员 ID批量推进他们在群会话上的 min_seq。
@ -951,3 +987,49 @@ func (c *conversationServer) ClearGroupBurnExpiredMsgs(ctx context.Context, req
}
return &pbconversation.ClearGroupBurnExpiredMsgsResp{Count: processed}, nil
}
func (c *conversationServer) SetConversationBurn(ctx context.Context, req *pbconversation.SetConversationBurnReq) (*pbconversation.SetConversationBurnResp, error) {
if err := c.conversationDatabase.UpdateUsersConversationField(
ctx,
[]string{req.OwnerUserID},
req.ConversationID,
map[string]any{
"burn_duration": req.BurnDuration,
},
); err != nil {
return nil, err
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, req.OwnerUserID, []string{req.ConversationID})
return &pbconversation.SetConversationBurnResp{}, nil
}
func (c *conversationServer) SetConversationMute(ctx context.Context, req *pbconversation.SetConversationMuteReq) (*pbconversation.SetConversationMuteResp, error) {
var (
muteDuration int32
muteEndTime int64
)
switch {
case req.Duration == 0:
// 取消静音:清零所有静音字段
case req.Duration == -1:
// 永久静音
muteDuration = -1
default:
// 定时静音
muteDuration = req.Duration
muteEndTime = time.Now().Unix() + int64(req.Duration)
}
if err := c.conversationDatabase.UpdateUsersConversationField(
ctx,
[]string{req.OwnerUserID},
req.ConversationID,
map[string]any{
"mute_duration": muteDuration,
"mute_end_time": muteEndTime,
},
); err != nil {
return nil, err
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, req.OwnerUserID, []string{req.ConversationID})
return &pbconversation.SetConversationMuteResp{}, nil
}

@ -0,0 +1,40 @@
// Copyright © 2023 OpenIM. All rights reserved.
package conversation
import (
"context"
"time"
pbconversation "github.com/openimsdk/protocol/conversation"
)
// computeIsMuted 根据会话模型中存储的 mute_duration 和 mute_end_time 计算当前是否处于静音状态:
// - duration == 0 且 end == 0未静音
// - duration == -1 且 end == 0永久静音
// - end > 0 且 end > now定时静音仍有效
// - end > 0 且 end <= now定时静音已过期视为未静音
func computeIsMuted(muteDuration int32, muteEndTime int64) bool {
if muteDuration == 0 && muteEndTime == 0 {
return false
}
if muteDuration == -1 && muteEndTime == 0 {
return true
}
return muteEndTime > time.Now().Unix()
}
// fillConversationUserMute 根据会话模型字段(已由 ConversationDB2Pb 通过 CopyStructFields 填入
// conv.MuteDuration / conv.MuteEndTime计算并设置 conv.IsMuted无需额外数据库查询。
func (c *conversationServer) fillConversationUserMute(_ context.Context, conv *pbconversation.Conversation) {
if conv == nil {
return
}
conv.IsMuted = computeIsMuted(conv.MuteDuration, conv.MuteEndTime)
}
func (c *conversationServer) fillConversationsUserMute(ctx context.Context, list []*pbconversation.Conversation) {
for _, conv := range list {
c.fillConversationUserMute(ctx, conv)
}
}

@ -73,3 +73,19 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
}
// BurnMsgsDeleteNotification 通知 recvUserID 按 seqs 删除本地消息。
// sendUserID 为触发焚烧的一方阅读方recvUserID 为需要执行删除的一方。
// 双方各调用一次,保证单聊两端客户端都删除本地缓存。
func (c *ConversationNotificationSender) BurnMsgsDeleteNotification(
ctx context.Context,
sendUserID, recvUserID, conversationID string,
seqs []int64,
) {
tips := &sdkws.DeleteMsgsTips{
UserID: sendUserID,
ConversationID: conversationID,
Seqs: seqs,
}
c.Notification(ctx, sendUserID, recvUserID, constant.DeleteMsgsNotification, tips)
}

@ -215,16 +215,18 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
return &msg.MarkConversationAsReadResp{}, nil
}
// recordBurnDeadlines 在「单聊」场景下,根据对端(发送者)的 MsgBurnDuration
// 为本次已读的每条消息同时给接收者和发送者各记录一份「阅后即焚」截止时间。
// cron 到期后会分别推进两人各自的 min_seq双方都看不到该消息。
// recordBurnDeadlines 在「单聊」场景下,为本次已读的每条消息同时给接收者和发送者
// 各记录一份「阅后即焚」截止时间。cron 到期后会分别删除双方该消息,双方都看不到。
//
// 销毁时长优先级:
// 1. 会话级 BurnDuration通过 /conversation/set_burn 设置);
// 2. 对端(发送者)全局 MsgBurnDuration。
//
// 设计要点:
// 1. 仅单聊。
// 2. 仅当发送者 MsgBurnDuration > 0 时才记录0 表示未开启。
// 3. $setOnInsert 确保同一 (UserID, ConversationID, Seq) 已存在时不覆盖,
// 2. $setOnInsert 确保同一 (UserID, ConversationID, Seq) 已存在时不覆盖,
// 以「首次阅读时刻」为 deadline 基准,多端重复 MarkAsRead 不会往后推。
// 4. 失败仅记录日志,不影响已读主流程。
// 3. 失败仅记录日志,不影响已读主流程。
func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.Conversation, readerUserID string, seqs []int64) {
if len(seqs) == 0 {
return
@ -236,16 +238,23 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.
if peerID == "" || peerID == readerUserID {
return
}
peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID)
if err != nil {
log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID)
return
}
if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 {
return
// 优先使用会话级 BurnDuration双方协商后保存到会话否则回退到发送者全局设置。
burnSeconds := conv.BurnDuration
if burnSeconds <= 0 {
peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID)
if err != nil {
log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID)
return
}
if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 {
return
}
burnSeconds = peerInfo.MsgBurnDuration
}
now := time.Now().UnixMilli()
deadline := now + int64(peerInfo.MsgBurnDuration)*1000
deadline := now + int64(burnSeconds)*1000
// 每条消息同时为接收者和发送者各写一条 deadline双方消息同步焚毁。
items := make([]*model.MsgBurnDeadline, 0, len(seqs)*2)
for _, seq := range seqs {
@ -254,6 +263,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.
UserID: readerUserID,
ConversationID: conv.ConversationID,
Seq: seq,
PeerID: peerID,
DeadlineMs: deadline,
CreateTime: now,
},
@ -261,6 +271,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.
UserID: peerID,
ConversationID: conv.ConversationID,
Seq: seq,
PeerID: readerUserID,
DeadlineMs: deadline,
CreateTime: now,
},

@ -164,18 +164,16 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
isNotification := msgprocessor.IsNotificationByMsg(req.MsgData)
log.ZInfo(ctx, "sendMsgSingleChat", "isNotification", isNotification, "msgdata", req.MsgData)
if !isNotification {
// 非通知类消息:执行发送权限校验 + 接收偏好校验(含 blacklist / MsgReceiveSetting / webhook / FriendVerify / globalOpt / convOpt
isSend, err = m.modifyMessageByUserMessageReceiveOpt(
ctx,
req.MsgData.RecvID,
conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID),
constant.SingleChatType,
req,
)
if err != nil {
return nil, err
}
// 单聊一律校验接收方消息设置(含「仅好友可发」)、黑名单、会话接收偏好等;不再因通知类 Options 跳过
isSend, err = m.modifyMessageByUserMessageReceiveOpt(
ctx,
req.MsgData.RecvID,
conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID),
constant.SingleChatType,
req,
)
if err != nil {
return nil, err
}
if !isSend {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()

@ -72,7 +72,6 @@ type msgServer struct {
conversationClient *rpcli.ConversationClient
spamReportDB database.SpamReport
globalBlackDB controller.UserGlobalBlackDatabase
userMuteDB controller.UserMuteDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
groupMsgBurnRecordDB database.GroupMsgBurnRecord
}
@ -139,6 +138,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
groupMsgBurnRecordDB, err := mgo.NewGroupMsgBurnRecordMongo(mgocli.GetDB())
if err != nil {
return err
@ -147,6 +147,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
s := &msgServer{
MsgDatabase: msgDatabase,
RegisterCenter: client,
@ -159,7 +160,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationClient: conversationClient,
spamReportDB: spamReportDB,
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
userMuteDB: controller.NewUserMuteDatabase(userMuteMgo),
msgBurnDeadlineDB: msgBurnDeadlineDB,
groupMsgBurnRecordDB: groupMsgBurnRecordDB,
}

@ -226,7 +226,7 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
}
// 第二优先级:单聊发送权限校验(从 messageVerification 迁移)
// 仅对非通知类消息生效(调用方已通过 !isNotification 做过前置过滤)
// 单聊路径下由 sendMsgSingleChat 始终调用本函数(含通知类),以校验接收方 MsgReceiveSetting 等
if sessionType == constant.SingleChatType {
// 管理员跳过发送权限拦截,直接进入接收偏好校验
if !datautil.Contain(pb.MsgData.SendID, m.config.Share.IMAdminUserID...) {
@ -255,10 +255,12 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
// skipFriendVerify: MsgReceiveSetting=1 已确认好友关系,无需再做 FriendVerify 重复查询
skipFriendVerify := false
switch recvUserInfo.MsgReceiveSetting {
case 2: // MsgReceiveSettingNobody
case model.MsgReceiveSettingNobody:
return false, servererrs.ErrMsgReceiveNotAllowed.Wrap()
case 1: // MsgReceiveSettingFriends
isFriend, err := m.FriendLocalCache.IsFriend(ctx, pb.MsgData.RecvID, pb.MsgData.SendID)
case model.MsgReceiveSettingFriends:
// FriendLocalCache.IsFriend(possibleFriendUserID, userID) 对应「userID 的好友列表里是否有 possibleFriendUserID」
// 此处须判断:接收方 recv 的好友列表里是否有发送方 send
isFriend, err := m.FriendLocalCache.IsFriend(ctx, pb.MsgData.SendID, pb.MsgData.RecvID)
if err != nil {
log.ZError(ctx, "modifyMessageByUserMessageReceiveOpt: IsFriend failed (MsgReceiveSetting)", err,
"sendID", pb.MsgData.SendID, "recvID", pb.MsgData.RecvID,
@ -323,14 +325,20 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
}
}
// 第四优先级用户静音设置user_mute 集合,支持好友与非好友)
// 无论会话记录是否存在均检查,以支持对非好友的静音
if m.userMuteDB != nil {
muted, err := m.userMuteDB.IsMuted(ctx, userID, pb.MsgData.SendID)
if err != nil {
return false, err
// 第四优先级:会话静音设置(存储于 conversations 集合的 mute_duration/mute_end_time
conv, convErr := m.ConversationLocalCache.GetConversation(ctx, userID, conversationID)
if convErr != nil && !errs.ErrRecordNotFound.Is(convErr) {
return false, convErr
}
if convErr == nil && conv != nil {
var isMuted bool
switch {
case conv.MuteDuration == -1 && conv.MuteEndTime == 0:
isMuted = true
case conv.MuteEndTime > 0:
isMuted = conv.MuteEndTime > time.Now().Unix()
}
if muted {
if isMuted {
if pb.MsgData.Options == nil {
pb.MsgData.Options = make(map[string]bool, 10)
}

@ -173,7 +173,7 @@ func (s *redPacketServer) ParseTxEvents(ctx context.Context, req *pbredpacket.Pa
if s.tronClient == nil {
return nil, errs.ErrInternalServer.WrapMsg("TRON client not configured")
}
events, err := s.tronClient.ParseTransactionReceipt(ctx, req.TxHash)
success, events, err := s.tronClient.ParseTransactionReceiptWithStatus(ctx, req.TxHash)
if err != nil {
return nil, errs.ErrInternalServer.WrapMsg("parse TRON tx receipt failed: " + err.Error())
}
@ -185,12 +185,16 @@ func (s *redPacketServer) ParseTxEvents(ctx context.Context, req *pbredpacket.Pa
}
out = append(out, &pbredpacket.ParsedEvent{Name: e.Name, Data: data})
}
return &pbredpacket.ParseTxEventsResp{Chain: "tron", TxHash: req.TxHash, Events: out}, nil
note := "tx_status=FAILED"
if success {
note = "tx_status=SUCCESS"
}
return &pbredpacket.ParseTxEventsResp{Chain: "tron", TxHash: req.TxHash, Events: out, Note: note}, nil
}
if s.chainClient != nil {
txHashBytes := common.HexToHash(req.TxHash)
events, err := s.chainClient.ParseTransactionReceipt(ctx, txHashBytes)
success, events, err := s.chainClient.ParseTransactionReceiptWithStatus(ctx, txHashBytes)
if err != nil {
return nil, errs.ErrInternalServer.WrapMsg("parse tx receipt failed: " + err.Error())
}
@ -206,10 +210,15 @@ func (s *redPacketServer) ParseTxEvents(ctx context.Context, req *pbredpacket.Pa
Data: data,
})
}
note := "tx_status=FAILED"
if success {
note = "tx_status=SUCCESS"
}
return &pbredpacket.ParseTxEventsResp{
Chain: "eth",
TxHash: req.TxHash,
Events: out,
Note: note,
}, nil
}

@ -113,12 +113,29 @@ func (c *ChainClient) SignClaim(digest [32]byte) ([]byte, error) {
}
func (c *ChainClient) ParseTransactionReceipt(ctx context.Context, txHash common.Hash) ([]*ParsedEvent, error) {
_, events, err := c.ParseTransactionReceiptWithStatus(ctx, txHash)
return events, err
}
// ParseTransactionReceiptWithStatus fetches tx receipt once and returns both
// execution status and decoded contract events.
func (c *ChainClient) ParseTransactionReceiptWithStatus(ctx context.Context, txHash common.Hash) (bool, []*ParsedEvent, error) {
receipt, err := c.client.TransactionReceipt(ctx, txHash)
if err != nil {
return nil, fmt.Errorf("get receipt failed: %w", err)
return false, nil, fmt.Errorf("get receipt failed: %w", err)
}
events, err := ParseEventsFromLogs(receipt.Logs, c.contractABI)
if err != nil {
return false, nil, err
}
return receipt.Status == types.ReceiptStatusSuccessful, events, nil
}
return ParseEventsFromLogs(receipt.Logs, c.contractABI)
// IsTransactionSuccessful reports whether the EVM transaction executed
// successfully according to receipt.status (1=success, 0=failure).
func (c *ChainClient) IsTransactionSuccessful(ctx context.Context, txHash common.Hash) (bool, error) {
success, _, err := c.ParseTransactionReceiptWithStatus(ctx, txHash)
return success, err
}
func (c *ChainClient) ContractAddress() common.Address {

@ -14,27 +14,50 @@ import (
"github.com/openimsdk/tools/log"
)
const defaultIndexerMaxBlocksPerPoll uint64 = 2000
type Indexer struct {
client *ChainClient
db controller.RedPacketDatabase
pollInterval time.Duration
lastBlock uint64
contractAddr common.Address
client *ChainClient
db controller.RedPacketDatabase
pollInterval time.Duration
lastBlock uint64
contractAddr common.Address
maxBlocksPerPoll uint64 // 0 => defaultIndexerMaxBlocksPerPoll
}
func NewIndexer(client *ChainClient, db controller.RedPacketDatabase, pollInterval int, startBlock uint64) *Indexer {
func NewIndexer(client *ChainClient, db controller.RedPacketDatabase, pollInterval int, startBlock uint64, maxBlocksPerPoll int) *Indexer {
if pollInterval <= 0 {
pollInterval = 5
}
var maxB uint64
if maxBlocksPerPoll > 0 {
maxB = uint64(maxBlocksPerPoll)
}
return &Indexer{
client: client,
db: db,
pollInterval: time.Duration(pollInterval) * time.Second,
lastBlock: startBlock,
contractAddr: client.contractAddr,
client: client,
db: db,
pollInterval: time.Duration(pollInterval) * time.Second,
lastBlock: startBlock,
contractAddr: client.contractAddr,
maxBlocksPerPoll: maxB,
}
}
func (i *Indexer) chunkEndBlock(chainTip uint64) uint64 {
maxSpan := i.maxBlocksPerPoll
if maxSpan == 0 {
maxSpan = defaultIndexerMaxBlocksPerPoll
}
if chainTip <= i.lastBlock {
return i.lastBlock
}
span := chainTip - i.lastBlock
if span > maxSpan {
return i.lastBlock + maxSpan
}
return chainTip
}
func (i *Indexer) Start(ctx context.Context) {
log.ZInfo(ctx, "starting RedPacket ETH event indexer")
@ -105,20 +128,25 @@ func (i *Indexer) poll(ctx context.Context) error {
return fmt.Errorf("get header failed: %w", err)
}
currentBlock := header.Number.Uint64()
if currentBlock <= i.lastBlock {
chainTip := header.Number.Uint64()
if chainTip <= i.lastBlock {
return nil
}
toBlock := i.chunkEndBlock(chainTip)
if toBlock <= i.lastBlock {
return nil
}
query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(i.lastBlock + 1)),
ToBlock: big.NewInt(int64(currentBlock)),
ToBlock: big.NewInt(int64(toBlock)),
Addresses: []common.Address{i.contractAddr},
}
logs, err := i.client.client.FilterLogs(ctx, query)
if err != nil {
return fmt.Errorf("filter logs failed: %w", err)
return fmt.Errorf("filter logs failed (blocks %d-%d): %w", i.lastBlock+1, toBlock, err)
}
logPtrs := make([]*types.Log, len(logs))
@ -137,8 +165,12 @@ func (i *Indexer) poll(ctx context.Context) error {
}
}
i.lastBlock = currentBlock
log.ZInfo(ctx, "redpacket eth indexed", "block", currentBlock, "events", len(events))
i.lastBlock = toBlock
if toBlock < chainTip {
log.ZDebug(ctx, "redpacket eth indexer chunk done, catching up", "indexedTo", toBlock, "chainTip", chainTip, "events", len(events))
} else {
log.ZInfo(ctx, "redpacket eth indexed", "block", toBlock, "events", len(events))
}
return nil
}

@ -63,17 +63,28 @@ func (t *TronClient) FullNodeURL() string {
}
func (t *TronClient) ParseTransactionReceipt(ctx context.Context, txID string) ([]*ParsedEvent, error) {
_, events, err := t.ParseTransactionReceiptWithStatus(ctx, txID)
return events, err
}
// ParseTransactionReceiptWithStatus fetches tx info once and returns both
// execution status and decoded contract events.
func (t *TronClient) ParseTransactionReceiptWithStatus(ctx context.Context, txID string) (bool, []*ParsedEvent, error) {
info, err := t.getTransactionInfo(ctx, txID)
if err != nil {
return nil, err
return false, nil, err
}
logs, err := tronLogsToEVMLogs(info, txID)
if err != nil {
return nil, err
return false, nil, err
}
return ParseEventsFromLogs(logs, t.parsedABI)
events, err := ParseEventsFromLogs(logs, t.parsedABI)
if err != nil {
return false, nil, err
}
success := strings.EqualFold(info.Receipt.Result, "SUCCESS")
return success, events, nil
}
func (t *TronClient) SendAdminTransaction(ctx context.Context, methodName string, args ...interface{}) (string, error) {
@ -111,13 +122,23 @@ func (t *TronClient) GetSignMessageForTron(ctx context.Context, packetID *big.In
type tronTxInfoResp struct {
ID string `json:"id"`
BlockNumber uint64 `json:"blockNumber"`
Log []struct {
Receipt struct {
Result string `json:"result"`
} `json:"receipt"`
Log []struct {
Address string `json:"address"`
Topics []string `json:"topics"`
Data string `json:"data"`
} `json:"log"`
}
// IsTransactionSuccessful reports whether the TRON transaction execution
// succeeded based on receipt.result == "SUCCESS".
func (t *TronClient) IsTransactionSuccessful(ctx context.Context, txID string) (bool, error) {
success, _, err := t.ParseTransactionReceiptWithStatus(ctx, txID)
return success, err
}
func getParamTypes(args []interface{}) string {
types := make([]string, len(args))
for i, arg := range args {

@ -137,13 +137,17 @@ func Start(ctx context.Context, conf *Config, registry discovery.SvcDiscoveryReg
pbredpacket.RegisterRedPacketServer(server, srv)
if chainClient != nil {
ethIndexer := chain.NewIndexer(chainClient, repo, conf.RpcConfig.Indexer.PollInterval, 0)
ethIndexer.Start(ctx)
}
if tronClient != nil {
tronIndexer := chain.NewTronIndexer(tronClient, repo, conf.RpcConfig.Indexer.PollInterval, 0)
tronIndexer.Start(ctx)
if conf.RpcConfig.Indexer.PollInterval > 0 {
if chainClient != nil {
ethIndexer := chain.NewIndexer(chainClient, repo, conf.RpcConfig.Indexer.PollInterval, 0, conf.RpcConfig.Indexer.MaxBlocksPerPoll)
ethIndexer.Start(ctx)
}
if tronClient != nil {
tronIndexer := chain.NewTronIndexer(tronClient, repo, conf.RpcConfig.Indexer.PollInterval, 0)
tronIndexer.Start(ctx)
}
} else {
log.ZInfo(ctx, "redpacket indexer disabled by config", "pollInterval", conf.RpcConfig.Indexer.PollInterval)
}
return nil

@ -124,6 +124,12 @@ func (s *redPacketServer) CreatedCallback(ctx context.Context, req *pbredpacket.
PacketID: createdPacket.PacketID,
ChainID: createdPacket.ChainID,
ContractAddress: createdPacket.ContractAddress,
CreatorWallet: createdPacket.CreatorWallet,
PacketType: createdPacket.PacketType,
Token: createdPacket.Token,
TotalAmount: createdPacket.TotalAmount,
TotalShares: createdPacket.TotalShares,
ExpiryAt: createdPacket.ExpiryAt,
TxHash: req.TxHash,
GroupID: groupID,
ScopeType: scopeType,
@ -268,15 +274,36 @@ func (s *redPacketServer) ClaimResult(ctx context.Context, req *pbredpacket.Clai
return nil, err
}
claimedEvent, err := s.resolveClaimedEvent(ctx, rp, req.TxHash)
txSuccess, events, err := s.parseChainReceiptWithStatus(ctx, rp, req.TxHash)
if err != nil {
log.ZWarn(ctx, "parse claim receipt failed", err, "txHash", req.TxHash)
return &pbredpacket.ClaimResultResp{}, nil
}
if !txSuccess {
if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil {
log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash)
}
return &pbredpacket.ClaimResultResp{}, nil
}
claimedEvent, err := resolveClaimedEventFromParsedEvents(rp, events)
if err != nil {
log.ZWarn(ctx, "resolve claim event failed", err, "txHash", req.TxHash)
if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil {
log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash)
}
return &pbredpacket.ClaimResultResp{}, nil
}
if claimedEvent == nil {
if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil {
log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash)
}
return &pbredpacket.ClaimResultResp{}, nil
}
if !strings.EqualFold(claimedEvent.ClaimerWallet, req.Claimer) {
if markErr := s.markClaimFailed(ctx, req.PacketID, currentUserID, req.Claimer, req.TxHash); markErr != nil {
log.ZWarn(ctx, "mark claim failed status failed", markErr, "txHash", req.TxHash)
}
return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("claim event claimer mismatch: got %s want %s", claimedEvent.ClaimerWallet, req.Claimer))
}
@ -311,6 +338,34 @@ func (s *redPacketServer) ClaimResult(ctx context.Context, req *pbredpacket.Clai
return &pbredpacket.ClaimResultResp{}, nil
}
func (s *redPacketServer) parseChainReceiptWithStatus(ctx context.Context, rp *model.RedPacket, txHash string) (bool, []*chain.ParsedEvent, error) {
switch rp.ChainType {
case "EVM":
if s.chainClient == nil {
return false, nil, errs.ErrInternalServer.WrapMsg("evm client is unavailable")
}
return s.chainClient.ParseTransactionReceiptWithStatus(ctx, common.HexToHash(txHash))
case "TRON":
if s.tronClient == nil {
return false, nil, errs.ErrInternalServer.WrapMsg("tron client is unavailable")
}
return s.tronClient.ParseTransactionReceiptWithStatus(ctx, txHash)
default:
return false, nil, errs.ErrArgs.WrapMsg("unsupported chain_type: " + rp.ChainType)
}
}
func (s *redPacketServer) markClaimFailed(ctx context.Context, packetID, userID, claimer, txHash string) error {
return s.db.SaveClaim(ctx, &model.RedPacketClaim{
PacketID: packetID,
UserID: userID,
ClaimerWallet: claimer,
ClaimTxHash: txHash,
Status: "FAILED",
UpdatedAt: time.Now(),
})
}
// canClaim runs the claim-eligibility check (formerly RedPacketService.CanClaim).
func (s *redPacketServer) canClaim(ctx context.Context, packetID, claimer, userID string) error {
rp, err := s.db.GetRedPacketByPacketID(ctx, packetID)
@ -344,6 +399,12 @@ type claimedEventSnapshot struct {
BlockNumber uint64
}
type refundedEventSnapshot struct {
RefundTo string
Amount string
BlockNumber uint64
}
type createdPacketSnapshot struct {
PacketID string
ChainID int64
@ -367,10 +428,13 @@ func (s *redPacketServer) resolveCreatedPacket(ctx context.Context, rp *model.Re
return buildFallbackCreatedPacket(rp, fallbackPacketID), nil
}
events, err := s.chainClient.ParseTransactionReceipt(ctx, common.HexToHash(txHashHex))
success, events, err := s.chainClient.ParseTransactionReceiptWithStatus(ctx, common.HexToHash(txHashHex))
if err != nil {
return nil, errs.ErrInternalServer.WrapMsg("parse created tx failed: " + err.Error())
}
if !success {
return nil, errs.ErrArgs.WrapMsg("created tx execution failed on chain")
}
for _, event := range events {
if event.Name != "PacketCreated" {
@ -396,10 +460,13 @@ func (s *redPacketServer) resolveCreatedPacket(ctx context.Context, rp *model.Re
return buildFallbackCreatedPacket(rp, fallbackPacketID), nil
}
events, err := s.tronClient.ParseTransactionReceipt(ctx, txHashHex)
success, events, err := s.tronClient.ParseTransactionReceiptWithStatus(ctx, txHashHex)
if err != nil {
return nil, errs.ErrInternalServer.WrapMsg("parse tron created tx failed: " + err.Error())
}
if !success {
return nil, errs.ErrArgs.WrapMsg("created tx execution failed on chain")
}
for _, event := range events {
if event.Name != "PacketCreated" {
@ -795,7 +862,10 @@ func (s *redPacketServer) resolveClaimedEvent(ctx context.Context, rp *model.Red
if err != nil {
return nil, err
}
return resolveClaimedEventFromParsedEvents(rp, events)
}
func resolveClaimedEventFromParsedEvents(rp *model.RedPacket, events []*chain.ParsedEvent) (*claimedEventSnapshot, error) {
for _, event := range events {
if event.Name != "PacketClaimed" {
continue
@ -816,6 +886,24 @@ func (s *redPacketServer) resolveClaimedEvent(ctx context.Context, rp *model.Red
return nil, nil
}
func resolveRefundedEventFromParsedEvents(rp *model.RedPacket, events []*chain.ParsedEvent) (*refundedEventSnapshot, error) {
for _, event := range events {
if event.Name != "PacketRefunded" {
continue
}
packetID := chain.GetPacketIDFromEvent(event).String()
if packetID != rp.PacketID {
return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("refund event packet mismatch: got %s want %s", packetID, rp.PacketID))
}
return &refundedEventSnapshot{
RefundTo: strings.ToLower(chain.GetAddressFromEvent(event, "refundTo").Hex()),
Amount: chain.GetAmountFromEvent(event).String(),
BlockNumber: event.BlockNumber,
}, nil
}
return nil, nil
}
// maxTotalShares caps the number of shares to prevent abuse.
const maxTotalShares = 10_000
@ -946,7 +1034,37 @@ func (s *redPacketServer) RequestRefund(ctx context.Context, req *pbredpacket.Re
}
log.ZInfo(ctx, "redpacket refund submitted", "packetID", rp.PacketID, "txHash", txHash)
return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil
txSuccess, events, parseErr := s.parseChainReceiptWithStatus(ctx, rp, txHash)
if parseErr != nil {
log.ZWarn(ctx, "parse refund receipt failed, fallback to async indexer", parseErr, "packetID", rp.PacketID, "txHash", txHash)
return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil
}
if !txSuccess {
return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "FAILED"}, nil
}
refundedEvent, err := resolveRefundedEventFromParsedEvents(rp, events)
if err != nil {
log.ZWarn(ctx, "resolve refunded event failed, fallback to async indexer", err, "packetID", rp.PacketID, "txHash", txHash)
return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil
}
if refundedEvent == nil {
return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "PENDING"}, nil
}
if err := s.db.SaveRefund(ctx, &model.RedPacketRefund{
PacketID: rp.PacketID,
RefundTo: refundedEvent.RefundTo,
TxHash: txHash,
Amount: refundedEvent.Amount,
CreatedAt: time.Now(),
}); err != nil {
return nil, err
}
if err := s.db.UpdateRedPacketStatus(ctx, rp.PacketID, "REFUNDED"); err != nil {
return nil, err
}
return &pbredpacket.RequestRefundResp{TxHash: txHash, Status: "REFUNDED"}, nil
}
func (s *redPacketServer) GetRefund(ctx context.Context, req *pbredpacket.GetRefundReq) (*pbredpacket.GetRefundResp, error) {

@ -286,6 +286,29 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *relation.DeleteFri
return &relation.DeleteFriendResp{}, nil
}
// DeleteFriendOneway 单向删除好友:只删除 ownerUserID 侧的 friend 文档;
// 对端 friend 文档保留;仅向 owner 下发 FriendsInfoUpdateNotification 以刷新本地好友列表,
// 不向对端发送 FriendDeletedNotification。
func (s *friendServer) DeleteFriendOneway(ctx context.Context, req *relation.DeleteFriendReq) (resp *relation.DeleteFriendResp, err error) {
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
_, err = s.db.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID})
if err != nil {
return nil, err
}
if err := s.db.Delete(ctx, req.OwnerUserID, []string{req.FriendUserID}); err != nil {
return nil, err
}
s.notificationSender.FriendDeletedOnewayNotification(ctx, req.OwnerUserID, req.FriendUserID)
s.webhookAfterDeleteFriend(ctx, &s.config.WebhooksConfig.AfterDeleteFriend, req)
return &relation.DeleteFriendResp{}, nil
}
// ok.
func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFriendRemarkReq) (resp *relation.SetFriendRemarkResp, err error) {
if err = s.webhookBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig.BeforeSetFriendRemark, req); err != nil && err != servererrs.ErrCallbackContinue {
@ -726,6 +749,7 @@ func (s *friendServer) AddOnewayFriend(ctx context.Context, req *relation.ApplyT
return &relation.ApplyToAddFriendResp{}, nil
}
// SetMute 设置用户对另一用户的静音duration 为秒0=取消静音;-1=永久静音;>0=从现在起持续 duration 秒后自动解除。
func (s *friendServer) SetMute(ctx context.Context, req *relation.SetMuteReq) (*relation.SetMuteResp, error) {
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
@ -733,19 +757,23 @@ func (s *friendServer) SetMute(ctx context.Context, req *relation.SetMuteReq) (*
if req.Duration == 0 {
return &relation.SetMuteResp{}, s.userMuteDB.Delete(ctx, req.OwnerUserID, req.TargetUserID)
}
if req.Duration < 0 && req.Duration != -1 {
return nil, errs.ErrArgs.WrapMsg("duration must be 0 (unmute), -1 (permanent), or positive seconds")
}
var muteEndTime int64
if req.Duration != -1 {
muteEndTime = time.Now().Unix() + req.Duration
}
return &relation.SetMuteResp{}, s.userMuteDB.Upsert(ctx, &model.UserMute{
OwnerUserID: req.OwnerUserID,
MutedUserID: req.TargetUserID,
MuteEndTime: muteEndTime,
MuteDuration: req.Duration,
CreateTime: time.Now(),
OwnerUserID: req.OwnerUserID,
MutedUserID: req.TargetUserID,
MuteEndTime: muteEndTime,
MuteDuration: req.Duration,
CreateTime: time.Now(),
})
}
// GetMute 查询静音状态:未静音或已过期时 muted=false、duration=0永久静音为 duration=-1 且 muteEndTime=0。
func (s *friendServer) GetMute(ctx context.Context, req *relation.GetMuteReq) (*relation.GetMuteResp, error) {
if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
@ -759,13 +787,9 @@ func (s *friendServer) GetMute(ctx context.Context, req *relation.GetMuteReq) (*
}
now := time.Now().Unix()
if rec.MuteEndTime != 0 && rec.MuteEndTime <= now {
return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: 0}, nil
}
duration := rec.MuteDuration
if duration == 0 && rec.MuteEndTime == 0 {
duration = -1
return &relation.GetMuteResp{Muted: false, MuteEndTime: 0, Duration: rec.MuteDuration}, nil
}
return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime, Duration: duration}, nil
return &relation.GetMuteResp{Muted: true, MuteEndTime: rec.MuteEndTime, Duration: rec.MuteDuration}, nil
}
func (s *friendServer) getCommonUserMap(ctx context.Context, userIDs []string) (map[string]common_user.CommonUser, error) {

@ -241,6 +241,18 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context
f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
}
// FriendDeletedOnewayNotification 单向删好友:仅通知操作方本人刷新好友列表,
// 不向对端发送 FriendDeletedNotification对端仍保留「你是 TA 好友」的关系。
func (f *FriendNotificationSender) FriendDeletedOnewayNotification(ctx context.Context, ownerUserID, friendUserID string) {
tips := sdkws.FriendsInfoUpdateTips{
FromToUserID: &sdkws.FromToUserID{ToUserID: ownerUserID},
FriendIDs: []string{friendUserID},
}
f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID,
database.FriendVersionName, ownerUserID, &tips.FriendSortVersion)
f.Notification(ctx, ownerUserID, ownerUserID, constant.FriendsInfoUpdateNotification, &tips)
}
func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {

@ -97,6 +97,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
inv.InviterUserID = req.UserID
inv.InitiateTime = time.Now().UnixMilli()
if len(inv.InviteeUserIDList) == 0 {
return nil, errs.ErrArgs.WrapMsg("no invitees", "inviteeUserIDList", inv.InviteeUserIDList)
}
for _, inviteeID := range inv.InviteeUserIDList {
allowed, err := s.isCallAllowed(ctx, req.UserID, inviteeID)
if err != nil {
@ -119,6 +123,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
}
inv.BusyLineUserIDList = busyUserIDs
if len(inv.InviteeUserIDList) == len(busyUserIDs) {
return nil, errs.ErrNoPermission.WrapMsg("all invitees are busy", "inviteeUserIDList", inv.InviteeUserIDList)
}
// 从主叫用户资料获取铃声 URL注入到邀请信息中被叫方收到后播放主叫方铃声
if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" {
inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL
@ -167,7 +175,7 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
continue
}
log.ZInfo(ctx, "sendSignalingNotification to invitee", "sendID", req.UserID, "recvID", inviteeID)
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), req.OfflinePushInfo, content); err != nil {
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), "", req.OfflinePushInfo, content); err != nil {
log.ZError(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID)
return nil, errs.WrapMsg(err, "failed to notify invitee", "inviteeID", inviteeID)
}
@ -189,6 +197,9 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi
if inv == nil {
return nil, errs.ErrArgs.WrapMsg("invitation is nil")
}
if inv.GroupID == "" {
return nil, errs.ErrArgs.WrapMsg("groupID is empty")
}
inv.RoomID = newRoomID()
inv.InviterUserID = req.UserID
@ -258,18 +269,22 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi
log.ZInfo(ctx, "handleInviteInGroup: skip busy invitee", "inviteeID", inviteeID)
continue
}
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.ReadGroupChatType), req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification to group invitee failed", err, "inviteeID", inviteeID)
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.ReadGroupChatType), inv.GroupID, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "handleInviteInGroup to group invitee failed", err, "inviteeID", inviteeID)
}
}
return &rtc.SignalInviteInGroupResp{
resp := &rtc.SignalInviteInGroupResp{
Token: token,
RoomID: inv.RoomID,
LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress,
BusyLineUserIDList: busyUserIDs,
CalleeRingtoneURL: calleeRingtoneURL,
}, nil
}
log.ZDebug(ctx, "handleInviteInGroup", "req", req, "resp", resp)
return resp, nil
}
// isCallAllowed 判断 inviterID 是否被允许向 inviteeID 发起音视频通话。
@ -325,7 +340,7 @@ func (s *rtcServer) handleAccept(ctx context.Context, req *rtc.SignalAcceptReq,
return nil, err
}
if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil {
if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", dbInv.InviterUserID)
}
@ -365,7 +380,7 @@ func (s *rtcServer) handleReject(ctx context.Context, req *rtc.SignalRejectReq,
if err != nil {
return nil, err
}
if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, req.OfflinePushInfo, content); err != nil {
if err := s.sendSignalingNotification(ctx, req.UserID, dbInv.InviterUserID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification reject to inviter failed", err, "inviterID", dbInv.InviterUserID)
}
@ -405,7 +420,7 @@ func (s *rtcServer) handleCancel(ctx context.Context, req *rtc.SignalCancelReq,
return nil, err
}
for _, inviteeID := range dbInv.InviteeUserIDList {
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, sessionType, req.OfflinePushInfo, content); err != nil {
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification cancel to invitee failed", err, "inviteeID", inviteeID)
}
}
@ -441,7 +456,7 @@ func (s *rtcServer) handleHungUp(ctx context.Context, req *rtc.SignalHungUpReq,
}
// 使用 DB 中的参与者列表,不信任客户端传入的 InviteeUserIDList
for _, peerID := range hungUpPeerIDsFromDB(dbInv, req.UserID) {
if err := s.sendSignalingNotification(ctx, req.UserID, peerID, sessionType, req.OfflinePushInfo, content); err != nil {
if err := s.sendSignalingNotification(ctx, req.UserID, peerID, sessionType, dbInv.GroupID, req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification hungUp to peer failed", err, "peerID", peerID)
}
}
@ -670,12 +685,14 @@ func signalingMsgOptions() map[string]bool {
}
// sendSignalingNotification sends a SignalingNotification message to a user via the msg service.
func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvID string, sessionType int32, offlinePush *sdkws.OfflinePushInfo, content []byte) error {
// groupID 在 SessionType 为群类型(如 ReadGroupChatType时必须非空否则 msg 服务群聊校验会失败。
func (s *rtcServer) sendSignalingNotification(ctx context.Context, sendID, recvID string, sessionType int32, groupID string, offlinePush *sdkws.OfflinePushInfo, content []byte) error {
now := time.Now().UnixMilli()
msgData := &sdkws.MsgData{
SendID: sendID,
RecvID: recvID,
SessionType: sessionType,
GroupID: groupID,
ContentType: int32(constant.SignalingNotification),
MsgFrom: int32(constant.SysMsgType),
Content: content,

@ -25,17 +25,15 @@ import (
)
// clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的
// ClearBurnExpiredMsgs每次至多处理 burnLimit 个 (user, conversation) 分组,
// 直至本轮没有新的过期分组或达到防御性的最大循环次数
// ClearBurnExpiredMsgs每次至多处理 burnClearLimit 个 (user, conversation) 分组,
// 直至本轮没有新的过期分组或达到 burnClearMaxLoop 轮
func (c *cronServer) clearBurnExpiredMsgs() {
now := time.Now()
operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear burn expired msgs cron start")
const (
maxLoop = 10000
burnLimit = 100
)
burnLimit := int32(c.config.CronTask.BurnClearLimit)
maxLoop := c.config.CronTask.BurnClearMaxLoop
var count int
for i := 1; i <= maxLoop; i++ {
resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{

@ -16,6 +16,7 @@ package tools
import (
"context"
"strings"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
@ -44,36 +45,37 @@ type CronTaskConfig struct {
MongodbConfig config.Mongo
}
func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 {
func Start(ctx context.Context, cfg *CronTaskConfig) error {
config.FillCronTaskDefaults(&cfg.CronTask)
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", cfg.CronTask.CronExecuteTime, "msgDestructTime", cfg.CronTask.RetainChatRecords)
if cfg.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil)
client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, nil)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
ctx = mcontext.SetOpUserID(ctx, cfg.Share.IMAdminUserID[0])
msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
msgConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Msg)
if err != nil {
return err
}
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
thirdConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
conversationConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Conversation)
if err != nil {
return err
}
authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth)
authConn, err := client.GetConn(ctx, cfg.Share.RpcRegisterName.Auth)
if err != nil {
return err
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
mgocli, err := mongoutil.NewMongoDB(ctx, cfg.MongodbConfig.Build())
if err != nil {
return errs.WrapMsg(err, "crontask: connect mongodb failed")
}
@ -86,14 +88,14 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
srv := &cronServer{
ctx: ctx,
config: config,
config: cfg,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
authClient: rpcli.NewAuthClient(authConn),
userOfflineRecordDB: userOfflineRecordDB,
chatAPIAddress: config.CronTask.ChatAPI.Address,
chatAPIAddress: cfg.CronTask.ChatAPI.Address,
}
if err := srv.registerClearS3(); err != nil {
@ -114,7 +116,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerDeleteExpiredOfflineUsers(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
log.ZDebug(ctx, "start cron task", "CronExecuteTime", cfg.CronTask.CronExecuteTime)
srv.cron.Start()
<-ctx.Done()
return nil
@ -156,7 +158,11 @@ func (c *cronServer) registerClearUserMsg() error {
}
func (c *cronServer) registerClearBurnExpiredMsgs() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs)
schedule := strings.TrimSpace(c.config.CronTask.BurnCronExecuteTime)
if schedule == "" {
schedule = c.config.CronTask.CronExecuteTime
}
_, err := c.cron.AddFunc(schedule, c.clearBurnExpiredMsgs)
return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task")
}

@ -115,10 +115,18 @@ type API struct {
}
type CronTask struct {
// CronExecuteTime 标准 5 段 cron 表达式供聊天记录清理、S3 清理、用户消息清理及阅后即焚清理等任务共用(除非 burnCronExecuteTime 单独指定)。
// 未配置时由 FillCronTaskDefaults 设为每天 02:00与 config/openim-crontask.yml 一致)。
CronExecuteTime string `mapstructure:"cronExecuteTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"`
DeleteObjectType []string `mapstructure:"deleteObjectType"`
// BurnCronExecuteTime 仅「单聊阅后即焚」清理ClearBurnExpiredMsgs的 cron留空则与 CronExecuteTime 相同。
BurnCronExecuteTime string `mapstructure:"burnCronExecuteTime"`
// BurnClearLimit 单次 RPC 最多处理多少个 (user, conversation) 分组;<=0 时默认 100。
BurnClearLimit int `mapstructure:"burnClearLimit"`
// BurnClearMaxLoop 单次定时触发内最多循环轮数;<=0 时默认 10000。
BurnClearMaxLoop int `mapstructure:"burnClearMaxLoop"`
// ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。
ChatAPI ChatAPI `mapstructure:"chatAPI"`
}
@ -290,9 +298,9 @@ type Group struct {
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
CommonGroupsLimitWithFriend int `mapstructure:"commonGroupsLimitWithFriend"`
}
type Msg struct {
@ -483,7 +491,7 @@ type Crypto struct {
AutoSetPorts bool `mapstructure:"autoSetPorts"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
Virgil VirgilConfig `mapstructure:"virgil"`
}
@ -524,9 +532,11 @@ type RedPacketTron struct {
type RedPacketIndexer struct {
PollInterval int `mapstructure:"pollInterval"`
// MaxBlocksPerPoll limits each eth_getLogs range (lastBlock+1 .. toBlock). Many public RPCs
// reject large ranges; 0 means use default 2000.
MaxBlocksPerPoll int `mapstructure:"maxBlocksPerPoll"`
}
// FullConfig stores all configurations for before and after events
type Webhooks struct {
URL string `mapstructure:"url"`
@ -780,6 +790,37 @@ func (ct *CronTask) GetConfigFileName() string {
return OpenIMCronTaskCfgFileName
}
// FillCronTaskDefaults applies defaults after YAML/env load. Only fills empty or invalid placeholders.
func FillCronTaskDefaults(ct *CronTask) {
if ct == nil {
return
}
if strings.TrimSpace(ct.CronExecuteTime) == "" {
ct.CronExecuteTime = "0 2 * * *"
}
if strings.TrimSpace(ct.BurnCronExecuteTime) == "" {
ct.BurnCronExecuteTime = "*/1 * * * *"
}
if ct.BurnClearLimit <= 0 {
ct.BurnClearLimit = 100
}
if ct.BurnClearMaxLoop <= 0 {
ct.BurnClearMaxLoop = 100
}
if ct.ChatAPI.Address == "" {
ct.ChatAPI.Address = "http://127.0.0.1:10008"
}
if ct.RetainChatRecords < 1 {
ct.RetainChatRecords = 365
}
if ct.BurnClearLimit < 1 {
ct.BurnClearLimit = 100
}
if ct.BurnClearMaxLoop < 1 {
ct.BurnClearMaxLoop = 10000
}
}
func (mg *MsgGateway) GetConfigFileName() string {
return OpenIMMsgGatewayCfgFileName
}

@ -66,6 +66,7 @@ func (m *msgBurnDeadlineMgo) UpsertIfAbsent(ctx context.Context, items []*model.
"user_id": item.UserID,
"conversation_id": item.ConversationID,
"seq": item.Seq,
"peer_id": item.PeerID,
"deadline_ms": item.DeadlineMs,
"create_time": item.CreateTime,
}
@ -91,6 +92,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64,
"user_id": "$user_id",
"conversation_id": "$conversation_id",
},
"peer_id": bson.M{"$first": "$peer_id"},
"max_seq": bson.M{"$max": "$seq"},
"seqs": bson.M{"$push": "$seq"},
}}},
@ -101,6 +103,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64,
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
} `bson:"_id"`
PeerID string `bson:"peer_id"`
MaxSeq int64 `bson:"max_seq"`
Seqs []int64 `bson:"seqs"`
}
@ -113,6 +116,7 @@ func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64,
res = append(res, &database.ExpiredBurnGroup{
UserID: r.ID.UserID,
ConversationID: r.ID.ConversationID,
PeerID: r.PeerID,
MaxSeq: r.MaxSeq,
Seqs: r.Seqs,
})

@ -75,6 +75,12 @@ func (m *RedPacketMgo) UpdateCreated(ctx context.Context, rp *model.RedPacket) e
"tx_hash": rp.TxHash,
"chain_id": rp.ChainID,
"contract_address": rp.ContractAddress,
"creator_wallet": rp.CreatorWallet,
"packet_type": rp.PacketType,
"token": rp.Token,
"total_amount": rp.TotalAmount,
"total_shares": rp.TotalShares,
"expiry_at": rp.ExpiryAt,
"group_id": rp.GroupID,
"scope_type": rp.ScopeType,
"receiver_user_id": rp.ReceiverUserID,

@ -82,7 +82,6 @@ func (u *UserMgo) UpdateByMap(ctx context.Context, userID string, args map[strin
"first_name",
"last_name",
"full_name",
"remark",
"face_url",
"phone_number",
"area_code",

@ -24,6 +24,8 @@ import (
type ExpiredBurnGroup struct {
UserID string
ConversationID string
// PeerID 单聊中的对端用户 ID直接从 deadline 记录读取,无需额外查 conversation 表。
PeerID string
// MaxSeq 当前批次中最大的过期 seq推进 min_seq 时使用 MaxSeq + 1。
MaxSeq int64
// Seqs 当前批次实际涉及的所有过期 seq便于精确删除已处理的 deadline 记录。

@ -37,4 +37,6 @@ type Conversation struct {
IsMsgDestruct bool `bson:"is_msg_destruct"`
MsgDestructTime int64 `bson:"msg_destruct_time"`
LatestMsgDestructTime time.Time `bson:"latest_msg_destruct_time"`
MuteDuration int32 `bson:"mute_duration"`
MuteEndTime int64 `bson:"mute_end_time"`
}

@ -24,6 +24,9 @@ type MsgBurnDeadline struct {
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
Seq int64 `bson:"seq"`
// PeerID 单聊中的对端用户 ID。
// cron 处理时可直接获取对端,无需额外查询 conversation 表。
PeerID string `bson:"peer_id"`
// DeadlineMs 截止时间戳(毫秒);超过即可被 cron 收走推进 min_seq。
DeadlineMs int64 `bson:"deadline_ms"`
// CreateTime 写入时刻(毫秒);用于排查/审计。

@ -113,3 +113,27 @@ func (x *MsgClient) SetUserConversationsMinSeq(ctx context.Context, conversation
req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: userIDs, Seq: seq}
return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req))
}
// DeleteMsgs 按 seq 删除消息,行为与 msg RPC DeleteMsgs 一致。
func (x *MsgClient) DeleteMsgs(ctx context.Context, userID, conversationID string, seqs []int64, deleteSyncOpt *msg.DeleteSyncOpt) error {
if len(seqs) == 0 {
return nil
}
req := &msg.DeleteMsgsReq{
ConversationID: conversationID,
UserID: userID,
Seqs: seqs,
DeleteSyncOpt: deleteSyncOpt,
}
return ignoreResp(x.MsgClient.DeleteMsgs(ctx, req))
}
// DeleteMsgPhysicalBySeqs 按 seq 物理删除会话内的消息(无鉴权)。
// 用于阅后即焚、系统级消息清理等场景。
func (x *MsgClient) DeleteMsgPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error {
if len(seqs) == 0 {
return nil
}
req := &msg.DeleteMsgPhysicalBySeqReq{ConversationID: conversationID, Seqs: seqs}
return ignoreResp(x.MsgClient.DeleteMsgPhysicalBySeq(ctx, req))
}

@ -1 +1 @@
Subproject commit 2e1f23fe06d15adcabf07ef72c1e8b08bca0f2c1
Subproject commit 7eb7737531d6ff779571e77f5a49aa21ce9bcb43
Loading…
Cancel
Save