Merge remote-tracking branch 'origin/feature/redpacket_new1' into feature/redpacket_new1

pull/3727/head
panda 2 weeks ago
commit 54c66a6a5a

@ -1,4 +1,6 @@
cronExecuteTime: 0 2 * * *
retainChatRecords: 365
fileExpireTime: 180
deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"]
deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"]
chatAPI:
address: http://127.0.0.1:10008

@ -18,7 +18,7 @@ prometheus:
maxConcurrentWorkers: 3
#Use geTui for offline push notifications, or choose fcm or jpush; corresponding configuration settings must be specified.
enable: geTui
enable: fcm
geTui:
pushUrl: https://restapi.getui.com/v2/$appId
masterSecret:
@ -28,7 +28,7 @@ geTui:
channelName:
fcm:
# Prioritize using file paths. If the file path is empty, use URL
filePath: # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath.
filePath: sokim-firebase-adminsdk.json # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath.
authURL: # Must start with https or http.
jpush:
appKey:

@ -0,0 +1,154 @@
package api
import (
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
)
// DeleteUserApi handles real account deletion (hard delete).
// It follows the same direct-DB pattern as UserGlobalBlackApi.
type DeleteUserApi struct {
userDB database.User
phoneSNDB database.PhoneSN
authClient *rpcli.AuthClient
groupClient group.GroupClient
friendClient relation.FriendClient
imAdminUserIDs []string
}
func NewDeleteUserApi(
userDB database.User,
phoneSNDB database.PhoneSN,
authClient *rpcli.AuthClient,
groupClient group.GroupClient,
friendClient relation.FriendClient,
imAdminUserIDs []string,
) *DeleteUserApi {
return &DeleteUserApi{
userDB: userDB,
phoneSNDB: phoneSNDB,
authClient: authClient,
groupClient: groupClient,
friendClient: friendClient,
imAdminUserIDs: imAdminUserIDs,
}
}
type deleteUserReq struct {
UserID string `json:"userID" binding:"required"`
}
// DeleteUser permanently deletes a user account and cleans up associated data.
// Steps: force-logout → delete friends → quit/kick groups → hard-delete user doc.
// Caller must be the same user as userID, or an IM admin (see CheckAccessV3).
func (d *DeleteUserApi) DeleteUser(c *gin.Context) {
var req deleteUserReq
if err := c.ShouldBindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WrapMsg(err.Error()))
return
}
// Only the user themselves (or an IM admin) may delete the account.
if err := authverify.CheckAccessV3(c, req.UserID, d.imAdminUserIDs); err != nil {
apiresp.GinError(c, err)
return
}
// 1. Verify user exists
users, err := d.userDB.Find(c, []string{req.UserID})
if err != nil {
apiresp.GinError(c, err)
return
}
if len(users) == 0 {
apiresp.GinError(c, errs.ErrRecordNotFound.WrapMsg("user not found", "userID", req.UserID))
return
}
// 2. Force logout from every platform
for platformID := range constant.PlatformID2Name {
if int32(platformID) == constant.AdminPlatformID {
continue
}
if err := d.authClient.ForceLogout(c, req.UserID, int32(platformID)); err != nil {
log.ZWarn(c, "DeleteUser: ForceLogout failed", err, "userID", req.UserID, "platformID", platformID)
}
}
// 3. Delete all friendships (both directions: target→friend and friend→target)
friendIDsResp, err := d.friendClient.GetFriendIDs(c, &relation.GetFriendIDsReq{UserID: req.UserID})
if err != nil {
log.ZWarn(c, "DeleteUser: GetFriendIDs failed", err, "userID", req.UserID)
} else {
for _, friendID := range friendIDsResp.FriendIDs {
// Remove from target user's friend list
if _, err := d.friendClient.DeleteFriend(c, &relation.DeleteFriendReq{
OwnerUserID: req.UserID,
FriendUserID: friendID,
}); err != nil {
log.ZWarn(c, "DeleteUser: DeleteFriend (owner→friend) failed", err,
"ownerUserID", req.UserID, "friendUserID", friendID)
}
// Remove from the friend's friend list
//if _, err := d.friendClient.DeleteFriend(c, &relation.DeleteFriendReq{
// OwnerUserID: friendID,
// FriendUserID: req.UserID,
//}); err != nil {
// log.ZWarn(c, "DeleteUser: DeleteFriend (friend→owner) failed", err,
// "ownerUserID", friendID, "friendUserID", req.UserID)
//}
}
}
// 4. Quit / kick from all joined groups (paginated, page size 100)
pageNumber := int32(1)
const pageSize = int32(100)
for {
groupListResp, err := d.groupClient.GetJoinedGroupList(c, &group.GetJoinedGroupListReq{
FromUserID: req.UserID,
Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: pageSize},
})
if err != nil {
log.ZWarn(c, "DeleteUser: GetJoinedGroupList failed", err, "userID", req.UserID, "page", pageNumber)
break
}
for _, g := range groupListResp.Groups {
if _, err := d.groupClient.QuitGroup(c, &group.QuitGroupReq{
GroupID: g.GroupID,
UserID: req.UserID,
}); err != nil {
log.ZWarn(c, "DeleteUser: QuitGroup failed", err, "userID", req.UserID, "groupID", g.GroupID)
}
}
if int32(len(groupListResp.Groups)) < pageSize {
break
}
pageNumber++
}
// 5. Delete phone_sn_info record bound to this user's phone number.
if phone := users[0].Phone; phone != "" {
if err := d.phoneSNDB.DeleteByPhone(c, phone); err != nil {
log.ZWarn(c, "DeleteUser: DeleteByPhone failed", err, "userID", req.UserID, "phone", phone)
}
}
// 6. Hard-delete user document from MongoDB.
// Redis cache will become stale and expire via TTL; the user can no longer
// authenticate because their tokens were already invalidated in step 2.
if err := d.userDB.Delete(c, []string{req.UserID}); err != nil {
apiresp.GinError(c, err)
return
}
log.ZInfo(c, "DeleteUser: user deleted", "userID", req.UserID)
apiresp.GinSuccess(c, nil)
}

@ -173,3 +173,15 @@ func (o *GroupApi) GetGroupApplicationUnhandledCount(c *gin.Context) {
func (o *GroupApi) GetCommonGroupsWithFriend(c *gin.Context) {
a2r.Call(c, group.GroupClient.GetCommonGroupsWithFriend, o.Client)
}
func (o *GroupApi) PinGroupMessage(c *gin.Context) {
a2r.Call(c, group.GroupClient.PinGroupMessage, o.Client)
}
func (o *GroupApi) UnpinGroupMessage(c *gin.Context) {
a2r.Call(c, group.GroupClient.UnpinGroupMessage, o.Client)
}
func (o *GroupApi) GetGroupPinnedMessages(c *gin.Context) {
a2r.Call(c, group.GroupClient.GetGroupPinnedMessages, o.Client)
}

@ -9,11 +9,11 @@ import (
pbAuth "github.com/openimsdk/protocol/auth"
pbcaptcha "github.com/openimsdk/protocol/captcha"
"github.com/openimsdk/protocol/conversation"
pbcrypto "github.com/openimsdk/protocol/crypto"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/relation"
pbcrypto "github.com/openimsdk/protocol/crypto"
pbredpacket "github.com/openimsdk/protocol/redpacket"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/rtc"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/protocol/user"
@ -141,6 +141,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
m := NewMessageApi(msg.NewMsgClient(msgConn), rpcli.NewUserClient(userConn), config.Share.IMAdminUserID)
cp := NewCaptchaApi(pbcaptcha.NewCaptchaClient(captchaConn))
bl := NewUserGlobalBlackApi(blacklistCtrl, userDB, config.Share.IMAdminUserID, rpcli.NewAuthClient(authConn))
du := NewDeleteUserApi(userDB, phoneSNDB, rpcli.NewAuthClient(authConn), group.NewGroupClient(groupConn), relation.NewFriendClient(friendConn), config.Share.IMAdminUserID)
phoneSN := NewPhoneSNApi(phoneSNDB)
userRouterGroup := r.Group("/user")
{
@ -173,6 +174,13 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
userRouterGroup.POST("/set_phone_visibility", u.SetPhoneVisibility)
userRouterGroup.POST("/set_call_accept_setting", u.SetCallAcceptSetting)
userRouterGroup.POST("/set_msg_receive_setting", u.SetMsgReceiveSetting)
userRouterGroup.POST("/set_group_invite_setting", u.SetGroupInviteSetting)
// 设置用户全局阅后即焚时长0 表示关闭
userRouterGroup.POST("/set_user_msg_burn_duration", u.SetUserMsgBurnDuration)
// 设置删除账号等待间隔0 表示使用系统默认18个月
userRouterGroup.POST("/set_delete_account_interval", u.SetDeleteAccountInterval)
// 批量查询阅后即焚、手机号可见性、音视频接收、全局/会话消息接收、群邀请等设置
userRouterGroup.POST("/get_user_privacy_settings", u.GetUserPrivacySettings)
// 根据手机号精确查找用户phoneSearchVisibility=true 时遵守 phone_visibility 设置)
userRouterGroup.POST("/get_user_by_phone", u.GetUserByPhone)
// 根据昵称精确查询用户(可多结果,与 getPaginationUsers 模糊搜索不同)
@ -182,6 +190,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
userRouterGroup.POST("/add_global_blacklist", bl.AddGlobalBlacklist)
userRouterGroup.POST("/remove_global_blacklist", bl.RemoveGlobalBlacklist)
userRouterGroup.POST("/get_global_blacklist", bl.GetGlobalBlacklist)
userRouterGroup.POST("/delete_user", du.DeleteUser)
}
// friend routing group
{
@ -193,7 +203,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
friendRouterGroup.POST("/get_self_friend_apply_list", f.GetSelfApplyList)
friendRouterGroup.POST("/get_friend_list", f.GetFriendList)
friendRouterGroup.POST("/get_designated_friends", f.GetDesignatedFriends)
friendRouterGroup.POST("/add_friend", f.ApplyToAddFriend)
friendRouterGroup.POST("/add_friend", f.AddOnewayFriend)
friendRouterGroup.POST("/add_friend_response", f.RespondFriendApply)
friendRouterGroup.POST("/set_friend_remark", f.SetFriendRemark)
friendRouterGroup.POST("/add_black", f.AddBlack)
@ -210,6 +220,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
friendRouterGroup.POST("/get_full_friend_user_ids", f.GetFullFriendUserIDs)
friendRouterGroup.POST("/get_self_unhandled_apply_count", f.GetSelfUnhandledApplyCount)
friendRouterGroup.POST("/get_pinned_friend_ids", f.GetPinnedFriendIDs)
friendRouterGroup.POST("/add_oneway_friend", f.AddOnewayFriend)
}
g := NewGroupApi(group.NewGroupClient(groupConn))
@ -248,6 +259,9 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
groupRouterGroup.POST("/get_full_join_group_ids", g.GetFullJoinGroupIDs)
groupRouterGroup.POST("/get_group_application_unhandled_count", g.GetGroupApplicationUnhandledCount)
groupRouterGroup.POST("/get_common_groups_with_friend", g.GetCommonGroupsWithFriend)
groupRouterGroup.POST("/pin_group_message", g.PinGroupMessage)
groupRouterGroup.POST("/unpin_group_message", g.UnpinGroupMessage)
groupRouterGroup.POST("/get_group_pinned_messages", g.GetGroupPinnedMessages)
}
// certificate
{
@ -339,8 +353,8 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
phoneGroup := r.Group("/phone")
phoneGroup.POST("/get_sn_info", phoneSN.GetSNInfo)
phoneGroup.POST("/set_sn_info", phoneSN.SetSNInfo)
}
{
}
{
rc := NewRtcApi(rtc.NewRtcServiceClient(rtcConn))
rtcGroup := r.Group("/rtc")
rtcGroup.POST("/signal_message_assemble", rc.SignalMessageAssemble)

@ -351,6 +351,22 @@ func (u *UserApi) SetMsgReceiveSetting(c *gin.Context) {
a2r.Call(c, user.UserClient.SetMsgReceiveSetting, u.Client)
}
func (u *UserApi) SetGroupInviteSetting(c *gin.Context) {
a2r.Call(c, user.UserClient.SetGroupInviteSetting, u.Client)
}
func (u *UserApi) SetUserMsgBurnDuration(c *gin.Context) {
a2r.Call(c, user.UserClient.SetUserMsgBurnDuration, u.Client)
}
func (u *UserApi) SetDeleteAccountInterval(c *gin.Context) {
a2r.Call(c, user.UserClient.SetDeleteAccountInterval, u.Client)
}
func (u *UserApi) GetUserPrivacySettings(c *gin.Context) {
a2r.Call(c, user.UserClient.GetUserPrivacySettings, u.Client)
}
func (u *UserApi) GetUserByPhone(c *gin.Context) {
a2r.Call(c, user.UserClient.GetUserByPhone, u.Client)
}

@ -29,10 +29,14 @@ func NewUserGlobalBlackApi(blacklistDB controller.UserGlobalBlackDatabase, userD
type addGlobalBlacklistReq struct {
UserIDs []string `json:"userIDs" binding:"required,min=1"`
Reason string `json:"reason"`
// Status 限制类型1=冻结可登录不能收发消息2=黑名单(不可登录,自动踢下线)
Status int32 `json:"status" binding:"required,oneof=1 2"`
}
type removeGlobalBlacklistReq struct {
UserIDs []string `json:"userIDs" binding:"required,min=1"`
// Status 目标状态0=恢复正常(同步从 blacklistDB 删除记录1=冻结2=黑名单
Status int32 `json:"status" binding:"oneof=0 1 2"`
}
type getGlobalBlacklistReq struct {
@ -45,6 +49,8 @@ type globalBlackItem struct {
OperatorID string `json:"operatorID"`
Reason string `json:"reason"`
CreateTime int64 `json:"createTime"`
// Status 限制类型1=冻结2=黑名单
Status int32 `json:"status"`
}
type getGlobalBlacklistResp struct {
@ -52,7 +58,8 @@ type getGlobalBlacklistResp struct {
Blacks []globalBlackItem `json:"blacks"`
}
// AddGlobalBlacklist 管理员将用户加入全局黑名单,并立即踢下线(所有平台 token 标记 KickedToken
// AddGlobalBlacklist 管理员设置用户限制状态。
// Status=1冻结可登录但不能收发消息Status=2黑名单不可登录自动踢下线不能收发消息。
func (b *UserGlobalBlackApi) AddGlobalBlacklist(c *gin.Context) {
var req addGlobalBlacklistReq
if err := c.ShouldBindJSON(&req); err != nil {
@ -85,31 +92,44 @@ func (b *UserGlobalBlackApi) AddGlobalBlacklist(c *gin.Context) {
Nickname: u.Nickname,
OperatorID: operatorID,
Reason: req.Reason,
Status: req.Status,
})
}
if err := b.blacklistDB.AddBlack(c, blacks); err != nil {
apiresp.GinError(c, err)
return
}
// 黑名单写入成功后,对每个被封禁用户的所有非管理员平台执行 force_logout
// 1. 断开 WS 长连接msggateway.KickUserOffline
// 2. 将 Redis 中该平台的所有 token 标记为 KickedToken
for _, black := range blacks {
for platformID := range constant.PlatformID2Name {
if int32(platformID) == constant.AdminPlatformID {
continue
}
if err := b.authClient.ForceLogout(c, black.UserID, int32(platformID)); err != nil {
// 踢下线失败不阻断主流程,记录警告即可
log.ZWarn(c, "AddGlobalBlacklist: ForceLogout failed", err,
"userID", black.UserID, "platformID", platformID)
// 同步更新 user 集合中的状态字段
for _, userID := range req.UserIDs {
if err := b.userDB.UpdateByMap(c, userID, map[string]any{"status": req.Status}); err != nil {
log.ZWarn(c, "AddGlobalBlacklist: UpdateByMap status failed", err,
"userID", userID, "status", req.Status)
}
}
// 仅黑名单Status=2需要踢下线断开 WS 长连接并将 token 标记为 KickedToken
if req.Status == model.UserStatusBlacklist {
for _, black := range blacks {
for platformID := range constant.PlatformID2Name {
if int32(platformID) == constant.AdminPlatformID {
continue
}
if err := b.authClient.ForceLogout(c, black.UserID, int32(platformID)); err != nil {
log.ZWarn(c, "AddGlobalBlacklist: ForceLogout failed", err,
"userID", black.UserID, "platformID", platformID)
}
}
}
}
apiresp.GinSuccess(c, nil)
}
// RemoveGlobalBlacklist 管理员从全局黑名单移除用户
// RemoveGlobalBlacklist 管理员更新用户账号状态。
// 执行顺序:
// 1. 将 user 集合中的 status 字段更新为请求值
// 2. 仅当 status == 0恢复正常才从 blacklistDB 删除该用户的限制记录
//
// 说明blacklistDB 是 auth/msg 层的拦截依据;状态先落 user 集合,
// 只有确认目标状态为"正常"时才清除黑名单记录,避免状态写入成功但记录未删导致仍被拦截。
func (b *UserGlobalBlackApi) RemoveGlobalBlacklist(c *gin.Context) {
var req removeGlobalBlacklistReq
if err := c.ShouldBindJSON(&req); err != nil {
@ -120,9 +140,19 @@ func (b *UserGlobalBlackApi) RemoveGlobalBlacklist(c *gin.Context) {
apiresp.GinError(c, err)
return
}
if err := b.blacklistDB.RemoveBlack(c, req.UserIDs); err != nil {
apiresp.GinError(c, err)
return
for _, userID := range req.UserIDs {
if err := b.userDB.UpdateByMap(c, userID, map[string]any{"status": req.Status}); err != nil {
log.ZError(c, "RemoveGlobalBlacklist: UpdateByMap status failed", err, "userID", userID, "status", req.Status)
apiresp.GinError(c, err)
return
}
}
// 只有目标状态为 0正常时才删除 blacklistDB 中的限制记录
if req.Status == model.UserStatusNormal {
if err := b.blacklistDB.RemoveBlack(c, req.UserIDs); err != nil {
apiresp.GinError(c, err)
return
}
}
apiresp.GinSuccess(c, nil)
}
@ -151,6 +181,7 @@ func (b *UserGlobalBlackApi) GetGlobalBlacklist(c *gin.Context) {
OperatorID: blk.OperatorID,
Reason: blk.Reason,
CreateTime: blk.CreateTime.UnixMilli(),
Status: blk.Status,
})
}
apiresp.GinSuccess(c, getGlobalBlacklistResp{Total: total, Blacks: items})

@ -32,6 +32,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
pbauth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
@ -140,13 +141,13 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
return nil, errs.ErrArgs.WrapMsg("app account can`t get token")
}
blocked, _ := s.blacklistDB.IsBlocked(ctx, req.UserID)
if blocked {
// Blacklisted users should be actively kicked to invalidate existing sessions.
// 仅黑名单status=2禁止登录冻结status=1允许获取 token仅在收发消息层面拦截
status, _ := s.blacklistDB.GetStatus(ctx, req.UserID)
if status == model.UserStatusBlacklist {
if kickErr := s.forceKickOffAllPlatforms(ctx, req.UserID); kickErr != nil {
log.ZWarn(ctx, "GetUserToken forceKickOffAllPlatforms failed", kickErr, "userID", req.UserID)
}
log.ZWarn(ctx, "GetUserToken is blocked", errors.New("user is in global blacklist, userID="+req.UserID), "userID", req.UserID, "blocked", blocked)
log.ZWarn(ctx, "GetUserToken is blocked", errors.New("user is in global blacklist, userID="+req.UserID), "userID", req.UserID, "status", status)
return nil, servererrs.ErrUserBlocked.WithDetail("user is in global blacklist, userID=" + req.UserID)
}
token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID))
@ -167,14 +168,13 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim
if isAdmin {
return claims, nil
}
// 非管理员用户检查全局黑名单
blocked, _ := s.blacklistDB.IsBlocked(ctx, claims.UserID)
if blocked {
// Blacklisted users should be actively kicked to invalidate existing sessions.
// 非管理员用户检查全局黑名单:仅 status=2黑名单拦截status=1冻结允许通过 token 校验
status, _ := s.blacklistDB.GetStatus(ctx, claims.UserID)
if status == model.UserStatusBlacklist {
if kickErr := s.forceKickOffAllPlatforms(ctx, claims.UserID); kickErr != nil {
log.ZWarn(ctx, "parseToken forceKickOffAllPlatforms failed", kickErr, "userID", claims.UserID)
}
log.ZWarn(ctx, "parseToken is blocked", errors.New("user is in global blacklist, userID="+claims.UserID), "userID", claims.UserID, "blocked", blocked)
log.ZWarn(ctx, "parseToken is blocked", errors.New("user is in global blacklist, userID="+claims.UserID), "userID", claims.UserID, "status", status)
return nil, servererrs.ErrUserBlocked.WithDetail("user is in global blacklist, userID=" + claims.UserID)
}
m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID)

@ -23,6 +23,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -47,6 +48,7 @@ import (
type conversationServer struct {
pbconversation.UnimplementedConversationServer
conversationDatabase controller.ConversationDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
conversationNotificationSender *ConversationNotificationSender
config *Config
@ -79,6 +81,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
msgBurnDeadlineDB, err := mgo.NewMsgBurnDeadlineMongo(mgocli.GetDB())
if err != nil {
return err
}
userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User)
if err != nil {
return err
@ -97,9 +103,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
msgBurnDeadlineDB: msgBurnDeadlineDB,
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
})
return nil
}
@ -823,3 +830,51 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}
// ClearBurnExpiredMsgs 处理「阅后即焚」过期消息:
// 1. 从 msg_burn_deadline 中拉取一批过期分组(按 user/conversation 聚合,含每组最大 seq
// 2. 对每个分组把用户在该会话上的 min_seq 推进到 max(过期 seq) + 1。
// 3. 同步更新 conversation 文档的 min_seq 字段并下发会话变更通知。
// 4. 删除已处理的 deadline 记录。
//
// 单次最多处理 req.Limit 个分组;若返回的 count == limitcron 可继续触发。
func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearBurnExpiredMsgsReq) (*pbconversation.ClearBurnExpiredMsgsResp, error) {
if c.msgBurnDeadlineDB == nil {
return &pbconversation.ClearBurnExpiredMsgsResp{Count: 0}, nil
}
limit := int(req.Limit)
if limit <= 0 {
limit = 100
}
groups, err := c.msgBurnDeadlineDB.FindExpiredGroups(ctx, req.Timestamp, limit)
if err != nil {
return nil, err
}
var processed int32
for _, g := range groups {
if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 {
continue
}
newMinSeq := g.MaxSeq + 1
if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
continue
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID,
map[string]any{"min_seq": newMinSeq}); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
continue
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID})
if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err,
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
}
log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID,
"conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs)
processed++
}
return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil
}

@ -67,6 +67,7 @@ type groupServer struct {
msgClient *rpcli.MsgClient
conversationClient *rpcli.ConversationClient
cryptoClient *rpcli.CryptoClient
relationClient *rpcli.RelationClient
}
type Config struct {
@ -101,6 +102,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
groupPinnedMsgDB, err := mgo.NewGroupPinnedMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
//msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
@ -118,6 +123,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
friendConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Friend)
if err != nil {
return err
}
//cryptoConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Crypto)
//if err != nil {
// return err
@ -128,9 +137,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
userClient: rpcli.NewUserClient(userConn),
msgClient: rpcli.NewMsgClient(msgConn),
conversationClient: rpcli.NewConversationClient(conversationConn),
relationClient: rpcli.NewRelationClient(friendConn),
//cryptoClient: rpcli.NewCryptoClient(cryptoConn),
}
gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, groupPinnedMsgDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient)
localcache.InitLocalCache(&config.LocalCacheConfig)
pbgroup.RegisterGroupServer(server, &gs)
@ -453,6 +463,31 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
return nil, errs.ErrRecordNotFound.WrapMsg("user not found")
}
// 检查受邀用户的群邀请权限设置(管理员操作跳过)
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
inviterID := mcontext.GetOpUserID(ctx)
for _, userID := range req.InvitedUserIDs {
info, ok := userMap[userID]
if !ok {
continue
}
switch info.GroupInviteSetting {
case 2: // GroupInviteSettingNobody所有人不可邀请
return nil, errs.ErrNoPermission.WrapMsg("user has disabled group invitations", "userID", userID)
case 1: // GroupInviteSettingFriends仅好友可邀请
isFriend, err := s.relationClient.IsFriend(ctx, inviterID, userID)
if err != nil {
log.ZError(ctx, "InviteUserToGroup: IsFriend check failed", err,
"inviterID", inviterID, "invitedUserID", userID)
return nil, err
}
if !isFriend {
return nil, errs.ErrNoPermission.WrapMsg("user only allows friends to invite them to groups", "userID", userID)
}
}
}
}
var groupMember *model.GroupMember
var opUserID string
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {

@ -852,6 +852,32 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips)
}
// GroupMessagePinnedNotification 通知群成员有消息被置顶或取消置顶
// pinType: 1=置顶, 2=取消置顶
func (g *NotificationSender) GroupMessagePinnedNotification(ctx context.Context, groupID string, pinType int32,
pinned *sdkws.GroupPinnedMsgInfo, pinnedList []*sdkws.GroupPinnedMsgInfo) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
groupInfo, err := g.getGroupInfo(ctx, groupID)
if err != nil {
return
}
tips := &sdkws.GroupMessagePinnedTips{
Group: groupInfo,
Type: pinType,
PinnedMsg: pinned,
PinnedList: pinnedList,
}
if err = g.fillOpUser(ctx, &tips.OpUser, groupID); err != nil {
return
}
g.Notification(ctx, mcontext.GetOpUserID(ctx), groupID, constant.GroupMessagePinnedNotification, tips)
}
func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {

@ -0,0 +1,271 @@
// Copyright © 2026 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package group
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mcontext"
)
// 群置顶消息相关 RPC 实现:
// - 自动滚动保留最近 N 条置顶消息N=model.GroupPinnedMsgMaxKeep默认为 3
// - 置顶时把整条消息内容做完整快照存档,避免后续消息删除/撤回影响展示
// - 每条置顶记录拥有唯一 pinID作为 unpin 时的精准删除凭据
// - 权限:默认全员可置顶;当 group.AllowPinMsg=1 时,仅群主/管理员可置顶或取消置顶
const (
groupPinnedActionPin = int32(1)
groupPinnedActionUnpin = int32(2)
)
// PinGroupMessage 群聊中置顶单条消息
func (s *groupServer) PinGroupMessage(ctx context.Context, req *pbgroup.PinGroupMessageReq) (*pbgroup.PinGroupMessageResp, error) {
if req.GroupID == "" {
return nil, errs.ErrArgs.WrapMsg("groupID empty")
}
if req.Seq <= 0 {
return nil, errs.ErrArgs.WrapMsg("seq must be positive")
}
group, err := s.db.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, servererrs.ErrDismissedAlready.Wrap()
}
if err := s.checkPinPermission(ctx, group); err != nil {
return nil, err
}
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
msgData, err := s.msgClient.GetSingleMsgBySeq(ctx, conversationID, req.Seq)
if err != nil {
return nil, err
}
if msgData == nil {
return nil, servererrs.ErrRecordNotFound.WrapMsg("message not found by seq")
}
if msgData.GroupID != "" && msgData.GroupID != req.GroupID {
return nil, errs.ErrArgs.WrapMsg("seq does not belong to this group")
}
if msgData.Status >= constant.MsgStatusHasDeleted {
return nil, servererrs.ErrRecordNotFound.WrapMsg("message has been deleted")
}
pin := buildPinSnapshot(req.GroupID, conversationID, mcontext.GetOpUserID(ctx), msgData)
pinnedList, err := s.db.PinGroupMessage(ctx, req.GroupID, pin)
if err != nil {
return nil, err
}
pbPinned := pinnedMsgDB2PB(pin)
pbList := pinnedListDB2PB(pinnedList)
s.notification.GroupMessagePinnedNotification(ctx, req.GroupID, groupPinnedActionPin, pbPinned, pbList)
return &pbgroup.PinGroupMessageResp{
PinnedMsg: pbPinned,
PinnedList: pbList,
}, nil
}
// UnpinGroupMessage 群聊中取消置顶单条消息pinID 优先;为空则按 seq
func (s *groupServer) UnpinGroupMessage(ctx context.Context, req *pbgroup.UnpinGroupMessageReq) (*pbgroup.UnpinGroupMessageResp, error) {
if req.GroupID == "" {
return nil, errs.ErrArgs.WrapMsg("groupID empty")
}
if req.PinID == "" && req.Seq <= 0 {
return nil, errs.ErrArgs.WrapMsg("either pinID or seq must be provided")
}
group, err := s.db.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, servererrs.ErrDismissedAlready.Wrap()
}
if err := s.checkPinPermission(ctx, group); err != nil {
return nil, err
}
current, err := s.db.GetGroupPinnedMessages(ctx, req.GroupID)
if err != nil {
return nil, err
}
var target *model.GroupPinnedMessage
for _, m := range current {
if req.PinID != "" {
if m.PinID == req.PinID {
target = m
break
}
} else if m.Seq == req.Seq {
target = m
break
}
}
if target == nil {
return nil, servererrs.ErrRecordNotFound.WrapMsg("pinned message not found")
}
pinnedList, err := s.db.UnpinGroupMessage(ctx, req.GroupID, req.PinID, req.Seq)
if err != nil {
return nil, err
}
pbPinned := pinnedMsgDB2PB(target)
pbList := pinnedListDB2PB(pinnedList)
s.notification.GroupMessagePinnedNotification(ctx, req.GroupID, groupPinnedActionUnpin, pbPinned, pbList)
return &pbgroup.UnpinGroupMessageResp{PinnedList: pbList}, nil
}
// GetGroupPinnedMessages 获取群置顶消息列表
func (s *groupServer) GetGroupPinnedMessages(ctx context.Context, req *pbgroup.GetGroupPinnedMessagesReq) (*pbgroup.GetGroupPinnedMessagesResp, error) {
if req.GroupID == "" {
return nil, errs.ErrArgs.WrapMsg("groupID empty")
}
if err := s.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
pinnedList, err := s.db.GetGroupPinnedMessages(ctx, req.GroupID)
if err != nil {
return nil, err
}
return &pbgroup.GetGroupPinnedMessagesResp{
PinnedList: pinnedListDB2PB(pinnedList),
}, nil
}
// checkPinPermission 校验当前操作者是否具备群消息置顶权限
func (s *groupServer) checkPinPermission(ctx context.Context, group *model.Group) error {
if authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
return nil
}
opUserID := mcontext.GetOpUserID(ctx)
if opUserID == "" {
return errs.ErrNoPermission.WrapMsg("op user id empty")
}
member, err := s.db.TakeGroupMember(ctx, group.GroupID, opUserID)
if err != nil {
return err
}
isOwnerOrAdmin := member.RoleLevel == constant.GroupOwner || member.RoleLevel == constant.GroupAdmin
if group.AllowPinMsg == model.GroupPermAdminOnly && !isOwnerOrAdmin {
return errs.ErrNoPermission.WrapMsg("only owner or admin can pin/unpin group message")
}
return nil
}
// buildPinSnapshot 把 sdkws.MsgData 完整快照成 GroupPinnedMessage
// PinID 在 mgo 层 Pin 时若为空会自动生成;这里留空交由存储层处理
func buildPinSnapshot(groupID, conversationID, opUserID string, m *sdkws.MsgData) *model.GroupPinnedMessage {
pin := &model.GroupPinnedMessage{
GroupID: groupID,
ConversationID: conversationID,
Seq: m.Seq,
ServerMsgID: m.ServerMsgID,
ClientMsgID: m.ClientMsgID,
SendID: m.SendID,
RecvID: m.RecvID,
SenderPlatformID: m.SenderPlatformID,
SenderNickname: m.SenderNickname,
SenderFaceURL: m.SenderFaceURL,
SessionType: m.SessionType,
MsgFrom: m.MsgFrom,
ContentType: m.ContentType,
Content: string(m.Content),
AtUserIDList: append([]string(nil), m.AtUserIDList...),
Options: copyOptions(m.Options),
AttachedInfo: m.AttachedInfo,
Ex: m.Ex,
SendTime: m.SendTime,
CreateTime: m.CreateTime,
Status: m.Status,
PinUserID: opUserID,
PinTime: time.Now().UnixMilli(),
}
if m.OfflinePushInfo != nil {
pin.OfflinePush = &model.GroupPinnedOfflinePush{
Title: m.OfflinePushInfo.Title,
Desc: m.OfflinePushInfo.Desc,
Ex: m.OfflinePushInfo.Ex,
IOSPushSound: m.OfflinePushInfo.IOSPushSound,
IOSBadgeCount: m.OfflinePushInfo.IOSBadgeCount,
SignalInfo: m.OfflinePushInfo.SignalInfo,
}
}
return pin
}
func copyOptions(src map[string]bool) map[string]bool {
if len(src) == 0 {
return nil
}
dst := make(map[string]bool, len(src))
for k, v := range src {
dst[k] = v
}
return dst
}
func pinnedMsgDB2PB(m *model.GroupPinnedMessage) *sdkws.GroupPinnedMsgInfo {
if m == nil {
return nil
}
return &sdkws.GroupPinnedMsgInfo{
PinID: m.PinID,
GroupID: m.GroupID,
ConversationID: m.ConversationID,
Seq: m.Seq,
ServerMsgID: m.ServerMsgID,
ClientMsgID: m.ClientMsgID,
SendID: m.SendID,
RecvID: m.RecvID,
SenderPlatformID: m.SenderPlatformID,
SenderNickname: m.SenderNickname,
SenderFaceURL: m.SenderFaceURL,
SessionType: m.SessionType,
MsgFrom: m.MsgFrom,
ContentType: m.ContentType,
Content: m.Content,
AtUserIDList: append([]string(nil), m.AtUserIDList...),
Options: copyOptions(m.Options),
AttachedInfo: m.AttachedInfo,
Ex: m.Ex,
SendTime: m.SendTime,
CreateTime: m.CreateTime,
Status: m.Status,
PinUserID: m.PinUserID,
PinTime: m.PinTime,
}
}
func pinnedListDB2PB(list []*model.GroupPinnedMessage) []*sdkws.GroupPinnedMsgInfo {
if len(list) == 0 {
return nil
}
result := make([]*sdkws.GroupPinnedMsgInfo, 0, len(list))
for _, m := range list {
result = append(result, pinnedMsgDB2PB(m))
}
return result
}

@ -17,9 +17,12 @@ package msg
import (
"context"
"errors"
"time"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
@ -126,6 +129,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
ContentType: conversation.ConversationType,
}
m.webhookAfterSingleMsgRead(ctx, &m.config.WebhooksConfig.AfterSingleMsgRead, reqCallback)
m.recordBurnDeadlines(ctx, conversation, req.UserID, req.Seqs)
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), req.Seqs, hasReadSeq)
return &msg.MarkMsgsAsReadResp{}, nil
@ -166,6 +170,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
}
hasReadSeq = req.HasReadSeq
}
m.recordBurnDeadlines(ctx, conversation, req.UserID, seqs)
m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq)
} else if conversation.ConversationType == constant.ReadGroupChatType ||
@ -201,6 +206,64 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
return &msg.MarkConversationAsReadResp{}, nil
}
// recordBurnDeadlines 在「单聊」场景下,根据对端(发送者)的 MsgBurnDuration
// 为本次已读的每条消息同时给接收者和发送者各记录一份「阅后即焚」截止时间。
// cron 到期后会分别推进两人各自的 min_seq双方都看不到该消息。
//
// 设计要点:
// 1. 仅单聊。
// 2. 仅当发送者 MsgBurnDuration > 0 时才记录0 表示未开启。
// 3. $setOnInsert 确保同一 (UserID, ConversationID, Seq) 已存在时不覆盖,
// 以「首次阅读时刻」为 deadline 基准,多端重复 MarkAsRead 不会往后推。
// 4. 失败仅记录日志,不影响已读主流程。
func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.Conversation, readerUserID string, seqs []int64) {
if len(seqs) == 0 {
return
}
if conv.ConversationType != constant.SingleChatType {
return
}
peerID := m.conversationAndGetRecvID(conv, readerUserID)
if peerID == "" || peerID == readerUserID {
return
}
peerInfo, err := m.UserLocalCache.GetUserInfo(ctx, peerID)
if err != nil {
log.ZWarn(ctx, "recordBurnDeadlines GetUserInfo failed", err, "peerID", peerID)
return
}
if peerInfo == nil || peerInfo.MsgBurnDuration <= 0 {
return
}
now := time.Now().UnixMilli()
deadline := now + int64(peerInfo.MsgBurnDuration)*1000
// 每条消息同时为接收者和发送者各写一条 deadline双方消息同步焚毁。
items := make([]*model.MsgBurnDeadline, 0, len(seqs)*2)
for _, seq := range seqs {
items = append(items,
&model.MsgBurnDeadline{
UserID: readerUserID,
ConversationID: conv.ConversationID,
Seq: seq,
DeadlineMs: deadline,
CreateTime: now,
},
&model.MsgBurnDeadline{
UserID: peerID,
ConversationID: conv.ConversationID,
Seq: seq,
DeadlineMs: deadline,
CreateTime: now,
},
)
}
if err := m.msgBurnDeadlineDB.UpsertIfAbsent(ctx, items); err != nil {
log.ZError(ctx, "recordBurnDeadlines UpsertIfAbsent failed", err,
"readerUserID", readerUserID, "peerID", peerID,
"conversationID", conv.ConversationID, "seqs", seqs)
}
}
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) {
tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID,

@ -34,6 +34,10 @@ import (
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
if req.MsgData != nil {
m.encapsulateMsgData(req.MsgData)
// 全局账号状态校验:冻结/黑名单用户不可收发消息
if err := m.verifyUserStatus(ctx, req); err != nil {
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)

@ -71,6 +71,8 @@ type msgServer struct {
webhookClient *webhook.Client
conversationClient *rpcli.ConversationClient
spamReportDB database.SpamReport
globalBlackDB controller.UserGlobalBlackDatabase
msgBurnDeadlineDB database.MsgBurnDeadline
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
@ -127,6 +129,14 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
globalBlackMgo, err := mgo.NewUserGlobalBlackMongo(mgocli.GetDB())
if err != nil {
return err
}
msgBurnDeadlineDB, err := mgo.NewMsgBurnDeadlineMongo(mgocli.GetDB())
if err != nil {
return err
}
s := &msgServer{
MsgDatabase: msgDatabase,
RegisterCenter: client,
@ -138,6 +148,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
conversationClient: conversationClient,
spamReportDB: spamReportDB,
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
msgBurnDeadlineDB: msgBurnDeadlineDB,
}
s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg))

@ -21,6 +21,7 @@ import (
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
@ -50,6 +51,43 @@ type MessageRevoked struct {
Seq uint32 `json:"seq"`
}
// verifyUserStatus 校验发送方/接收方的全局账号状态。
// 任意一方处于冻结(1)或黑名单(2)即拒绝消息发送/投递。
// 通知类消息NotificationBegin~NotificationEnd和管理员发送方放行。
func (m *msgServer) verifyUserStatus(ctx context.Context, data *msg.SendMsgReq) error {
if data == nil || data.MsgData == nil {
return nil
}
if data.MsgData.ContentType >= constant.NotificationBegin && data.MsgData.ContentType <= constant.NotificationEnd {
return nil
}
sendID := data.MsgData.SendID
if datautil.Contain(sendID, m.config.Share.IMAdminUserID...) {
return nil
}
if sendID != "" {
st, err := m.globalBlackDB.GetStatus(ctx, sendID)
if err != nil {
log.ZWarn(ctx, "verifyUserStatus: GetStatus(send) failed", err, "sendID", sendID)
} else if st == model.UserStatusFrozen || st == model.UserStatusBlacklist {
return servererrs.ErrUserBlocked.WithDetail("sender is restricted, status=" + strconv.Itoa(int(st)))
}
}
// 单聊:同时校验接收方状态;群聊接收方拦截在推送层处理
if data.MsgData.SessionType == constant.SingleChatType {
recvID := data.MsgData.RecvID
if recvID != "" && !datautil.Contain(recvID, m.config.Share.IMAdminUserID...) {
st, err := m.globalBlackDB.GetStatus(ctx, recvID)
if err != nil {
log.ZWarn(ctx, "verifyUserStatus: GetStatus(recv) failed", err, "recvID", recvID)
} else if st == model.UserStatusFrozen || st == model.UserStatusBlacklist {
return servererrs.ErrMsgReceiveNotAllowed.WrapMsg("receiver is restricted")
}
}
}
return nil
}
func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error {
switch data.MsgData.SessionType {
case constant.SingleChatType:

@ -314,7 +314,11 @@ func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFrien
if err != nil {
return nil, err
}
return &relation.GetFriendInfoResp{FriendInfos: convert.FriendOnlyDB2PbOnly(friends)}, nil
users, err := s.userClient.GetUsersInfoMap(ctx, req.FriendUserIDs)
if err != nil {
return nil, err
}
return &relation.GetFriendInfoResp{FriendInfos: convert.FriendOnlyDB2PbOnly(friends, users)}, nil
}
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) {
@ -693,18 +697,24 @@ func (s *friendServer) AddOnewayFriend(ctx context.Context, req *relation.ApplyT
if in1 {
return nil, servererrs.ErrRelationshipAlready.WrapMsg("already in friend list")
}
if err := s.db.BecomeOnewayFriend(ctx, req.FromUserID, req.ToUserID, becomeFriendByOneway); err != nil {
if err := s.db.BecomeOnewayFriend(ctx, req.FromUserID, req.ToUserID, becomeFriendByOneway, req.Remark); err != nil {
return nil, err
}
// Silently notify only A (FromUserID) to trigger an incremental friend-list sync
// so the remark is reflected in the conversation list.
// B (ToUserID) receives no notification of any kind.
s.notificationSender.FriendAddedOnewayNotification(ctx, req.FromUserID, req.ToUserID)
// Notify only A (FromUserID) so incremental friend sync is triggered
// without notifying B (ToUserID).
tips := sdkws.FriendApplicationApprovedTips{
FromToUserID: &sdkws.FromToUserID{
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
},
}
s.notificationSender.Notification(ctx, req.FromUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips)
//tips := sdkws.FriendApplicationApprovedTips{
// FromToUserID: &sdkws.FromToUserID{
// FromUserID: req.FromUserID,
// ToUserID: req.ToUserID,
// },
//}
//s.notificationSender.Notification(ctx, req.FromUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips)
return &relation.ApplyToAddFriendResp{}, nil
}

@ -282,6 +282,19 @@ func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Con
f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
}
// FriendAddedOnewayNotification silently notifies ownerUserID that friendUserID has been added
// to their friend list (one-way, no consent from friendUserID required).
// isSendMsg=false ensures no visible message appears in either user's conversation list.
func (f *FriendNotificationSender) FriendAddedOnewayNotification(ctx context.Context, ownerUserID, friendUserID string) {
tips := sdkws.FriendsInfoUpdateTips{
FromToUserID: &sdkws.FromToUserID{ToUserID: ownerUserID},
FriendIDs: []string{friendUserID},
}
f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID,
database.FriendVersionName, ownerUserID, &tips.FriendSortVersion)
f.Notification(ctx, ownerUserID, ownerUserID, constant.FriendsInfoUpdateNotification, &tips)
}
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *relation.AddBlackReq) {
tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.FromUserID = req.OwnerUserID

@ -108,6 +108,30 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
}
}
// 检测哪些被叫用户正忙(已在通话中),记录到 BusyLineUserIDList
busyUserIDs, err := s.db.GetBusyUserIDs(ctx, inv.InviteeUserIDList)
if err != nil {
log.ZWarn(ctx, "handleInvite: GetBusyUserIDs failed (non-fatal)", err)
}
busySet := make(map[string]struct{}, len(busyUserIDs))
for _, uid := range busyUserIDs {
busySet[uid] = struct{}{}
}
inv.BusyLineUserIDList = busyUserIDs
// 从主叫用户资料获取铃声 URL注入到邀请信息中被叫方收到后播放主叫方铃声
if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" {
inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL
}
// 查询被叫方铃声 URL供主叫方在等待时播放
var calleeRingtoneURL string
if len(inv.InviteeUserIDList) > 0 {
if inviteeInfo, err := s.userClient.GetUserInfo(ctx, inv.InviteeUserIDList[0]); err == nil {
calleeRingtoneURL = inviteeInfo.CallRingtoneURL
}
}
if _, err := s.roomClient.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: inv.RoomID}); err != nil {
log.ZError(ctx, "handleInvite", err, "r", err.Error())
return nil, errs.WrapMsg(err, "LiveKit CreateRoom failed", "roomID", inv.RoomID)
@ -138,6 +162,10 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
}
for _, inviteeID := range inv.InviteeUserIDList {
if _, busy := busySet[inviteeID]; busy {
log.ZInfo(ctx, "handleInvite: skip busy invitee", "inviteeID", inviteeID)
continue
}
log.ZInfo(ctx, "sendSignalingNotification to invitee", "sendID", req.UserID, "recvID", inviteeID)
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.SingleChatType), req.OfflinePushInfo, content); err != nil {
log.ZError(ctx, "sendSignalingNotification to invitee failed", err, "inviteeID", inviteeID)
@ -147,9 +175,11 @@ func (s *rtcServer) handleInvite(ctx context.Context, req *rtc.SignalInviteReq,
log.ZDebug(ctx, "handleInvite", "token", token, "roomID", inv.RoomID, "liveURL", s.config.RpcConfig.LiveKit.ExternalAddress)
return &rtc.SignalInviteResp{
Token: token,
RoomID: inv.RoomID,
LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress,
Token: token,
RoomID: inv.RoomID,
LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress,
BusyLineUserIDList: busyUserIDs,
CalleeRingtoneURL: calleeRingtoneURL,
}, nil
}
@ -164,6 +194,30 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi
inv.InviterUserID = req.UserID
inv.InitiateTime = time.Now().UnixMilli()
// 检测哪些被叫用户正忙(已在通话中),记录到 BusyLineUserIDList
busyUserIDs, err := s.db.GetBusyUserIDs(ctx, inv.InviteeUserIDList)
if err != nil {
log.ZWarn(ctx, "handleInviteInGroup: GetBusyUserIDs failed (non-fatal)", err)
}
busySet := make(map[string]struct{}, len(busyUserIDs))
for _, uid := range busyUserIDs {
busySet[uid] = struct{}{}
}
inv.BusyLineUserIDList = busyUserIDs
// 从主叫用户资料获取铃声 URL注入到邀请信息中被叫方收到后播放主叫方铃声
if inviterInfo, err := s.userClient.GetUserInfo(ctx, req.UserID); err == nil && inviterInfo.CallRingtoneURL != "" {
inv.CallerRingtoneURL = inviterInfo.CallRingtoneURL
}
// 查询第一位被叫的铃声 URL供主叫方在等待时播放
var calleeRingtoneURL string
if len(inv.InviteeUserIDList) > 0 {
if inviteeInfo, err := s.userClient.GetUserInfo(ctx, inv.InviteeUserIDList[0]); err == nil {
calleeRingtoneURL = inviteeInfo.CallRingtoneURL
}
}
if _, err := s.roomClient.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: inv.RoomID}); err != nil {
return nil, errs.WrapMsg(err, "LiveKit CreateRoom failed", "roomID", inv.RoomID)
}
@ -200,15 +254,21 @@ func (s *rtcServer) handleInviteInGroup(ctx context.Context, req *rtc.SignalInvi
log.ZInfo(ctx, "handleInviteInGroup: skipping invitee (call setting blocked)", "inviteeID", inviteeID)
continue
}
if _, busy := busySet[inviteeID]; busy {
log.ZInfo(ctx, "handleInviteInGroup: skip busy invitee", "inviteeID", inviteeID)
continue
}
if err := s.sendSignalingNotification(ctx, req.UserID, inviteeID, int32(constant.ReadGroupChatType), req.OfflinePushInfo, content); err != nil {
log.ZWarn(ctx, "sendSignalingNotification to group invitee failed", err, "inviteeID", inviteeID)
}
}
return &rtc.SignalInviteInGroupResp{
Token: token,
RoomID: inv.RoomID,
LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress,
Token: token,
RoomID: inv.RoomID,
LiveURL: s.config.RpcConfig.LiveKit.ExternalAddress,
BusyLineUserIDList: busyUserIDs,
CalleeRingtoneURL: calleeRingtoneURL,
}, nil
}
@ -269,12 +329,12 @@ func (s *rtcServer) handleAccept(ctx context.Context, req *rtc.SignalAcceptReq,
log.ZWarn(ctx, "sendSignalingNotification accept to inviter failed", err, "inviterID", dbInv.InviterUserID)
}
// TODO: 群通话可通过 RemoveInvitee 实现精细化状态管理
if dbInv.GroupID == "" {
if err := s.db.DeleteInvitation(ctx, dbInv.RoomID); err != nil {
log.ZWarn(ctx, "handleAccept: DeleteInvitation failed (non-fatal)", err, "roomID", dbInv.RoomID)
}
}
// 接受邀请后不删除 invitation通话仍在进行双方应被标记为忙线BusyLineUserIDList
// invitation 的清理由以下路径负责:
// - 主动挂断handleHungUp → DeleteInvitation
// - 主叫取消handleCancel → DeleteInvitation
// - 被叫拒绝handleReject → DeleteInvitation / RemoveInvitee
// - 异常中断MongoDB TTL 索引expire_at 字段)自动清理
return &rtc.SignalAcceptResp{
Token: token,
@ -599,6 +659,7 @@ func signalingMsgOptions() map[string]bool {
// IsNotNotification=false 表示"这是通知消息",让 IsNotificationByMsg 返回 true
// 从而跳过 modifyMessageByUserMessageReceiveOpt 中的黑名单/好友关系等校验
datautil.SetSwitchFromOptions(opts, constant.IsNotNotification, false)
datautil.SetSwitchFromOptions(opts, constant.IsSendMsg, false)
datautil.SetSwitchFromOptions(opts, constant.IsHistory, false)
datautil.SetSwitchFromOptions(opts, constant.IsPersistent, false)
datautil.SetSwitchFromOptions(opts, constant.IsUnreadCount, false)

@ -2,7 +2,10 @@ package user
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/protocol/constant"
@ -80,10 +83,49 @@ func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUse
if err := s.online.SetUserOnline(ctx, status.UserID, status.Online, status.Offline); err != nil {
return nil, err
}
s.updateOfflineRecord(ctx, status.UserID, len(status.Offline) > 0, len(status.Online) > 0)
}
return &pbuser.SetUserOnlineStatusResp{}, nil
}
// updateOfflineRecord 根据用户当前在线状态维护 user_offline_record 集合:
// - 若某平台刚断开且用户已全平台离线 → upsert 离线记录(仅首次写入,保留最早离线时刻)
// - 若某平台刚上线且用户当前有在线平台 → 删除离线记录(停止计时)
func (s *userServer) updateOfflineRecord(ctx context.Context, userID string, hasOffline, hasOnline bool) {
if !hasOffline && !hasOnline {
return
}
platformIDs, err := s.online.GetOnline(ctx, userID)
if err != nil {
log.ZWarn(ctx, "updateOfflineRecord: GetOnline failed", err, "userID", userID)
return
}
if len(platformIDs) == 0 {
// 所有平台已离线,写入离线记录(含预计算的删除截止时间)
offlineTime := time.Now()
deadline := s.calcDeleteDeadline(ctx, userID, offlineTime)
if err := s.userOfflineRecord.Upsert(ctx, userID, offlineTime, deadline); err != nil {
log.ZWarn(ctx, "updateOfflineRecord: Upsert failed", err, "userID", userID)
}
} else if hasOnline {
// 用户重新上线,删除离线记录,停止计时
if err := s.userOfflineRecord.Delete(ctx, userID); err != nil {
log.ZWarn(ctx, "updateOfflineRecord: Delete failed", err, "userID", userID)
}
}
}
// calcDeleteDeadline 查询用户的 delete_account_interval 并计算删除截止时间。
// 若查询失败或 interval 为 0则使用系统默认值18 个月)。
func (s *userServer) calcDeleteDeadline(ctx context.Context, userID string, from time.Time) time.Time {
interval := int32(model.DefaultDeleteAccountIntervalSec)
users, err := s.db.Find(ctx, []string{userID})
if err == nil && len(users) > 0 && users[0].DeleteAccountInterval > 0 {
interval = users[0].DeleteAccountInterval
}
return from.Add(time.Duration(interval) * time.Second)
}
func (s *userServer) GetAllOnlineUsers(ctx context.Context, req *pbuser.GetAllOnlineUsersReq) (*pbuser.GetAllOnlineUsersResp, error) {
resMap, nextCursor, err := s.online.GetAllOnlineUsers(ctx, req.Cursor)
if err != nil {

@ -29,6 +29,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
@ -71,6 +72,7 @@ type userServer struct {
groupClient *rpcli.GroupClient
relationClient *rpcli.RelationClient
globalBlackDB controller.UserGlobalBlackDatabase
userOfflineRecord database.UserOfflineRecord
}
type Config struct {
@ -122,6 +124,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
if err != nil {
return err
}
userOfflineRecordDB, err := mgo.NewUserOfflineRecordMongo(mgocli.GetDB())
if err != nil {
return err
}
localcache.InitLocalCache(&config.LocalCacheConfig)
u := &userServer{
online: redis.NewUserOnline(rdb),
@ -132,9 +138,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo),
userOfflineRecord: userOfflineRecordDB,
}
pbuser.RegisterUserServer(server, u)
return u.db.InitOnce(context.Background(), users)
@ -389,18 +396,146 @@ func (s *userServer) SetMsgReceiveSetting(ctx context.Context, req *pbuser.SetMs
"msgReceiveSetting", req.MsgReceiveSetting)
return nil, err
}
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserID)
//s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserID)
return &pbuser.SetMsgReceiveSettingResp{}, nil
}
// SetGroupInviteSetting 设置群邀请权限0=所有人可邀请1=仅好友可邀请2=所有人不可邀请)。
// 只允许本人或管理员操作。
func (s *userServer) SetGroupInviteSetting(ctx context.Context, req *pbuser.SetGroupInviteSettingReq) (*pbuser.SetGroupInviteSettingResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.WrapMsg("userID is required")
}
if req.GroupInviteSetting < 0 || req.GroupInviteSetting > 2 {
return nil, errs.ErrArgs.WrapMsg("groupInviteSetting must be 0, 1 or 2")
}
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
log.ZWarn(ctx, "SetGroupInviteSetting: access denied", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID)
return nil, err
}
if _, err := s.db.FindWithError(ctx, []string{req.UserID}); err != nil {
log.ZError(ctx, "SetGroupInviteSetting: user not found or db error", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID)
return nil, err
}
if err := s.db.UpdateByMap(ctx, req.UserID, map[string]any{
"group_invite_setting": req.GroupInviteSetting,
}); err != nil {
log.ZError(ctx, "SetGroupInviteSetting: UpdateByMap failed", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID,
"groupInviteSetting", req.GroupInviteSetting)
return nil, err
}
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserID)
return &pbuser.SetGroupInviteSettingResp{}, nil
}
// SetUserMsgBurnDuration 设置用户全局消息阅后即焚时长0 表示关闭,要求 >=0。
// 只允许本人或管理员操作。
func (s *userServer) SetUserMsgBurnDuration(ctx context.Context, req *pbuser.SetUserMsgBurnDurationReq) (*pbuser.SetUserMsgBurnDurationResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.WrapMsg("userID is required")
}
if req.MsgBurnDuration < 0 {
return nil, errs.ErrArgs.WrapMsg("msgBurnDuration must be >= 0 (seconds)")
}
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
log.ZWarn(ctx, "SetUserMsgBurnDuration: access denied", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID)
return nil, err
}
if _, err := s.db.FindWithError(ctx, []string{req.UserID}); err != nil {
log.ZError(ctx, "SetUserMsgBurnDuration: user not found or db error", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID)
return nil, err
}
if err := s.db.UpdateByMap(ctx, req.UserID, map[string]any{
"msg_burn_duration": req.MsgBurnDuration,
}); err != nil {
log.ZError(ctx, "SetUserMsgBurnDuration: UpdateByMap failed", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID,
"msgBurnDuration", req.MsgBurnDuration)
return nil, err
}
s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserID)
return &pbuser.SetUserMsgBurnDurationResp{}, nil
}
// SetDeleteAccountInterval 设置用户删除账号等待间隔(秒);
// 0 表示重置为系统默认18个月要求 >= 0。只允许本人或管理员操作。
func (s *userServer) SetDeleteAccountInterval(ctx context.Context, req *pbuser.SetDeleteAccountIntervalReq) (*pbuser.SetDeleteAccountIntervalResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.WrapMsg("userID is required")
}
if req.DeleteAccountInterval < 0 {
return nil, errs.ErrArgs.WrapMsg("deleteAccountInterval must be >= 0 (seconds)")
}
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
log.ZWarn(ctx, "SetDeleteAccountInterval: access denied", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID)
return nil, err
}
if _, err := s.db.FindWithError(ctx, []string{req.UserID}); err != nil {
log.ZError(ctx, "SetDeleteAccountInterval: user not found or db error", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID)
return nil, err
}
if err := s.db.UpdateByMap(ctx, req.UserID, map[string]any{
"delete_account_interval": req.DeleteAccountInterval,
}); err != nil {
log.ZError(ctx, "SetDeleteAccountInterval: UpdateByMap failed", err,
"opUserID", mcontext.GetOpUserID(ctx), "targetUserID", req.UserID,
"deleteAccountInterval", req.DeleteAccountInterval)
return nil, err
}
// 若用户当前处于离线状态user_offline_record 中有记录),将 offline_time 与
// delete_user_deadline 刷新为当前时刻及新截止时间,使倒计时从本次设置时刻重新起算。
now := time.Now()
interval := req.DeleteAccountInterval
if interval == 0 {
interval = tablerelation.DefaultDeleteAccountIntervalSec
}
newDeadline := now.Add(time.Duration(interval) * time.Second)
if err := s.userOfflineRecord.RefreshOfflineTime(ctx, req.UserID, now, newDeadline); err != nil {
log.ZWarn(ctx, "SetDeleteAccountInterval: RefreshOfflineTime failed", err,
"userID", req.UserID)
}
return &pbuser.SetDeleteAccountIntervalResp{}, nil
}
// GetUserPrivacySettings 返回当前登录用户ctx opUserID的隐私与接收相关设置。
func (s *userServer) GetUserPrivacySettings(ctx context.Context, req *pbuser.GetUserPrivacySettingsReq) (*pbuser.GetUserPrivacySettingsResp, error) {
userID := mcontext.GetOpUserID(ctx)
if userID == "" {
return nil, errs.ErrArgs.WrapMsg("opUserID is required")
}
users, err := s.db.FindWithError(ctx, []string{userID})
if err != nil {
log.ZError(ctx, "GetUserPrivacySettings: user not found or db error", err,
"opUserID", userID)
return nil, err
}
u := users[0]
return &pbuser.GetUserPrivacySettingsResp{
MsgBurnDuration: u.MsgBurnDuration,
PhoneVisibility: u.PhoneVisibility,
CallAcceptSetting: u.CallAcceptSetting,
GlobalRecvMsgOpt: u.GlobalRecvMsgOpt,
MsgReceiveSetting: u.MsgReceiveSetting,
GroupInviteSetting: u.GroupInviteSetting,
DeleteAccountInterval: u.DeleteAccountInterval,
}, nil
}
// GetUserByPhone 根据精确手机号查询用户。
//
// phoneSearchVisibility=false默认时忽略 phone_visibility任何人均可搜到。
// phoneSearchVisibility=true 时按 phone_visibility 过滤:
// - Hidden(2) → 非管理员不可搜到
// - Friends(1) → 仅好友/管理员可搜到
// - Public(0) → 任何人均可搜到
// phone_visibility 仅控制用户资料中手机号字段是否展示,不影响搜索本身:
// 无论目标用户将手机号设置为何种可见性,只要手机号匹配就能找到该用户。
// 返回的 UserInfo 中 phone 字段仍按 applyPhoneVisibility 规则处理。
//
// 当目标用户 MsgReceiveSetting=2不接受任何人消息对非本人搜索者不可见。
// 返回空 userInfo 并不代表错误,调用方应以 nil userInfo 判断"未找到"。
func (s *userServer) GetUserByPhone(ctx context.Context, req *pbuser.GetUserByPhoneReq) (*pbuser.GetUserByPhoneResp, error) {
if req.Phone == "" {
@ -413,7 +548,7 @@ func (s *userServer) GetUserByPhone(ctx context.Context, req *pbuser.GetUserByPh
dbUser, err := s.db.FindByPhone(ctx, req.Phone)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
// 手机号未注册,返回空响应而非错误,避免枚举攻击
// 手机号未注册,返回空响应而非错误
return &pbuser.GetUserByPhoneResp{}, nil
}
log.ZError(ctx, "GetUserByPhone: FindByPhone failed", err,
@ -421,39 +556,25 @@ func (s *userServer) GetUserByPhone(ctx context.Context, req *pbuser.GetUserByPh
return nil, err
}
// 仅在 phoneSearchVisibility=true 时才按 phone_visibility 过滤,默认跳过
if s.config.RpcConfig.PhoneSearchVisibility {
callerID := mcontext.GetOpUserID(ctx)
isAdmin := datautil.Contain(callerID, s.config.Share.IMAdminUserID...)
switch dbUser.PhoneVisibility {
case tablerelation.PhoneVisibilityHidden:
// 完全隐藏:非管理员无法通过手机号搜到该用户
if !isAdmin {
return &pbuser.GetUserByPhoneResp{}, nil
}
case tablerelation.PhoneVisibilityFriends:
// 仅好友可搜索
if !isAdmin && callerID != dbUser.UserID {
isFriend, err := s.relationClient.IsFriend(ctx, callerID, dbUser.UserID)
if err != nil {
log.ZError(ctx, "GetUserByPhone: IsFriend failed", err,
"callerID", callerID, "targetUserID", dbUser.UserID)
return nil, err
}
if !isFriend {
return &pbuser.GetUserByPhoneResp{}, nil
}
}
}
viewerID := mcontext.GetOpUserID(ctx)
// MsgReceiveSetting=2 表示不接受任何人消息,对非本人搜索者隐藏该用户
if dbUser.MsgReceiveSetting == tablerelation.MsgReceiveSettingNobody && viewerID != dbUser.UserID {
return &pbuser.GetUserByPhoneResp{}, nil
}
pbUser := convert.UserDB2Pb(dbUser)
// 搜索者已知手机号(主动输入),仍对返回的资料字段应用可见性规则
if err := s.applyPhoneVisibility(ctx, viewerID, []*sdkws.UserInfo{pbUser}, []*tablerelation.User{dbUser}); err != nil {
log.ZError(ctx, "GetUserByPhone: applyPhoneVisibility failed", err,
"opUserID", viewerID, "targetUserID", dbUser.UserID)
return nil, err
}
return &pbuser.GetUserByPhoneResp{UserInfo: pbUser}, nil
}
// GetUsersByNickname 按昵称精确匹配查询普通用户app_manger_level 与分页拉取用户一致)。
// 全局黑名单用户会被过滤;手机号字段按 phone_visibility 与 getDesignateUsers 相同规则处理。
// MsgReceiveSetting=2 的用户对非本人搜索者不可见。
func (s *userServer) GetUsersByNickname(ctx context.Context, req *pbuser.GetUsersByNicknameReq) (*pbuser.GetUsersByNicknameResp, error) {
nickname := strings.TrimSpace(req.Nickname)
if nickname == "" {
@ -497,8 +618,23 @@ func (s *userServer) GetUsersByNickname(ctx context.Context, req *pbuser.GetUser
return &pbuser.GetUsersByNicknameResp{}, nil
}
pbUsers := convert.UsersDB2Pb(users)
// 过滤掉 MsgReceiveSetting=2不接受任何人消息的用户本人除外
viewerID := mcontext.GetOpUserID(ctx)
{
visible := make([]*tablerelation.User, 0, len(users))
for _, u := range users {
if u.MsgReceiveSetting == tablerelation.MsgReceiveSettingNobody && viewerID != u.UserID {
continue
}
visible = append(visible, u)
}
users = visible
}
if len(users) == 0 {
return &pbuser.GetUsersByNicknameResp{}, nil
}
pbUsers := convert.UsersDB2Pb(users)
if err := s.applyPhoneVisibility(ctx, viewerID, pbUsers, users); err != nil {
log.ZError(ctx, "GetUsersByNickname: applyPhoneVisibility failed", err,
"opUserID", viewerID, "count", len(users))
@ -590,6 +726,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
now := time.Now()
users := make([]*tablerelation.User, 0, len(req.Users))
for _, user := range req.Users {
fullName := convert.BuildFullName(user.FirstName, user.LastName)
users = append(users, &tablerelation.User{
UserID: user.UserID,
Nickname: user.Nickname,
@ -598,6 +735,11 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
CreateTime: now,
AppMangerLevel: user.AppMangerLevel,
GlobalRecvMsgOpt: user.GlobalRecvMsgOpt,
FirstName: user.FirstName,
LastName: user.LastName,
FullName: fullName,
Phone: user.Phone,
AreaCode: user.AreaCode,
})
}
if err := s.db.Create(ctx, users); err != nil {

@ -0,0 +1,55 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"fmt"
"os"
"time"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
)
// clearBurnExpiredMsgs 阅后即焚 cron 入口:循环调用 conversation 服务的
// ClearBurnExpiredMsgs每次至多处理 burnLimit 个 (user, conversation) 分组,
// 直至本轮没有新的过期分组或达到防御性的最大循环次数。
func (c *cronServer) clearBurnExpiredMsgs() {
now := time.Now()
operationID := fmt.Sprintf("cron_burn_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear burn expired msgs cron start")
const (
maxLoop = 10000
burnLimit = 100
)
var count int
for i := 1; i <= maxLoop; i++ {
resp, err := c.conversationClient.ClearBurnExpiredMsgs(ctx, &pbconversation.ClearBurnExpiredMsgsReq{
Timestamp: now.UnixMilli(),
Limit: burnLimit,
})
if err != nil {
log.ZError(ctx, "ClearBurnExpiredMsgs failed.", err)
return
}
count += int(resp.Count)
if resp.Count < burnLimit {
break
}
}
log.ZDebug(ctx, "clear burn expired msgs cron completed", "cost", time.Since(now), "count", count)
}

@ -0,0 +1,23 @@
package tools
import (
"context"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/log"
)
// fetchChatAdminToken 通过 IM auth-rpc GetAdminToken 获取管理员 token。
// 使用 config.Share.Secret 和第一个 IMAdminUserID 作为凭据。
func (c *cronServer) fetchChatAdminToken(ctx context.Context) (string, error) {
userID := c.config.Share.IMAdminUserID[0]
resp, err := c.authClient.GetAdminToken(ctx, &auth.GetAdminTokenReq{
Secret: c.config.Share.Secret,
UserID: userID,
})
if err != nil {
return "", err
}
log.ZDebug(ctx, "fetchChatAdminToken: ok", "userID", userID)
return resp.Token, nil
}

@ -19,9 +19,13 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
@ -34,9 +38,10 @@ import (
)
type CronTaskConfig struct {
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
MongodbConfig config.Mongo
}
func Start(ctx context.Context, config *CronTaskConfig) error {
@ -55,24 +60,40 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err != nil {
return err
}
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
if err != nil {
return err
}
authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth)
if err != nil {
return err
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return errs.WrapMsg(err, "crontask: connect mongodb failed")
}
db := mgocli.GetDB()
userOfflineRecordDB, err := mgo.NewUserOfflineRecordMongo(db)
if err != nil {
return errs.WrapMsg(err, "crontask: init user_offline_record collection failed")
}
srv := &cronServer{
ctx: ctx,
config: config,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
ctx: ctx,
config: config,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
authClient: rpcli.NewAuthClient(authConn),
userOfflineRecordDB: userOfflineRecordDB,
chatAPIAddress: config.CronTask.ChatAPI.Address,
}
if err := srv.registerClearS3(); err != nil {
@ -84,6 +105,12 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerClearUserMsg(); err != nil {
return err
}
if err := srv.registerClearBurnExpiredMsgs(); err != nil {
return err
}
if err := srv.registerDeleteExpiredOfflineUsers(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
srv.cron.Start()
<-ctx.Done()
@ -91,12 +118,15 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
}
type cronServer struct {
ctx context.Context
config *CronTaskConfig
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient
ctx context.Context
config *CronTaskConfig
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient
authClient *rpcli.AuthClient
userOfflineRecordDB database.UserOfflineRecord
chatAPIAddress string
}
func (c *cronServer) registerClearS3() error {
@ -121,3 +151,20 @@ func (c *cronServer) registerClearUserMsg() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
return errs.WrapMsg(err, "failed to register clear user msg cron task")
}
func (c *cronServer) registerClearBurnExpiredMsgs() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearBurnExpiredMsgs)
return errs.WrapMsg(err, "failed to register clear burn expired msgs cron task")
}
// registerDeleteExpiredOfflineUsers 注册每小时执行一次的用户自动删除任务。
// 固定使用 "@hourly" 表达式,与其他任务使用的 CronExecuteTime 独立。
// chatAPI.address 未配置时跳过注册。
func (c *cronServer) registerDeleteExpiredOfflineUsers() error {
if c.chatAPIAddress == "" {
log.ZInfo(c.ctx, "disable auto delete expired offline users: chatAPI.address not configured")
return nil
}
_, err := c.cron.AddFunc("@hourly", c.deleteExpiredOfflineUsers)
return errs.WrapMsg(err, "failed to register delete expired offline users cron task")
}

@ -0,0 +1,95 @@
package tools
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
)
const deleteExpiredUserBatchLimit = 100
// chatHTTPClient 带超时,防止 chat 服务无响应时 cron worker 永久挂起。
var chatHTTPClient = &http.Client{Timeout: 3 * time.Second}
// deleteExpiredOfflineUsers 是 cron "@hourly" 触发的入口。
// 批量查询离线时长超过 delete_account_interval 的用户并依次调用 chat /account/del 删除。
func (c *cronServer) deleteExpiredOfflineUsers() {
now := time.Now()
operationID := fmt.Sprintf("cron_del_expired_user_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZInfo(ctx, "deleteExpiredOfflineUsers: start", "time", now)
users, err := c.userOfflineRecordDB.FindExpiredUsers(ctx, now, deleteExpiredUserBatchLimit)
if err != nil {
log.ZError(ctx, "deleteExpiredOfflineUsers: FindExpiredUsers failed", err)
return
}
if len(users) == 0 {
log.ZDebug(ctx, "deleteExpiredOfflineUsers: no expired users found")
return
}
log.ZInfo(ctx, "deleteExpiredOfflineUsers: found expired users", "count", len(users))
adminToken, err := c.fetchChatAdminToken(ctx)
if err != nil {
log.ZError(ctx, "deleteExpiredOfflineUsers: fetchChatAdminToken failed", err)
return
}
for i, u := range users {
subCtx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
c.deleteExpiredUser(subCtx, adminToken, u.UserID)
}
log.ZInfo(ctx, "deleteExpiredOfflineUsers: done", "count", len(users), "elapsed", time.Since(now))
}
// deleteExpiredUser 通过 chat HTTP API POST /account/del 删除单个过期用户。
// chat 服务端会处理:强制登出、删除好友/群组关系、清理 chat 账号数据等。
// adminToken 为当次批次开始时通过 admin-api /account/login 获取的管理员 token。
func (c *cronServer) deleteExpiredUser(ctx context.Context, adminToken, userID string) {
log.ZInfo(ctx, "deleteExpiredUser: start", "userID", userID)
operationID := mcontext.GetOperationID(ctx)
body, _ := json.Marshal(map[string]any{"userIDs": []string{userID}})
url := c.chatAPIAddress + "/account/del"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
log.ZError(ctx, "deleteExpiredUser: build request failed", err, "userID", userID)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("token", adminToken)
req.Header.Set("operationID", operationID)
resp, err := chatHTTPClient.Do(req)
if err != nil {
log.ZError(ctx, "deleteExpiredUser: HTTP call failed", err, "userID", userID, "url", url)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var result map[string]any
_ = json.NewDecoder(resp.Body).Decode(&result)
log.ZError(ctx, "deleteExpiredUser: chat API returned error",
fmt.Errorf("status %d", resp.StatusCode),
"userID", userID, "response", result)
return
}
// chat /account/del 已处理好友/群组/IM用户删除仅清理 user_offline_record 防止重复触发
if err := c.userOfflineRecordDB.Delete(ctx, userID); err != nil {
log.ZWarn(ctx, "deleteExpiredUser: Delete offline record failed", err, "userID", userID)
}
log.ZInfo(ctx, "deleteExpiredUser: done", "userID", userID)
}

@ -33,10 +33,16 @@ type CronTaskCmd struct {
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig tools.CronTaskConfig
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
// ChatAPI 配置内嵌在 openim-crontask.yml 的 chatAPI 字段中,无需单独文件。
// 示例:
// chatAPI:
// address: "http://127.0.0.1:10008"
// adminToken 由 crontask 通过 IM auth-rpc GetAdminToken 自动获取,无需额外配置。
ret.configMap = map[string]any{
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
ShareFileName: &cronTaskConfig.Share,
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
MongodbConfigFileName: &cronTaskConfig.MongodbConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)

@ -119,6 +119,8 @@ type CronTask struct {
RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"`
DeleteObjectType []string `mapstructure:"deleteObjectType"`
// ChatAPI 是 chat HTTP API 服务的访问配置,用于调用 /account/del 等需要管理员权限的接口。
ChatAPI ChatAPI `mapstructure:"chatAPI"`
}
type OfflinePushConfig struct {
@ -404,6 +406,13 @@ type Share struct {
RPCMaxBodySize MaxRequestBody `mapstructure:"rpcMaxBodySize"`
}
// ChatAPI 是 chat HTTP API 服务的访问配置。
// Address 为 chat-api 根地址(如 http://127.0.0.1:10008用于 POST /account/del。
// token 由 crontask 通过 IM auth-rpc GetAdminToken 自动获取,无需手动填写。
type ChatAPI struct {
Address string `mapstructure:"address"`
}
type MaxRequestBody struct {
RequestMaxBodySize int `mapstructure:"requestMaxBodySize"`
ResponseMaxBodySize int `mapstructure:"responseMaxBodySize"`

@ -78,6 +78,8 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func(
friendPb.FriendUser.Nickname = users[friend.FriendUserID].Nickname
friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL
friendPb.FriendUser.Ex = users[friend.FriendUserID].Ex
friendPb.FriendUser.FirstName = users[friend.FriendUserID].FirstName
friendPb.FriendUser.LastName = users[friend.FriendUserID].LastName
friendPb.CreateTime = friend.CreateTime.Unix()
friendPb.IsPinned = friend.IsPinned
friendPb.IsMute = friend.IsMuted
@ -88,9 +90,9 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func(
return friendsPb, nil
}
func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly {
func FriendOnlyDB2PbOnly(friendsDB []*model.Friend, users map[string]*sdkws.UserInfo) []*relation.FriendInfoOnly {
return datautil.Slice(friendsDB, func(f *model.Friend) *relation.FriendInfoOnly {
return &relation.FriendInfoOnly{
info := &relation.FriendInfoOnly{
OwnerUserID: f.OwnerUserID,
FriendUserID: f.FriendUserID,
Remark: f.Remark,
@ -103,6 +105,11 @@ func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly {
MuteDuration: f.MuteDuration,
MuteEndTime: f.MuteEndTime,
}
if u, ok := users[f.FriendUserID]; ok {
info.FirstName = u.FirstName
info.LastName = u.LastName
}
return info
})
}

@ -15,26 +15,44 @@
package convert
import (
"strings"
"time"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/utils/datautil"
"time"
"github.com/openimsdk/protocol/sdkws"
)
func BuildFullName(firstName, lastName string) string {
if firstName == "" {
return lastName
}
if lastName == "" {
return firstName
}
return strings.TrimSpace(firstName + " " + lastName)
}
func UserDB2Pb(user *relationtb.User) *sdkws.UserInfo {
return &sdkws.UserInfo{
UserID: user.UserID,
Nickname: user.Nickname,
FaceURL: user.FaceURL,
Ex: user.Ex,
CreateTime: user.CreateTime.UnixMilli(),
AppMangerLevel: user.AppMangerLevel,
GlobalRecvMsgOpt: user.GlobalRecvMsgOpt,
UserID: user.UserID,
Nickname: user.Nickname,
FaceURL: user.FaceURL,
Ex: user.Ex,
CreateTime: user.CreateTime.UnixMilli(),
AppMangerLevel: user.AppMangerLevel,
GlobalRecvMsgOpt: user.GlobalRecvMsgOpt,
FirstName: user.FirstName,
LastName: user.LastName,
Phone: user.Phone,
AreaCode: user.AreaCode,
PhoneVisibility: user.PhoneVisibility,
CallAcceptSetting: user.CallAcceptSetting,
MsgReceiveSetting: user.MsgReceiveSetting,
CallAcceptSetting: user.CallAcceptSetting,
MsgReceiveSetting: user.MsgReceiveSetting,
GroupInviteSetting: user.GroupInviteSetting,
CallRingtoneURL: user.CallRingtoneURL,
MsgBurnDuration: user.MsgBurnDuration,
}
}
@ -43,14 +61,20 @@ func UsersDB2Pb(users []*relationtb.User) []*sdkws.UserInfo {
}
func UserPb2DB(user *sdkws.UserInfo) *relationtb.User {
fullName := BuildFullName(user.FirstName, user.LastName)
return &relationtb.User{
UserID: user.UserID,
Nickname: user.Nickname,
FaceURL: user.FaceURL,
Ex: user.Ex,
CreateTime: time.UnixMilli(user.CreateTime),
AppMangerLevel: user.AppMangerLevel,
UserID: user.UserID,
Nickname: user.Nickname,
FaceURL: user.FaceURL,
Ex: user.Ex,
CreateTime: time.UnixMilli(user.CreateTime),
AppMangerLevel: user.AppMangerLevel,
GlobalRecvMsgOpt: user.GlobalRecvMsgOpt,
FirstName: user.FirstName,
LastName: user.LastName,
FullName: fullName,
AreaCode: user.AreaCode,
CallRingtoneURL: user.CallRingtoneURL,
}
}
@ -63,8 +87,13 @@ func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
"nickname": user.Nickname,
"face_url": user.FaceURL,
"ex": user.Ex,
"first_name": user.FirstName,
"last_name": user.LastName,
"area_code": user.AreaCode,
"app_manager_level": user.AppMangerLevel,
"global_recv_msg_opt": user.GlobalRecvMsgOpt,
"call_ringtone_url": user.CallRingtoneURL,
"msg_burn_duration": user.MsgBurnDuration,
}
for key, value := range fields {
if v, ok := value.(string); ok && v != "" {
@ -73,6 +102,10 @@ func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
val[key] = v
}
}
if user.FirstName != "" || user.LastName != "" {
fullName := BuildFullName(user.FirstName, user.LastName)
val["full_name"] = fullName
}
return val
}
func UserPb2DBMapEx(user *sdkws.UserInfoWithEx) map[string]any {
@ -91,12 +124,32 @@ func UserPb2DBMapEx(user *sdkws.UserInfoWithEx) map[string]any {
if user.Ex != nil {
val["ex"] = user.Ex.Value
}
if user.FirstName != nil {
val["first_name"] = user.FirstName.Value
}
if user.LastName != nil {
val["last_name"] = user.LastName.Value
}
if user.FirstName != nil || user.LastName != nil {
firstName := ""
lastName := ""
if user.FirstName != nil {
firstName = user.FirstName.Value
}
if user.LastName != nil {
lastName = user.LastName.Value
}
val["full_name"] = BuildFullName(firstName, lastName)
}
if user.GlobalRecvMsgOpt != nil {
val["global_recv_msg_opt"] = user.GlobalRecvMsgOpt.Value
}
if user.Phone != nil {
val["phone"] = user.Phone.Value
}
if user.AreaCode != nil {
val["area_code"] = user.AreaCode.Value
}
if user.PhoneVisibility != nil {
val["phone_visibility"] = user.PhoneVisibility.Value
}
@ -106,6 +159,14 @@ func UserPb2DBMapEx(user *sdkws.UserInfoWithEx) map[string]any {
if user.MsgReceiveSetting != nil {
val["msg_receive_setting"] = user.MsgReceiveSetting.Value
}
if user.GroupInviteSetting != nil {
val["group_invite_setting"] = user.GroupInviteSetting.Value
}
if user.CallRingtoneURL != nil {
val["call_ringtone_url"] = user.CallRingtoneURL.Value
}
if user.MsgBurnDuration != nil {
val["msg_burn_duration"] = user.MsgBurnDuration.Value
}
return val
}

@ -95,7 +95,7 @@ type FriendDatabase interface {
// BecomeOnewayFriend inserts a single-side friendship: ownerUserID -> friendUserID.
// The reverse side (friendUserID -> ownerUserID) is NOT created.
BecomeOnewayFriend(ctx context.Context, ownerUserID, friendUserID string, addSource int32) error
BecomeOnewayFriend(ctx context.Context, ownerUserID, friendUserID string, addSource int32, remark string) error
}
type friendDatabase struct {
@ -415,7 +415,7 @@ func (f *friendDatabase) GetPinnedFriendIDs(ctx context.Context, ownerUserID str
// BecomeOnewayFriend creates only the ownerUserID->friendUserID side of the friendship.
// The reverse side is intentionally omitted so that the target user is not aware of being added.
func (f *friendDatabase) BecomeOnewayFriend(ctx context.Context, ownerUserID, friendUserID string, addSource int32) error {
func (f *friendDatabase) BecomeOnewayFriend(ctx context.Context, ownerUserID, friendUserID string, addSource int32, remark string) error {
return f.tx.Transaction(ctx, func(ctx context.Context) error {
existing, err := f.friend.FindFriends(ctx, ownerUserID, []string{friendUserID})
if err != nil {
@ -427,7 +427,7 @@ func (f *friendDatabase) BecomeOnewayFriend(ctx context.Context, ownerUserID, fr
}
opUserID := mcontext.GetOpUserID(ctx)
if err := f.friend.Create(ctx, []*model.Friend{
{OwnerUserID: ownerUserID, FriendUserID: friendUserID, AddSource: addSource, OperatorUserID: opUserID},
{OwnerUserID: ownerUserID, FriendUserID: friendUserID, AddSource: addSource, OperatorUserID: opUserID, Remark: remark},
}); err != nil {
return err
}

@ -126,6 +126,11 @@ type GroupDatabase interface {
FindJoinGroupID(ctx context.Context, userID string) ([]string, error)
GetGroupApplicationUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error)
// 群置顶消息:保留最近 N 条
PinGroupMessage(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error)
UnpinGroupMessage(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error)
GetGroupPinnedMessages(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error)
}
func NewGroupDatabase(
@ -134,24 +139,39 @@ func NewGroupDatabase(
groupDB database.Group,
groupMemberDB database.GroupMember,
groupRequestDB database.GroupRequest,
groupPinnedMsgDB database.GroupPinnedMsg,
ctxTx tx.Tx,
groupHash cache.GroupHash,
) GroupDatabase {
return &groupDatabase{
groupDB: groupDB,
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
ctxTx: ctxTx,
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
groupDB: groupDB,
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
groupPinnedMsgDB: groupPinnedMsgDB,
ctxTx: ctxTx,
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
}
}
type groupDatabase struct {
groupDB database.Group
groupMemberDB database.GroupMember
groupRequestDB database.GroupRequest
ctxTx tx.Tx
cache cache.GroupCache
groupDB database.Group
groupMemberDB database.GroupMember
groupRequestDB database.GroupRequest
groupPinnedMsgDB database.GroupPinnedMsg
ctxTx tx.Tx
cache cache.GroupCache
}
func (g *groupDatabase) PinGroupMessage(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) {
return g.groupPinnedMsgDB.Pin(ctx, groupID, msg)
}
func (g *groupDatabase) UnpinGroupMessage(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) {
return g.groupPinnedMsgDB.Unpin(ctx, groupID, pinID, seq)
}
func (g *groupDatabase) GetGroupPinnedMessages(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) {
return g.groupPinnedMsgDB.Get(ctx, groupID)
}
func (g *groupDatabase) FindJoinGroupID(ctx context.Context, userID string) ([]string, error) {

@ -31,6 +31,8 @@ type RtcDatabase interface {
RemoveInvitee(ctx context.Context, roomID string, userID string) error
GetInvitationByGroupID(ctx context.Context, groupID string) (*model.SignalInvitation, error)
GetInvitationsByRoomIDs(ctx context.Context, roomIDs []string) ([]*model.SignalInvitation, error)
// GetBusyUserIDs returns the subset of userIDs that are currently in an active call.
GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error)
CreateRecord(ctx context.Context, record *model.SignalRecord) error
SearchRecords(ctx context.Context, sendID, recvID string, sessionType int32, startTime, endTime int64, pagination pagination.Pagination) (int64, []*model.SignalRecord, error)
@ -73,6 +75,10 @@ func (r *rtcDatabase) GetInvitationsByRoomIDs(ctx context.Context, roomIDs []str
return r.db.GetInvitationsByRoomIDs(ctx, roomIDs)
}
func (r *rtcDatabase) GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error) {
return r.db.GetBusyUserIDs(ctx, userIDs)
}
func (r *rtcDatabase) CreateRecord(ctx context.Context, record *model.SignalRecord) error {
return r.db.CreateRecord(ctx, record)
}

@ -69,6 +69,9 @@ type UserDatabase interface {
SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
// Delete permanently removes users and invalidates their cache entries.
Delete(ctx context.Context, userIDs []string) error
// CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error
@ -222,6 +225,18 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri
return u.userDB.SortQuery(ctx, userIDName, asc)
}
func (u *userDatabase) Delete(ctx context.Context, userIDs []string) error {
if len(userIDs) == 0 {
return nil
}
return u.tx.Transaction(ctx, func(ctx context.Context) error {
if err := u.userDB.Delete(ctx, userIDs); err != nil {
return err
}
return u.cache.DelUsersInfo(userIDs...).ChainExecDel(ctx)
})
}
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
}

@ -14,8 +14,10 @@ type UserGlobalBlackDatabase interface {
AddBlack(ctx context.Context, blacks []*model.UserGlobalBlack) error
// RemoveBlack 按 userID 将用户从全局黑名单移除
RemoveBlack(ctx context.Context, userIDs []string) error
// IsBlocked 检查用户是否在全局黑名单
// IsBlocked 检查用户是否在全局黑名单(含冻结)
IsBlocked(ctx context.Context, userID string) (bool, error)
// GetStatus 返回用户限制状态0=正常1=冻结2=黑名单
GetStatus(ctx context.Context, userID string) (int32, error)
// FindBlocked 批量查询哪些 userID 在全局黑名单中,返回被封禁的记录
FindBlocked(ctx context.Context, userIDs []string) ([]*model.UserGlobalBlack, error)
// GetBlackList 分页获取黑名单列表
@ -42,6 +44,10 @@ func (u *userGlobalBlackDatabase) IsBlocked(ctx context.Context, userID string)
return u.db.IsBlocked(ctx, userID)
}
func (u *userGlobalBlackDatabase) GetStatus(ctx context.Context, userID string) (int32, error) {
return u.db.GetStatus(ctx, userID)
}
func (u *userGlobalBlackDatabase) GetBlackList(ctx context.Context, pagination pagination.Pagination) (int64, []*model.UserGlobalBlack, error) {
return u.db.Page(ctx, pagination)
}

@ -0,0 +1,22 @@
// Copyright © 2026 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// GroupPinnedMsg 群置顶消息的存储抽象
type GroupPinnedMsg interface {
// Pin 置顶一条消息:若 PinID 为空会自动生成;自动滚动保留最近 N 条
Pin(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error)
// Unpin 取消置顶pinID 非空时按 pinID 精确删除,否则按 seq 删除
Unpin(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error)
// Get 获取群置顶消息列表(最新的在前)
Get(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error)
}

@ -0,0 +1,115 @@
// Copyright © 2026 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package mgo
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewGroupPinnedMsgMongo(db *mongo.Database) (database.GroupPinnedMsg, error) {
coll := db.Collection(database.GroupPinnedMsgName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{{Key: "group_id", Value: 1}},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
}
return &groupPinnedMsgMgo{coll: coll}, nil
}
type groupPinnedMsgMgo struct {
coll *mongo.Collection
}
func (g *groupPinnedMsgMgo) get(ctx context.Context, groupID string) (*model.GroupPinnedMsg, error) {
doc, err := mongoutil.FindOne[*model.GroupPinnedMsg](ctx, g.coll, bson.M{"group_id": groupID})
if err != nil {
if errs.ErrRecordNotFound.Is(err) || errors.Is(err, mongo.ErrNoDocuments) {
return &model.GroupPinnedMsg{GroupID: groupID}, nil
}
return nil, err
}
return doc, nil
}
func (g *groupPinnedMsgMgo) Get(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) {
doc, err := g.get(ctx, groupID)
if err != nil {
return nil, err
}
return doc.PinnedMsgs, nil
}
// Pin 置顶一条消息:
// - 若提供的 msg.PinID 为空,则自动生成 ObjectID().Hex()
// - 同 seq 的旧记录会被先移除避免重复
// - 新记录 push 到数组首位,自动滚动保留最近 GroupPinnedMsgMaxKeep 条
func (g *groupPinnedMsgMgo) Pin(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) {
if msg == nil {
return nil, errs.ErrArgs.WrapMsg("pin msg is nil")
}
if msg.PinID == "" {
msg.PinID = primitive.NewObjectID().Hex()
}
msg.GroupID = groupID
if _, err := mongoutil.UpdateOneResult(ctx, g.coll,
bson.M{"group_id": groupID},
bson.M{"$pull": bson.M{"pinned_msgs": bson.M{"seq": msg.Seq}}},
); err != nil {
return nil, err
}
filter := bson.M{"group_id": groupID}
update := bson.M{
"$push": bson.M{
"pinned_msgs": bson.M{
"$each": bson.A{msg},
"$position": 0,
"$slice": model.GroupPinnedMsgMaxKeep,
},
},
"$setOnInsert": bson.M{"group_id": groupID},
}
opts := options.Update().SetUpsert(true)
if _, err := g.coll.UpdateOne(ctx, filter, update, opts); err != nil {
return nil, errs.Wrap(err)
}
return g.Get(ctx, groupID)
}
// Unpin 取消置顶:
// - pinID 非空时按 pinID 精确删除(推荐)
// - 否则按 seq 删除
// 返回更新后的置顶列表(可能为空数组)
func (g *groupPinnedMsgMgo) Unpin(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) {
if pinID == "" && seq <= 0 {
return nil, errs.ErrArgs.WrapMsg("either pinID or seq must be provided")
}
pull := bson.M{}
if pinID != "" {
pull["pin_id"] = pinID
} else {
pull["seq"] = seq
}
if _, err := mongoutil.UpdateOneResult(ctx, g.coll,
bson.M{"group_id": groupID},
bson.M{"$pull": bson.M{"pinned_msgs": pull}},
); err != nil {
return nil, err
}
return g.Get(ctx, groupID)
}

@ -0,0 +1,134 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewMsgBurnDeadlineMongo(db *mongo.Database) (database.MsgBurnDeadline, error) {
coll := db.Collection(database.MsgBurnDeadlineName)
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "user_id", Value: 1},
{Key: "conversation_id", Value: 1},
{Key: "seq", Value: 1},
},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "deadline_ms", Value: 1}},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
return &msgBurnDeadlineMgo{coll: coll}, nil
}
type msgBurnDeadlineMgo struct {
coll *mongo.Collection
}
func (m *msgBurnDeadlineMgo) UpsertIfAbsent(ctx context.Context, items []*model.MsgBurnDeadline) error {
if len(items) == 0 {
return nil
}
models := make([]mongo.WriteModel, 0, len(items))
for _, item := range items {
filter := bson.M{
"user_id": item.UserID,
"conversation_id": item.ConversationID,
"seq": item.Seq,
}
setOnInsert := bson.M{
"user_id": item.UserID,
"conversation_id": item.ConversationID,
"seq": item.Seq,
"deadline_ms": item.DeadlineMs,
"create_time": item.CreateTime,
}
models = append(models,
mongo.NewUpdateOneModel().
SetFilter(filter).
SetUpdate(bson.M{"$setOnInsert": setOnInsert}).
SetUpsert(true),
)
}
_, err := m.coll.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false))
return errs.Wrap(err)
}
func (m *msgBurnDeadlineMgo) FindExpiredGroups(ctx context.Context, nowMs int64, limit int) ([]*database.ExpiredBurnGroup, error) {
if limit <= 0 {
return nil, nil
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: bson.M{"deadline_ms": bson.M{"$lte": nowMs}}}},
bson.D{{Key: "$group", Value: bson.M{
"_id": bson.M{
"user_id": "$user_id",
"conversation_id": "$conversation_id",
},
"max_seq": bson.M{"$max": "$seq"},
"seqs": bson.M{"$push": "$seq"},
}}},
bson.D{{Key: "$limit", Value: int64(limit)}},
}
type aggRow struct {
ID struct {
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
} `bson:"_id"`
MaxSeq int64 `bson:"max_seq"`
Seqs []int64 `bson:"seqs"`
}
rows, err := mongoutil.Aggregate[*aggRow](ctx, m.coll, pipeline)
if err != nil {
return nil, err
}
res := make([]*database.ExpiredBurnGroup, 0, len(rows))
for _, r := range rows {
res = append(res, &database.ExpiredBurnGroup{
UserID: r.ID.UserID,
ConversationID: r.ID.ConversationID,
MaxSeq: r.MaxSeq,
Seqs: r.Seqs,
})
}
return res, nil
}
func (m *msgBurnDeadlineMgo) DeleteByUserConversationSeqs(ctx context.Context, userID, conversationID string, seqs []int64) error {
if len(seqs) == 0 {
return nil
}
filter := bson.M{
"user_id": userID,
"conversation_id": conversationID,
"seq": bson.M{"$in": seqs},
}
_, err := m.coll.DeleteMany(ctx, filter)
return errs.Wrap(err)
}

@ -48,6 +48,14 @@ func (p *phoneSNMgo) GetByPhone(ctx context.Context, phone string) (*model.Phone
return doc, nil
}
func (p *phoneSNMgo) DeleteByPhone(ctx context.Context, phone string) error {
if phone == "" {
return nil
}
_, err := p.coll.DeleteOne(ctx, bson.M{"phone": phone})
return errs.Wrap(err)
}
func (p *phoneSNMgo) Upsert(ctx context.Context, phone string, userID int64, isSnd bool) error {
if phone == "" {
return errs.ErrArgs.WrapMsg("phone is empty")

@ -117,6 +117,44 @@ func (s *signalMgo) GetInvitationsByRoomIDs(ctx context.Context, roomIDs []strin
return mongoutil.Find[*model.SignalInvitation](ctx, s.invColl, bson.M{"room_id": bson.M{"$in": roomIDs}})
}
func (s *signalMgo) GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error) {
if len(userIDs) == 0 {
return nil, nil
}
filter := bson.M{
"$or": bson.A{
bson.M{"inviter_user_id": bson.M{"$in": userIDs}},
bson.M{"invitee_user_id_list": bson.M{"$in": userIDs}},
},
}
invitations, err := mongoutil.Find[*model.SignalInvitation](ctx, s.invColl, filter,
options.Find().SetProjection(bson.M{"inviter_user_id": 1, "invitee_user_id_list": 1}),
)
if err != nil {
return nil, err
}
requested := make(map[string]struct{}, len(userIDs))
for _, uid := range userIDs {
requested[uid] = struct{}{}
}
busySet := make(map[string]struct{})
for _, inv := range invitations {
if _, ok := requested[inv.InviterUserID]; ok {
busySet[inv.InviterUserID] = struct{}{}
}
for _, uid := range inv.InviteeUserIDList {
if _, ok := requested[uid]; ok {
busySet[uid] = struct{}{}
}
}
}
busy := make([]string, 0, len(busySet))
for uid := range busySet {
busy = append(busy, uid)
}
return busy, nil
}
func (s *signalMgo) CreateRecord(ctx context.Context, record *model.SignalRecord) error {
return mongoutil.InsertMany(ctx, s.recColl, []*model.SignalRecord{record})
}

@ -16,9 +16,10 @@ package mgo
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/db/mongoutil"
@ -56,6 +57,11 @@ type UserMgo struct {
}
func (u *UserMgo) Create(ctx context.Context, users []*model.User) error {
for _, user := range users {
if user.DeleteAccountInterval == 0 {
user.DeleteAccountInterval = model.DefaultDeleteAccountIntervalSec
}
}
return mongoutil.InsertMany(ctx, u.coll, users)
}
@ -63,7 +69,38 @@ func (u *UserMgo) UpdateByMap(ctx context.Context, userID string, args map[strin
if len(args) == 0 {
return nil
}
return mongoutil.UpdateOne(ctx, u.coll, bson.M{"user_id": userID}, bson.M{"$set": args}, true)
filter := bson.M{"user_id": userID}
update := bson.M{"$set": args}
if err := mongoutil.UpdateOne(ctx, u.coll, filter, update, true); err != nil {
return err
}
// Keep user attributes in sync for consumers that read from the "attribute" collection.
// Only sync the allowed attribute fields.
attributeSet := make(map[string]any)
for _, key := range []string{
"nickname",
"first_name",
"last_name",
"full_name",
"remark",
"face_url",
"phone_number",
"area_code",
} {
if v, ok := args[key]; ok {
attributeSet[key] = v
}
}
//// user collection uses "phone"; attribute collection uses "phone_number".
if v, ok := args["phone"]; ok {
attributeSet["phone_number"] = v
}
if len(attributeSet) == 0 {
return nil
}
attributeColl := u.coll.Database().Collection("attribute")
return mongoutil.UpdateOne(ctx, attributeColl, filter, bson.M{"$set": attributeSet}, true)
}
func (u *UserMgo) Find(ctx context.Context, userIDs []string) (users []*model.User, err error) {
@ -343,6 +380,14 @@ func (u *UserMgo) CountRangeEverydayTotal(ctx context.Context, start time.Time,
return res, nil
}
func (u *UserMgo) Delete(ctx context.Context, userIDs []string) error {
if len(userIDs) == 0 {
return nil
}
_, err := u.coll.DeleteMany(ctx, bson.M{"user_id": bson.M{"$in": userIDs}})
return errs.Wrap(err)
}
func (u *UserMgo) SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error) {
if len(userIDName) == 0 {
return nil, nil

@ -37,7 +37,7 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal
b.CreateTime = time.Now()
}
}
// 使用 upsert 避免重复插入报错
// 使用 upsert 避免重复插入报错status 也走 $set 以便升级/降级(冻结↔黑名单)时同步更新
for _, b := range blacks {
filter := bson.M{"user_id": b.UserID}
update := bson.M{
@ -45,6 +45,7 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal
"nickname": b.Nickname,
"operator_id": b.OperatorID,
"reason": b.Reason,
"status": b.Status,
},
"$setOnInsert": bson.M{
"user_id": b.UserID,
@ -59,6 +60,20 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal
return nil
}
// GetStatus 返回 userID 对应的限制状态:
// 0=正常无记录1=冻结2=黑名单
func (u *UserGlobalBlackMgo) GetStatus(ctx context.Context, userID string) (int32, error) {
var doc model.UserGlobalBlack
err := u.coll.FindOne(ctx, bson.M{"user_id": userID}, options.FindOne().SetProjection(bson.M{"status": 1})).Decode(&doc)
if err != nil {
if err == mongo.ErrNoDocuments {
return model.UserStatusNormal, nil
}
return model.UserStatusNormal, errs.Wrap(err)
}
return doc.Status, nil
}
func (u *UserGlobalBlackMgo) Remove(ctx context.Context, users []string) error {
if len(users) == 0 {
return nil

@ -0,0 +1,92 @@
package mgo
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewUserOfflineRecordMongo(db *mongo.Database) (database.UserOfflineRecord, error) {
coll := db.Collection(database.UserOfflineRecordName)
indexes := []mongo.IndexModel{
{
Keys: bson.D{{Key: "user_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "delete_user_deadline", Value: 1}},
},
}
if _, err := coll.Indexes().CreateMany(context.Background(), indexes); err != nil {
return nil, errs.Wrap(err)
}
return &userOfflineRecordMgo{coll: coll}, nil
}
type userOfflineRecordMgo struct {
coll *mongo.Collection
}
// Upsert 写入用户的离线记录;若记录已存在则不覆盖($setOnInsert
// 保留最早一次的全离线时刻作为计时起点。
// deadline = offlineTime + delete_account_interval供范围查询快速定位过期账号。
func (u *userOfflineRecordMgo) Upsert(ctx context.Context, userID string, offlineTime, deadline time.Time) error {
filter := bson.M{"user_id": userID}
update := bson.M{
"$setOnInsert": bson.M{
"user_id": userID,
"offline_time": offlineTime,
"delete_user_deadline": deadline,
},
}
opt := options.Update().SetUpsert(true)
_, err := u.coll.UpdateOne(ctx, filter, update, opt)
return errs.Wrap(err)
}
// RefreshOfflineTime 将离线记录的 offline_time 与 delete_user_deadline 同时覆盖写为新值($set
// 仅更新已存在的记录;用户在线时(无记录)不做任何操作。
// 适用场景:用户修改 delete_account_interval让倒计时从设置时刻重新起算。
func (u *userOfflineRecordMgo) RefreshOfflineTime(ctx context.Context, userID string, newOfflineTime, newDeadline time.Time) error {
filter := bson.M{"user_id": userID}
update := bson.M{"$set": bson.M{
"offline_time": newOfflineTime,
"delete_user_deadline": newDeadline,
}}
_, err := u.coll.UpdateOne(ctx, filter, update)
return errs.Wrap(err)
}
// Delete 删除用户的离线记录(用户重新上线时调用,停止计时)。
func (u *userOfflineRecordMgo) Delete(ctx context.Context, userID string) error {
_, err := u.coll.DeleteOne(ctx, bson.M{"user_id": userID})
return errs.Wrap(err)
}
// FindExpiredUsers 返回 delete_user_deadline <= now 的用户。
// 通过 $lookup 联表 user 集合获取完整 *model.User$unwind 同时起到过滤孤儿记录的作用
// (若 user 文档已不存在,$unwind 会将其丢弃,避免对无效账号重复触发删除)。
func (u *userOfflineRecordMgo) FindExpiredUsers(ctx context.Context, now time.Time, limit int) ([]*model.User, error) {
pipeline := bson.A{
bson.M{"$match": bson.M{
"delete_user_deadline": bson.M{"$lte": now},
}},
bson.M{"$limit": limit},
bson.M{"$lookup": bson.M{
"from": database.UserName,
"localField": "user_id",
"foreignField": "user_id",
"as": "u",
}},
bson.M{"$unwind": "$u"},
bson.M{"$replaceRoot": bson.M{"newRoot": "$u"}},
}
return mongoutil.Aggregate[*model.User](ctx, u.coll, pipeline)
}

@ -0,0 +1,48 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// ExpiredBurnGroup 表示某个 (UserID, ConversationID) 上需要被推进 min_seq 的 seq 集合。
type ExpiredBurnGroup struct {
UserID string
ConversationID string
// MaxSeq 当前批次中最大的过期 seq推进 min_seq 时使用 MaxSeq + 1。
MaxSeq int64
// Seqs 当前批次实际涉及的所有过期 seq便于精确删除已处理的 deadline 记录。
Seqs []int64
}
// MsgBurnDeadline 持久化每条消息对每个用户的「阅后即焚截止时间」。
// 写入位置msg 服务 MarkMsgsAsRead / MarkConversationAsRead单聊
// 消费位置conversation 服务 ClearBurnExpiredMsgs cron 入口。
type MsgBurnDeadline interface {
// UpsertIfAbsent 仅在 (UserID, ConversationID, Seq) 不存在时插入;
// 已存在则不覆盖,保证「首次阅读时刻」决定 deadline。
UpsertIfAbsent(ctx context.Context, items []*model.MsgBurnDeadline) error
// FindExpiredGroups 查询 deadline_ms <= nowMs 的记录,按 (UserID, ConversationID)
// 聚合并返回每组的最大 seq 与所涉及的 seq 列表。limit 限制返回的 group 数量。
FindExpiredGroups(ctx context.Context, nowMs int64, limit int) ([]*ExpiredBurnGroup, error)
// DeleteByUserConversationSeqs 删除某 (UserID, ConversationID) 下指定 seq 列表的 deadline 记录。
// 一般在成功推进 min_seq 后调用。
DeleteByUserConversationSeqs(ctx context.Context, userID, conversationID string, seqs []int64) error
}

@ -12,6 +12,7 @@ const (
GroupJoinVersionName = "group_join_version"
ConversationVersionName = "conversation_version"
GroupRequestName = "group_request"
GroupPinnedMsgName = "group_pinned_msg"
LogName = "log"
ObjectName = "s3"
UserName = "user"
@ -22,4 +23,6 @@ const (
SignalInvitationName = "signal_invitation"
SignalRecordName = "signal_record"
SpamReportName = "spam_report"
MsgBurnDeadlineName = "msg_burn_deadline"
UserOfflineRecordName = "user_offline_record"
)

@ -17,4 +17,6 @@ type PhoneSN interface {
GetByPhone(ctx context.Context, phone string) (*model.PhoneSNInfo, error)
// Upsert 写入或更新 is_snd 与 user_id
Upsert(ctx context.Context, phone string, userID int64, isSnd bool) error
// DeleteByPhone 按手机号删除记录;记录不存在时不报错
DeleteByPhone(ctx context.Context, phone string) error
}

@ -38,6 +38,9 @@ type SignalDatabase interface {
GetInvitationByGroupID(ctx context.Context, groupID string) (*model.SignalInvitation, error)
// GetInvitationsByRoomIDs retrieves invitations for the given room IDs.
GetInvitationsByRoomIDs(ctx context.Context, roomIDs []string) ([]*model.SignalInvitation, error)
// GetBusyUserIDs returns the subset of userIDs that are currently involved in an active call
// (either as inviter or as invitee in a pending invitation).
GetBusyUserIDs(ctx context.Context, userIDs []string) ([]string, error)
// CreateRecord stores a completed call record.
CreateRecord(ctx context.Context, record *model.SignalRecord) error

@ -45,6 +45,9 @@ type User interface {
SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
// Delete permanently removes user documents by userID.
Delete(ctx context.Context, userIDs []string) error
// CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error

@ -17,6 +17,8 @@ type UserGlobalBlack interface {
Find(ctx context.Context, userIDs []string) ([]*model.UserGlobalBlack, error)
// IsBlocked 检查单个用户是否在黑名单
IsBlocked(ctx context.Context, userID string) (bool, error)
// GetStatus 返回用户限制状态0=正常1=冻结2=黑名单
GetStatus(ctx context.Context, userID string) (int32, error)
// Page 分页查询黑名单列表
Page(ctx context.Context, pagination pagination.Pagination) (count int64, blacks []*model.UserGlobalBlack, err error)
}

@ -0,0 +1,28 @@
package database
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
// UserOfflineRecord 管理 user_offline_record 集合。
// 集合中的每条记录代表一个当前处于全平台离线状态的用户及其首次全离线时刻。
type UserOfflineRecord interface {
// Upsert 写入用户的离线记录;若记录已存在则不覆盖(保留最早的离线时刻)。
// deadline = offlineTime + delete_account_interval供 FindExpiredUsers 快速过滤。
Upsert(ctx context.Context, userID string, offlineTime, deadline time.Time) error
// RefreshOfflineTime 将已存在的离线记录的 offline_time 与 delete_user_deadline
// 同时刷新,使删除倒计时从 newOfflineTime 重新起算。
// 若记录不存在(用户在线)则无操作。
RefreshOfflineTime(ctx context.Context, userID string, newOfflineTime, newDeadline time.Time) error
// Delete 删除用户的离线记录(用户重新上线时调用)。
Delete(ctx context.Context, userID string) error
// FindExpiredUsers 返回 delete_user_deadline <= now 的用户($lookup user 集合获取完整信息)。
// limit 限制单次返回条数,防止单批处理量过大。
FindExpiredUsers(ctx context.Context, now time.Time, limit int) ([]*model.User, error)
}

@ -0,0 +1,69 @@
// Copyright © 2026 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package model
// GroupPinnedMsgMaxKeep 群置顶消息最多保留的条数(最新置顶的在最前)
const GroupPinnedMsgMaxKeep = 3
// GroupPinnedOfflinePush 离线推送信息快照
type GroupPinnedOfflinePush struct {
Title string `bson:"title"`
Desc string `bson:"desc"`
Ex string `bson:"ex"`
IOSPushSound string `bson:"ios_push_sound"`
IOSBadgeCount bool `bson:"ios_badge_count"`
SignalInfo string `bson:"signal_info"`
}
// GroupPinnedMessage 一条群置顶消息的完整内容快照
// 置顶时把消息整体快照入库,避免后续消息删除/撤回影响已置顶展示
type GroupPinnedMessage struct {
// PinID 全局唯一 id用于精准取消置顶生产由 mongo ObjectID().Hex() 生成)
PinID string `bson:"pin_id"`
// 会话 / 群信息
ConversationID string `bson:"conversation_id"`
GroupID string `bson:"group_id"`
// 消息标识
Seq int64 `bson:"seq"`
ServerMsgID string `bson:"server_msg_id"`
ClientMsgID string `bson:"client_msg_id"`
// 发送方信息
SendID string `bson:"send_id"`
RecvID string `bson:"recv_id"`
SenderPlatformID int32 `bson:"sender_platform_id"`
SenderNickname string `bson:"sender_nickname"`
SenderFaceURL string `bson:"sender_face_url"`
// 消息内容快照
SessionType int32 `bson:"session_type"`
MsgFrom int32 `bson:"msg_from"`
ContentType int32 `bson:"content_type"`
Content string `bson:"content"`
AtUserIDList []string `bson:"at_user_id_list"`
Options map[string]bool `bson:"options"`
AttachedInfo string `bson:"attached_info"`
Ex string `bson:"ex"`
OfflinePush *GroupPinnedOfflinePush `bson:"offline_push"`
// 时间
SendTime int64 `bson:"send_time"`
CreateTime int64 `bson:"create_time"`
Status int32 `bson:"status"`
// 操作人 & 时间
PinUserID string `bson:"pin_user_id"`
PinTime int64 `bson:"pin_time"`
}
// GroupPinnedMsg 一个群的置顶消息文档,按 group_id 唯一
type GroupPinnedMsg struct {
GroupID string `bson:"group_id"`
PinnedMsgs []*GroupPinnedMessage `bson:"pinned_msgs"`
}

@ -0,0 +1,31 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// MsgBurnDeadline 单条消息的「阅后即焚截止时间」记录。
// 在 MarkMsgsAsRead / MarkConversationAsRead单聊场景时按
// (UserID, ConversationID, Seq) 维度写入。读取时间锁定后续不再覆盖。
//
// 当当前时间 > DeadlineMs 时cron 会把该用户在该会话上的 min_seq
// 推进到 max(已过期 seq) + 1从而让这些消息从对该用户的拉取结果中消失。
type MsgBurnDeadline struct {
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
Seq int64 `bson:"seq"`
// DeadlineMs 截止时间戳(毫秒);超过即可被 cron 收走推进 min_seq。
DeadlineMs int64 `bson:"deadline_ms"`
// CreateTime 写入时刻(毫秒);用于排查/审计。
CreateTime int64 `bson:"create_time"`
}

@ -42,22 +42,51 @@ const (
MsgReceiveSettingNobody int32 = 2
)
// GroupInviteSetting 群邀请权限枚举。
// 0=所有人可邀请, 1=仅好友可邀请, 2=所有人不可邀请
const (
GroupInviteSettingPublic int32 = 0
GroupInviteSettingFriends int32 = 1
GroupInviteSettingNobody int32 = 2
)
// UserStatus 用户账号状态枚举。
// 0=正常1=冻结可登录不能收发消息2=黑名单(不可登录,自动踢下线,不能收发消息)
const (
UserStatusNormal int32 = 0
UserStatusFrozen int32 = 1
UserStatusBlacklist int32 = 2
)
// DefaultDeleteAccountIntervalSec 删除账号等待间隔的系统默认值18 个月,按 30 天/月折算)。
// 注册时写入 MongoDB 的初始值;用户未显式修改时即为此值。
const DefaultDeleteAccountIntervalSec int32 = 18 * 30 * 24 * 3600
type User struct {
UserID string `bson:"user_id"`
Nickname string `bson:"nickname"`
FaceURL string `bson:"face_url"`
Ex string `bson:"ex"`
AppMangerLevel int32 `bson:"app_manger_level"`
GlobalRecvMsgOpt int32 `bson:"global_recv_msg_opt"`
CreateTime time.Time `bson:"create_time"`
// Phone 用户手机号(明文,仅服务端留存,下发时按 PhoneVisibility 过滤)
Phone string `bson:"phone"`
// PhoneVisibility 0=所有人可见 1=仅好友可见 2=隐藏
PhoneVisibility int32 `bson:"phone_visibility"`
// CallAcceptSetting 0=所有人可发起 1=仅好友可发起 2=不接受任何通话
CallAcceptSetting int32 `bson:"call_accept_setting"`
// MsgReceiveSetting 0=所有人可发送 1=仅好友可发送 2=所有人不可发送
MsgReceiveSetting int32 `bson:"msg_receive_setting"`
UserID string `bson:"user_id"`
Nickname string `bson:"nickname"`
FaceURL string `bson:"face_url"`
Ex string `bson:"ex"`
AppMangerLevel int32 `bson:"app_manger_level"`
GlobalRecvMsgOpt int32 `bson:"global_recv_msg_opt"`
CreateTime time.Time `bson:"create_time"`
FirstName string `bson:"first_name"`
LastName string `bson:"last_name"`
FullName string `bson:"full_name"`
Phone string `bson:"phone"`
AreaCode string `bson:"area_code"`
PhoneVisibility int32 `bson:"phone_visibility"`
CallAcceptSetting int32 `bson:"call_accept_setting"`
MsgReceiveSetting int32 `bson:"msg_receive_setting"`
GroupInviteSetting int32 `bson:"group_invite_setting"`
// CallRingtoneURL 用户自定义来电铃声 URL对方来电时播放此铃声
CallRingtoneURL string `bson:"call_ringtone_url"`
// Status 账号状态0=正常1=冻结2=黑名单
Status int32 `bson:"status"`
// MsgBurnDuration 用户全局消息阅后即焚时长0 表示关闭
MsgBurnDuration int32 `bson:"msg_burn_duration"`
// DeleteAccountInterval 删除账号间隔0 表示关闭
DeleteAccountInterval int32 `bson:"delete_account_interval"`
}
func (u *User) GetNickname() string {

@ -2,11 +2,14 @@ package model
import "time"
// UserGlobalBlack 全局黑名单记录,被加入黑名单的用户无法登录
// UserGlobalBlack 全局黑名单/冻结记录。
// Status: 1=冻结可登录不能收发消息2=黑名单(不可登录,自动踢下线,不能收发消息)
type UserGlobalBlack struct {
UserID string `bson:"user_id"`
Nickname string `bson:"nickname"`
OperatorID string `bson:"operator_id"`
Reason string `bson:"reason"`
CreateTime time.Time `bson:"create_time"`
// Status 限制类型1=冻结2=黑名单
Status int32 `bson:"status"`
}

@ -0,0 +1,14 @@
package model
import "time"
// UserOfflineRecord 记录用户全平台离线的时刻及账号自动删除截止时间。
// 用户上线时删除记录;用户全部平台离线时 upsert 记录。
// crontask 每小时扫描此集合,删除 DeleteUserDeadline <= now 的账号。
type UserOfflineRecord struct {
UserID string `bson:"user_id"`
OfflineTime time.Time `bson:"offline_time"`
// DeleteUserDeadline = OfflineTime + delete_account_interval
// 用户修改 delete_account_interval 时同步刷新此字段。
DeleteUserDeadline time.Time `bson:"delete_user_deadline"`
}

@ -44,7 +44,7 @@ func NewOptions(opts ...OptionsOpt) Options {
func NewMsgOptions() Options {
options := make(map[string]bool, 11)
options[constant.IsOfflinePush] = false
return make(map[string]bool)
return options
}
func WithOptions(options Options, opts ...OptionsOpt) Options {

@ -75,6 +75,29 @@ func (x *MsgClient) GetActiveConversation(ctx context.Context, conversationIDs [
return extractField(ctx, x.MsgClient.GetActiveConversation, req, (*msg.GetActiveConversationResp).GetConversations)
}
// GetSingleMsgBySeq 根据会话 ID 与 seq 拉取一条消息(不存在时返回 nil
func (x *MsgClient) GetSingleMsgBySeq(ctx context.Context, conversationID string, seq int64) (*sdkws.MsgData, error) {
if conversationID == "" || seq <= 0 {
return nil, nil
}
req := &msg.GetMsgByConversationIDsReq{
ConversationIDs: []string{conversationID},
MaxSeqs: map[string]int64{conversationID: seq},
}
resp, err := x.MsgClient.GetMsgByConversationIDs(ctx, req)
if err != nil {
return nil, err
}
m := resp.GetMsgDatas()
if len(m) == 0 {
return nil, nil
}
if v, ok := m[conversationID]; ok && v != nil && v.Seq == seq {
return v, nil
}
return nil, nil
}
func (x *MsgClient) GetSeqMessage(ctx context.Context, userID string, conversations []*msg.ConversationSeqs) (map[string]*sdkws.PullMsgs, error) {
if len(conversations) == 0 {
return nil, nil

@ -1 +1 @@
Subproject commit 0db6a732426df40792921f861112e32785405e8d
Subproject commit 24c72259373e9080ab005bb10385524753d4dfc3
Loading…
Cancel
Save