From 1d682847874db7040c39c85815d0793536bb443a Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Tue, 28 Apr 2026 19:57:56 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=BB=91=E5=90=8D?= =?UTF-8?q?=E5=8D=95=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/api/user_global_black.go | 65 ++++++++++++++----- internal/rpc/auth/auth.go | 18 ++--- internal/rpc/msg/send.go | 4 ++ internal/rpc/msg/server.go | 6 ++ internal/rpc/msg/verify.go | 38 +++++++++++ .../storage/controller/user_global_black.go | 8 ++- .../storage/database/mgo/user_global_black.go | 17 ++++- .../storage/database/user_global_black.go | 2 + pkg/common/storage/model/user.go | 10 +++ pkg/common/storage/model/user_global_black.go | 5 +- 10 files changed, 144 insertions(+), 29 deletions(-) diff --git a/internal/api/user_global_black.go b/internal/api/user_global_black.go index 543960aa0..8efcaf9d0 100644 --- a/internal/api/user_global_black.go +++ b/internal/api/user_global_black.go @@ -29,10 +29,14 @@ func NewUserGlobalBlackApi(blacklistDB controller.UserGlobalBlackDatabase, userD type addGlobalBlacklistReq struct { UserIDs []string `json:"userIDs" binding:"required,min=1"` Reason string `json:"reason"` + // Status 限制类型:1=冻结(可登录,不能收发消息);2=黑名单(不可登录,自动踢下线) + Status int32 `json:"status" binding:"required,oneof=1 2"` } type removeGlobalBlacklistReq struct { UserIDs []string `json:"userIDs" binding:"required,min=1"` + // Status 目标状态:0=恢复正常(同步从 blacklistDB 删除记录);1=冻结;2=黑名单 + Status int32 `json:"status" binding:"oneof=0 1 2"` } type getGlobalBlacklistReq struct { @@ -45,6 +49,8 @@ type globalBlackItem struct { OperatorID string `json:"operatorID"` Reason string `json:"reason"` CreateTime int64 `json:"createTime"` + // Status 限制类型:1=冻结,2=黑名单 + Status int32 `json:"status"` } type getGlobalBlacklistResp struct { @@ -52,7 +58,8 @@ type getGlobalBlacklistResp struct { Blacks []globalBlackItem `json:"blacks"` } -// AddGlobalBlacklist 管理员将用户加入全局黑名单,并立即踢下线(所有平台 token 标记 KickedToken) +// AddGlobalBlacklist 管理员设置用户限制状态。 +// Status=1(冻结):可登录,但不能收发消息;Status=2(黑名单):不可登录,自动踢下线,不能收发消息。 func (b *UserGlobalBlackApi) AddGlobalBlacklist(c *gin.Context) { var req addGlobalBlacklistReq if err := c.ShouldBindJSON(&req); err != nil { @@ -85,31 +92,44 @@ func (b *UserGlobalBlackApi) AddGlobalBlacklist(c *gin.Context) { Nickname: u.Nickname, OperatorID: operatorID, Reason: req.Reason, + Status: req.Status, }) } if err := b.blacklistDB.AddBlack(c, blacks); err != nil { apiresp.GinError(c, err) return } - // 黑名单写入成功后,对每个被封禁用户的所有非管理员平台执行 force_logout: - // 1. 断开 WS 长连接(msggateway.KickUserOffline) - // 2. 将 Redis 中该平台的所有 token 标记为 KickedToken - for _, black := range blacks { - for platformID := range constant.PlatformID2Name { - if int32(platformID) == constant.AdminPlatformID { - continue - } - if err := b.authClient.ForceLogout(c, black.UserID, int32(platformID)); err != nil { - // 踢下线失败不阻断主流程,记录警告即可 - log.ZWarn(c, "AddGlobalBlacklist: ForceLogout failed", err, - "userID", black.UserID, "platformID", platformID) + // 同步更新 user 集合中的状态字段 + for _, userID := range req.UserIDs { + if err := b.userDB.UpdateByMap(c, userID, map[string]any{"status": req.Status}); err != nil { + log.ZWarn(c, "AddGlobalBlacklist: UpdateByMap status failed", err, + "userID", userID, "status", req.Status) + } + } + // 仅黑名单(Status=2)需要踢下线:断开 WS 长连接并将 token 标记为 KickedToken + if req.Status == model.UserStatusBlacklist { + for _, black := range blacks { + for platformID := range constant.PlatformID2Name { + if int32(platformID) == constant.AdminPlatformID { + continue + } + if err := b.authClient.ForceLogout(c, black.UserID, int32(platformID)); err != nil { + log.ZWarn(c, "AddGlobalBlacklist: ForceLogout failed", err, + "userID", black.UserID, "platformID", platformID) + } } } } apiresp.GinSuccess(c, nil) } -// RemoveGlobalBlacklist 管理员从全局黑名单移除用户 +// RemoveGlobalBlacklist 管理员更新用户账号状态。 +// 执行顺序: +// 1. 将 user 集合中的 status 字段更新为请求值 +// 2. 仅当 status == 0(恢复正常)时,才从 blacklistDB 删除该用户的限制记录 +// +// 说明:blacklistDB 是 auth/msg 层的拦截依据;状态先落 user 集合, +// 只有确认目标状态为"正常"时才清除黑名单记录,避免状态写入成功但记录未删导致仍被拦截。 func (b *UserGlobalBlackApi) RemoveGlobalBlacklist(c *gin.Context) { var req removeGlobalBlacklistReq if err := c.ShouldBindJSON(&req); err != nil { @@ -120,9 +140,19 @@ func (b *UserGlobalBlackApi) RemoveGlobalBlacklist(c *gin.Context) { apiresp.GinError(c, err) return } - if err := b.blacklistDB.RemoveBlack(c, req.UserIDs); err != nil { - apiresp.GinError(c, err) - return + for _, userID := range req.UserIDs { + if err := b.userDB.UpdateByMap(c, userID, map[string]any{"status": req.Status}); err != nil { + log.ZError(c, "RemoveGlobalBlacklist: UpdateByMap status failed", err, "userID", userID, "status", req.Status) + apiresp.GinError(c, err) + return + } + } + // 只有目标状态为 0(正常)时才删除 blacklistDB 中的限制记录 + if req.Status == model.UserStatusNormal { + if err := b.blacklistDB.RemoveBlack(c, req.UserIDs); err != nil { + apiresp.GinError(c, err) + return + } } apiresp.GinSuccess(c, nil) } @@ -151,6 +181,7 @@ func (b *UserGlobalBlackApi) GetGlobalBlacklist(c *gin.Context) { OperatorID: blk.OperatorID, Reason: blk.Reason, CreateTime: blk.CreateTime.UnixMilli(), + Status: blk.Status, }) } apiresp.GinSuccess(c, getGlobalBlacklistResp{Total: total, Blacks: items}) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index bb7a95ce1..d5846b715 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -32,6 +32,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" pbauth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" @@ -140,13 +141,13 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR return nil, errs.ErrArgs.WrapMsg("app account can`t get token") } - blocked, _ := s.blacklistDB.IsBlocked(ctx, req.UserID) - if blocked { - // Blacklisted users should be actively kicked to invalidate existing sessions. + // 仅黑名单(status=2)禁止登录;冻结(status=1)允许获取 token,仅在收发消息层面拦截 + status, _ := s.blacklistDB.GetStatus(ctx, req.UserID) + if status == model.UserStatusBlacklist { if kickErr := s.forceKickOffAllPlatforms(ctx, req.UserID); kickErr != nil { log.ZWarn(ctx, "GetUserToken forceKickOffAllPlatforms failed", kickErr, "userID", req.UserID) } - log.ZWarn(ctx, "GetUserToken is blocked", errors.New("user is in global blacklist, userID="+req.UserID), "userID", req.UserID, "blocked", blocked) + log.ZWarn(ctx, "GetUserToken is blocked", errors.New("user is in global blacklist, userID="+req.UserID), "userID", req.UserID, "status", status) return nil, servererrs.ErrUserBlocked.WithDetail("user is in global blacklist, userID=" + req.UserID) } token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID)) @@ -167,14 +168,13 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim if isAdmin { return claims, nil } - // 非管理员用户检查全局黑名单 - blocked, _ := s.blacklistDB.IsBlocked(ctx, claims.UserID) - if blocked { - // Blacklisted users should be actively kicked to invalidate existing sessions. + // 非管理员用户检查全局黑名单:仅 status=2(黑名单)拦截;status=1(冻结)允许通过 token 校验 + status, _ := s.blacklistDB.GetStatus(ctx, claims.UserID) + if status == model.UserStatusBlacklist { if kickErr := s.forceKickOffAllPlatforms(ctx, claims.UserID); kickErr != nil { log.ZWarn(ctx, "parseToken forceKickOffAllPlatforms failed", kickErr, "userID", claims.UserID) } - log.ZWarn(ctx, "parseToken is blocked", errors.New("user is in global blacklist, userID="+claims.UserID), "userID", claims.UserID, "blocked", blocked) + log.ZWarn(ctx, "parseToken is blocked", errors.New("user is in global blacklist, userID="+claims.UserID), "userID", claims.UserID, "status", status) return nil, servererrs.ErrUserBlocked.WithDetail("user is in global blacklist, userID=" + claims.UserID) } m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f754f9057..e96f0d935 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -34,6 +34,10 @@ import ( func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { if req.MsgData != nil { m.encapsulateMsgData(req.MsgData) + // 全局账号状态校验:冻结/黑名单用户不可收发消息 + if err := m.verifyUserStatus(ctx, req); err != nil { + return nil, err + } switch req.MsgData.SessionType { case constant.SingleChatType: return m.sendMsgSingleChat(ctx, req) diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 2b91c4405..de00217ea 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -71,6 +71,7 @@ type msgServer struct { webhookClient *webhook.Client conversationClient *rpcli.ConversationClient spamReportDB database.SpamReport + globalBlackDB controller.UserGlobalBlackDatabase } func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { @@ -127,6 +128,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + globalBlackMgo, err := mgo.NewUserGlobalBlackMongo(mgocli.GetDB()) + if err != nil { + return err + } s := &msgServer{ MsgDatabase: msgDatabase, RegisterCenter: client, @@ -138,6 +143,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), conversationClient: conversationClient, spamReportDB: spamReportDB, + globalBlackDB: controller.NewUserGlobalBlackDatabase(globalBlackMgo), } s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg)) diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 8b4d53dd0..1f5c45661 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -21,6 +21,7 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" @@ -50,6 +51,43 @@ type MessageRevoked struct { Seq uint32 `json:"seq"` } +// verifyUserStatus 校验发送方/接收方的全局账号状态。 +// 任意一方处于冻结(1)或黑名单(2)即拒绝消息发送/投递。 +// 通知类消息(NotificationBegin~NotificationEnd)和管理员发送方放行。 +func (m *msgServer) verifyUserStatus(ctx context.Context, data *msg.SendMsgReq) error { + if data == nil || data.MsgData == nil { + return nil + } + if data.MsgData.ContentType >= constant.NotificationBegin && data.MsgData.ContentType <= constant.NotificationEnd { + return nil + } + sendID := data.MsgData.SendID + if datautil.Contain(sendID, m.config.Share.IMAdminUserID...) { + return nil + } + if sendID != "" { + st, err := m.globalBlackDB.GetStatus(ctx, sendID) + if err != nil { + log.ZWarn(ctx, "verifyUserStatus: GetStatus(send) failed", err, "sendID", sendID) + } else if st == model.UserStatusFrozen || st == model.UserStatusBlacklist { + return servererrs.ErrUserBlocked.WithDetail("sender is restricted, status=" + strconv.Itoa(int(st))) + } + } + // 单聊:同时校验接收方状态;群聊接收方拦截在推送层处理 + if data.MsgData.SessionType == constant.SingleChatType { + recvID := data.MsgData.RecvID + if recvID != "" && !datautil.Contain(recvID, m.config.Share.IMAdminUserID...) { + st, err := m.globalBlackDB.GetStatus(ctx, recvID) + if err != nil { + log.ZWarn(ctx, "verifyUserStatus: GetStatus(recv) failed", err, "recvID", recvID) + } else if st == model.UserStatusFrozen || st == model.UserStatusBlacklist { + return servererrs.ErrMsgReceiveNotAllowed.WrapMsg("receiver is restricted") + } + } + } + return nil +} + func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error { switch data.MsgData.SessionType { case constant.SingleChatType: diff --git a/pkg/common/storage/controller/user_global_black.go b/pkg/common/storage/controller/user_global_black.go index 2a6d114d4..973d87539 100644 --- a/pkg/common/storage/controller/user_global_black.go +++ b/pkg/common/storage/controller/user_global_black.go @@ -14,8 +14,10 @@ type UserGlobalBlackDatabase interface { AddBlack(ctx context.Context, blacks []*model.UserGlobalBlack) error // RemoveBlack 按 userID 将用户从全局黑名单移除 RemoveBlack(ctx context.Context, userIDs []string) error - // IsBlocked 检查用户是否在全局黑名单 + // IsBlocked 检查用户是否在全局黑名单(含冻结) IsBlocked(ctx context.Context, userID string) (bool, error) + // GetStatus 返回用户限制状态:0=正常,1=冻结,2=黑名单 + GetStatus(ctx context.Context, userID string) (int32, error) // FindBlocked 批量查询哪些 userID 在全局黑名单中,返回被封禁的记录 FindBlocked(ctx context.Context, userIDs []string) ([]*model.UserGlobalBlack, error) // GetBlackList 分页获取黑名单列表 @@ -42,6 +44,10 @@ func (u *userGlobalBlackDatabase) IsBlocked(ctx context.Context, userID string) return u.db.IsBlocked(ctx, userID) } +func (u *userGlobalBlackDatabase) GetStatus(ctx context.Context, userID string) (int32, error) { + return u.db.GetStatus(ctx, userID) +} + func (u *userGlobalBlackDatabase) GetBlackList(ctx context.Context, pagination pagination.Pagination) (int64, []*model.UserGlobalBlack, error) { return u.db.Page(ctx, pagination) } diff --git a/pkg/common/storage/database/mgo/user_global_black.go b/pkg/common/storage/database/mgo/user_global_black.go index 686c2bf3f..8bca466db 100644 --- a/pkg/common/storage/database/mgo/user_global_black.go +++ b/pkg/common/storage/database/mgo/user_global_black.go @@ -37,7 +37,7 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal b.CreateTime = time.Now() } } - // 使用 upsert 避免重复插入报错 + // 使用 upsert 避免重复插入报错;status 也走 $set 以便升级/降级(冻结↔黑名单)时同步更新 for _, b := range blacks { filter := bson.M{"user_id": b.UserID} update := bson.M{ @@ -45,6 +45,7 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal "nickname": b.Nickname, "operator_id": b.OperatorID, "reason": b.Reason, + "status": b.Status, }, "$setOnInsert": bson.M{ "user_id": b.UserID, @@ -59,6 +60,20 @@ func (u *UserGlobalBlackMgo) Add(ctx context.Context, blacks []*model.UserGlobal return nil } +// GetStatus 返回 userID 对应的限制状态: +// 0=正常(无记录),1=冻结,2=黑名单 +func (u *UserGlobalBlackMgo) GetStatus(ctx context.Context, userID string) (int32, error) { + var doc model.UserGlobalBlack + err := u.coll.FindOne(ctx, bson.M{"user_id": userID}, options.FindOne().SetProjection(bson.M{"status": 1})).Decode(&doc) + if err != nil { + if err == mongo.ErrNoDocuments { + return model.UserStatusNormal, nil + } + return model.UserStatusNormal, errs.Wrap(err) + } + return doc.Status, nil +} + func (u *UserGlobalBlackMgo) Remove(ctx context.Context, users []string) error { if len(users) == 0 { return nil diff --git a/pkg/common/storage/database/user_global_black.go b/pkg/common/storage/database/user_global_black.go index a30dbaadf..3c980d9d5 100644 --- a/pkg/common/storage/database/user_global_black.go +++ b/pkg/common/storage/database/user_global_black.go @@ -17,6 +17,8 @@ type UserGlobalBlack interface { Find(ctx context.Context, userIDs []string) ([]*model.UserGlobalBlack, error) // IsBlocked 检查单个用户是否在黑名单 IsBlocked(ctx context.Context, userID string) (bool, error) + // GetStatus 返回用户限制状态:0=正常,1=冻结,2=黑名单 + GetStatus(ctx context.Context, userID string) (int32, error) // Page 分页查询黑名单列表 Page(ctx context.Context, pagination pagination.Pagination) (count int64, blacks []*model.UserGlobalBlack, err error) } diff --git a/pkg/common/storage/model/user.go b/pkg/common/storage/model/user.go index 8c2e18167..dc02fa5cf 100644 --- a/pkg/common/storage/model/user.go +++ b/pkg/common/storage/model/user.go @@ -42,6 +42,14 @@ const ( MsgReceiveSettingNobody int32 = 2 ) +// UserStatus 用户账号状态枚举。 +// 0=正常;1=冻结(可登录,不能收发消息);2=黑名单(不可登录,自动踢下线,不能收发消息) +const ( + UserStatusNormal int32 = 0 + UserStatusFrozen int32 = 1 + UserStatusBlacklist int32 = 2 +) + type User struct { UserID string `bson:"user_id"` Nickname string `bson:"nickname"` @@ -57,6 +65,8 @@ type User struct { PhoneVisibility int32 `bson:"phone_visibility"` CallAcceptSetting int32 `bson:"call_accept_setting"` MsgReceiveSetting int32 `bson:"msg_receive_setting"` + // Status 账号状态:0=正常,1=冻结,2=黑名单 + Status int32 `bson:"status"` } func (u *User) GetNickname() string { diff --git a/pkg/common/storage/model/user_global_black.go b/pkg/common/storage/model/user_global_black.go index a0329cf86..1beac7b37 100644 --- a/pkg/common/storage/model/user_global_black.go +++ b/pkg/common/storage/model/user_global_black.go @@ -2,11 +2,14 @@ package model import "time" -// UserGlobalBlack 全局黑名单记录,被加入黑名单的用户无法登录 +// UserGlobalBlack 全局黑名单/冻结记录。 +// Status: 1=冻结(可登录,不能收发消息);2=黑名单(不可登录,自动踢下线,不能收发消息) type UserGlobalBlack struct { UserID string `bson:"user_id"` Nickname string `bson:"nickname"` OperatorID string `bson:"operator_id"` Reason string `bson:"reason"` CreateTime time.Time `bson:"create_time"` + // Status 限制类型:1=冻结,2=黑名单 + Status int32 `bson:"status"` } From d51378bd7b57b40aad2878cffed23acbd5c51ef3 Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Wed, 29 Apr 2026 18:03:01 +0800 Subject: [PATCH 2/4] pin group message --- internal/api/group.go | 12 + internal/api/router.go | 3 + internal/rpc/group/group.go | 6 +- internal/rpc/group/notification.go | 26 ++ internal/rpc/group/pinned_msg.go | 271 ++++++++++++++++++ pkg/common/storage/controller/group.go | 40 ++- .../storage/database/group_pinned_msg.go | 22 ++ .../storage/database/mgo/group_pinned_msg.go | 115 ++++++++ pkg/common/storage/database/name.go | 1 + pkg/common/storage/model/group_pinned_msg.go | 69 +++++ pkg/rpcli/msg.go | 23 ++ 11 files changed, 577 insertions(+), 11 deletions(-) create mode 100644 internal/rpc/group/pinned_msg.go create mode 100644 pkg/common/storage/database/group_pinned_msg.go create mode 100644 pkg/common/storage/database/mgo/group_pinned_msg.go create mode 100644 pkg/common/storage/model/group_pinned_msg.go diff --git a/internal/api/group.go b/internal/api/group.go index 9a2ffda06..eec11353a 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -173,3 +173,15 @@ func (o *GroupApi) GetGroupApplicationUnhandledCount(c *gin.Context) { func (o *GroupApi) GetCommonGroupsWithFriend(c *gin.Context) { a2r.Call(c, group.GroupClient.GetCommonGroupsWithFriend, o.Client) } + +func (o *GroupApi) PinGroupMessage(c *gin.Context) { + a2r.Call(c, group.GroupClient.PinGroupMessage, o.Client) +} + +func (o *GroupApi) UnpinGroupMessage(c *gin.Context) { + a2r.Call(c, group.GroupClient.UnpinGroupMessage, o.Client) +} + +func (o *GroupApi) GetGroupPinnedMessages(c *gin.Context) { + a2r.Call(c, group.GroupClient.GetGroupPinnedMessages, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index accaae666..026f327d7 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -243,6 +243,9 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co groupRouterGroup.POST("/get_full_join_group_ids", g.GetFullJoinGroupIDs) groupRouterGroup.POST("/get_group_application_unhandled_count", g.GetGroupApplicationUnhandledCount) groupRouterGroup.POST("/get_common_groups_with_friend", g.GetCommonGroupsWithFriend) + groupRouterGroup.POST("/pin_group_message", g.PinGroupMessage) + groupRouterGroup.POST("/unpin_group_message", g.UnpinGroupMessage) + groupRouterGroup.POST("/get_group_pinned_messages", g.GetGroupPinnedMessages) } // certificate { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index f829f07cf..d9d8fd427 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -101,6 +101,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + groupPinnedMsgDB, err := mgo.NewGroupPinnedMsgMongo(mgocli.GetDB()) + if err != nil { + return err + } //userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) //msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) @@ -130,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationClient: rpcli.NewConversationClient(conversationConn), //cryptoClient: rpcli.NewCryptoClient(cryptoConn), } - gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) + gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, groupPinnedMsgDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient) localcache.InitLocalCache(&config.LocalCacheConfig) pbgroup.RegisterGroupServer(server, &gs) diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 95335b2fc..3b1a24688 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -852,6 +852,32 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) } +// GroupMessagePinnedNotification 通知群成员有消息被置顶或取消置顶 +// pinType: 1=置顶, 2=取消置顶 +func (g *NotificationSender) GroupMessagePinnedNotification(ctx context.Context, groupID string, pinType int32, + pinned *sdkws.GroupPinnedMsgInfo, pinnedList []*sdkws.GroupPinnedMsgInfo) { + var err error + defer func() { + if err != nil { + log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) + } + }() + groupInfo, err := g.getGroupInfo(ctx, groupID) + if err != nil { + return + } + tips := &sdkws.GroupMessagePinnedTips{ + Group: groupInfo, + Type: pinType, + PinnedMsg: pinned, + PinnedList: pinnedList, + } + if err = g.fillOpUser(ctx, &tips.OpUser, groupID); err != nil { + return + } + g.Notification(ctx, mcontext.GetOpUserID(ctx), groupID, constant.GroupMessagePinnedNotification, tips) +} + func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) { var err error defer func() { diff --git a/internal/rpc/group/pinned_msg.go b/internal/rpc/group/pinned_msg.go new file mode 100644 index 000000000..d28977111 --- /dev/null +++ b/internal/rpc/group/pinned_msg.go @@ -0,0 +1,271 @@ +// Copyright © 2026 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +package group + +import ( + "context" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/protocol/constant" + pbgroup "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/mcontext" +) + +// 群置顶消息相关 RPC 实现: +// - 自动滚动保留最近 N 条置顶消息(N=model.GroupPinnedMsgMaxKeep,默认为 3) +// - 置顶时把整条消息内容做完整快照存档,避免后续消息删除/撤回影响展示 +// - 每条置顶记录拥有唯一 pinID,作为 unpin 时的精准删除凭据 +// - 权限:默认全员可置顶;当 group.AllowPinMsg=1 时,仅群主/管理员可置顶或取消置顶 + +const ( + groupPinnedActionPin = int32(1) + groupPinnedActionUnpin = int32(2) +) + +// PinGroupMessage 群聊中置顶单条消息 +func (s *groupServer) PinGroupMessage(ctx context.Context, req *pbgroup.PinGroupMessageReq) (*pbgroup.PinGroupMessageResp, error) { + if req.GroupID == "" { + return nil, errs.ErrArgs.WrapMsg("groupID empty") + } + if req.Seq <= 0 { + return nil, errs.ErrArgs.WrapMsg("seq must be positive") + } + + group, err := s.db.TakeGroup(ctx, req.GroupID) + if err != nil { + return nil, err + } + if group.Status == constant.GroupStatusDismissed { + return nil, servererrs.ErrDismissedAlready.Wrap() + } + + if err := s.checkPinPermission(ctx, group); err != nil { + return nil, err + } + + conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID) + msgData, err := s.msgClient.GetSingleMsgBySeq(ctx, conversationID, req.Seq) + if err != nil { + return nil, err + } + if msgData == nil { + return nil, servererrs.ErrRecordNotFound.WrapMsg("message not found by seq") + } + if msgData.GroupID != "" && msgData.GroupID != req.GroupID { + return nil, errs.ErrArgs.WrapMsg("seq does not belong to this group") + } + if msgData.Status >= constant.MsgStatusHasDeleted { + return nil, servererrs.ErrRecordNotFound.WrapMsg("message has been deleted") + } + + pin := buildPinSnapshot(req.GroupID, conversationID, mcontext.GetOpUserID(ctx), msgData) + + pinnedList, err := s.db.PinGroupMessage(ctx, req.GroupID, pin) + if err != nil { + return nil, err + } + + pbPinned := pinnedMsgDB2PB(pin) + pbList := pinnedListDB2PB(pinnedList) + + s.notification.GroupMessagePinnedNotification(ctx, req.GroupID, groupPinnedActionPin, pbPinned, pbList) + + return &pbgroup.PinGroupMessageResp{ + PinnedMsg: pbPinned, + PinnedList: pbList, + }, nil +} + +// UnpinGroupMessage 群聊中取消置顶单条消息(pinID 优先;为空则按 seq) +func (s *groupServer) UnpinGroupMessage(ctx context.Context, req *pbgroup.UnpinGroupMessageReq) (*pbgroup.UnpinGroupMessageResp, error) { + if req.GroupID == "" { + return nil, errs.ErrArgs.WrapMsg("groupID empty") + } + if req.PinID == "" && req.Seq <= 0 { + return nil, errs.ErrArgs.WrapMsg("either pinID or seq must be provided") + } + + group, err := s.db.TakeGroup(ctx, req.GroupID) + if err != nil { + return nil, err + } + if group.Status == constant.GroupStatusDismissed { + return nil, servererrs.ErrDismissedAlready.Wrap() + } + if err := s.checkPinPermission(ctx, group); err != nil { + return nil, err + } + + current, err := s.db.GetGroupPinnedMessages(ctx, req.GroupID) + if err != nil { + return nil, err + } + var target *model.GroupPinnedMessage + for _, m := range current { + if req.PinID != "" { + if m.PinID == req.PinID { + target = m + break + } + } else if m.Seq == req.Seq { + target = m + break + } + } + if target == nil { + return nil, servererrs.ErrRecordNotFound.WrapMsg("pinned message not found") + } + + pinnedList, err := s.db.UnpinGroupMessage(ctx, req.GroupID, req.PinID, req.Seq) + if err != nil { + return nil, err + } + + pbPinned := pinnedMsgDB2PB(target) + pbList := pinnedListDB2PB(pinnedList) + + s.notification.GroupMessagePinnedNotification(ctx, req.GroupID, groupPinnedActionUnpin, pbPinned, pbList) + + return &pbgroup.UnpinGroupMessageResp{PinnedList: pbList}, nil +} + +// GetGroupPinnedMessages 获取群置顶消息列表 +func (s *groupServer) GetGroupPinnedMessages(ctx context.Context, req *pbgroup.GetGroupPinnedMessagesReq) (*pbgroup.GetGroupPinnedMessagesResp, error) { + if req.GroupID == "" { + return nil, errs.ErrArgs.WrapMsg("groupID empty") + } + if err := s.checkAdminOrInGroup(ctx, req.GroupID); err != nil { + return nil, err + } + pinnedList, err := s.db.GetGroupPinnedMessages(ctx, req.GroupID) + if err != nil { + return nil, err + } + return &pbgroup.GetGroupPinnedMessagesResp{ + PinnedList: pinnedListDB2PB(pinnedList), + }, nil +} + +// checkPinPermission 校验当前操作者是否具备群消息置顶权限 +func (s *groupServer) checkPinPermission(ctx context.Context, group *model.Group) error { + if authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) { + return nil + } + opUserID := mcontext.GetOpUserID(ctx) + if opUserID == "" { + return errs.ErrNoPermission.WrapMsg("op user id empty") + } + member, err := s.db.TakeGroupMember(ctx, group.GroupID, opUserID) + if err != nil { + return err + } + isOwnerOrAdmin := member.RoleLevel == constant.GroupOwner || member.RoleLevel == constant.GroupAdmin + if group.AllowPinMsg == model.GroupPermAdminOnly && !isOwnerOrAdmin { + return errs.ErrNoPermission.WrapMsg("only owner or admin can pin/unpin group message") + } + return nil +} + +// buildPinSnapshot 把 sdkws.MsgData 完整快照成 GroupPinnedMessage +// PinID 在 mgo 层 Pin 时若为空会自动生成;这里留空交由存储层处理 +func buildPinSnapshot(groupID, conversationID, opUserID string, m *sdkws.MsgData) *model.GroupPinnedMessage { + pin := &model.GroupPinnedMessage{ + GroupID: groupID, + ConversationID: conversationID, + Seq: m.Seq, + ServerMsgID: m.ServerMsgID, + ClientMsgID: m.ClientMsgID, + SendID: m.SendID, + RecvID: m.RecvID, + SenderPlatformID: m.SenderPlatformID, + SenderNickname: m.SenderNickname, + SenderFaceURL: m.SenderFaceURL, + SessionType: m.SessionType, + MsgFrom: m.MsgFrom, + ContentType: m.ContentType, + Content: string(m.Content), + AtUserIDList: append([]string(nil), m.AtUserIDList...), + Options: copyOptions(m.Options), + AttachedInfo: m.AttachedInfo, + Ex: m.Ex, + SendTime: m.SendTime, + CreateTime: m.CreateTime, + Status: m.Status, + PinUserID: opUserID, + PinTime: time.Now().UnixMilli(), + } + if m.OfflinePushInfo != nil { + pin.OfflinePush = &model.GroupPinnedOfflinePush{ + Title: m.OfflinePushInfo.Title, + Desc: m.OfflinePushInfo.Desc, + Ex: m.OfflinePushInfo.Ex, + IOSPushSound: m.OfflinePushInfo.IOSPushSound, + IOSBadgeCount: m.OfflinePushInfo.IOSBadgeCount, + SignalInfo: m.OfflinePushInfo.SignalInfo, + } + } + return pin +} + +func copyOptions(src map[string]bool) map[string]bool { + if len(src) == 0 { + return nil + } + dst := make(map[string]bool, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func pinnedMsgDB2PB(m *model.GroupPinnedMessage) *sdkws.GroupPinnedMsgInfo { + if m == nil { + return nil + } + return &sdkws.GroupPinnedMsgInfo{ + PinID: m.PinID, + GroupID: m.GroupID, + ConversationID: m.ConversationID, + Seq: m.Seq, + ServerMsgID: m.ServerMsgID, + ClientMsgID: m.ClientMsgID, + SendID: m.SendID, + RecvID: m.RecvID, + SenderPlatformID: m.SenderPlatformID, + SenderNickname: m.SenderNickname, + SenderFaceURL: m.SenderFaceURL, + SessionType: m.SessionType, + MsgFrom: m.MsgFrom, + ContentType: m.ContentType, + Content: m.Content, + AtUserIDList: append([]string(nil), m.AtUserIDList...), + Options: copyOptions(m.Options), + AttachedInfo: m.AttachedInfo, + Ex: m.Ex, + SendTime: m.SendTime, + CreateTime: m.CreateTime, + Status: m.Status, + PinUserID: m.PinUserID, + PinTime: m.PinTime, + } +} + +func pinnedListDB2PB(list []*model.GroupPinnedMessage) []*sdkws.GroupPinnedMsgInfo { + if len(list) == 0 { + return nil + } + result := make([]*sdkws.GroupPinnedMsgInfo, 0, len(list)) + for _, m := range list { + result = append(result, pinnedMsgDB2PB(m)) + } + return result +} diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 539f7dccc..f8cbb0933 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -126,6 +126,11 @@ type GroupDatabase interface { FindJoinGroupID(ctx context.Context, userID string) ([]string, error) GetGroupApplicationUnhandledCount(ctx context.Context, groupIDs []string, ts int64) (int64, error) + + // 群置顶消息:保留最近 N 条 + PinGroupMessage(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) + UnpinGroupMessage(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) + GetGroupPinnedMessages(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) } func NewGroupDatabase( @@ -134,24 +139,39 @@ func NewGroupDatabase( groupDB database.Group, groupMemberDB database.GroupMember, groupRequestDB database.GroupRequest, + groupPinnedMsgDB database.GroupPinnedMsg, ctxTx tx.Tx, groupHash cache.GroupHash, ) GroupDatabase { return &groupDatabase{ - groupDB: groupDB, - groupMemberDB: groupMemberDB, - groupRequestDB: groupRequestDB, - ctxTx: ctxTx, - cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()), + groupDB: groupDB, + groupMemberDB: groupMemberDB, + groupRequestDB: groupRequestDB, + groupPinnedMsgDB: groupPinnedMsgDB, + ctxTx: ctxTx, + cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()), } } type groupDatabase struct { - groupDB database.Group - groupMemberDB database.GroupMember - groupRequestDB database.GroupRequest - ctxTx tx.Tx - cache cache.GroupCache + groupDB database.Group + groupMemberDB database.GroupMember + groupRequestDB database.GroupRequest + groupPinnedMsgDB database.GroupPinnedMsg + ctxTx tx.Tx + cache cache.GroupCache +} + +func (g *groupDatabase) PinGroupMessage(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) { + return g.groupPinnedMsgDB.Pin(ctx, groupID, msg) +} + +func (g *groupDatabase) UnpinGroupMessage(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) { + return g.groupPinnedMsgDB.Unpin(ctx, groupID, pinID, seq) +} + +func (g *groupDatabase) GetGroupPinnedMessages(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) { + return g.groupPinnedMsgDB.Get(ctx, groupID) } func (g *groupDatabase) FindJoinGroupID(ctx context.Context, userID string) ([]string, error) { diff --git a/pkg/common/storage/database/group_pinned_msg.go b/pkg/common/storage/database/group_pinned_msg.go new file mode 100644 index 000000000..82a268b6f --- /dev/null +++ b/pkg/common/storage/database/group_pinned_msg.go @@ -0,0 +1,22 @@ +// Copyright © 2026 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +package database + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +// GroupPinnedMsg 群置顶消息的存储抽象 +type GroupPinnedMsg interface { + // Pin 置顶一条消息:若 PinID 为空会自动生成;自动滚动保留最近 N 条 + Pin(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) + // Unpin 取消置顶;pinID 非空时按 pinID 精确删除,否则按 seq 删除 + Unpin(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) + // Get 获取群置顶消息列表(最新的在前) + Get(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) +} diff --git a/pkg/common/storage/database/mgo/group_pinned_msg.go b/pkg/common/storage/database/mgo/group_pinned_msg.go new file mode 100644 index 000000000..478bc7ebc --- /dev/null +++ b/pkg/common/storage/database/mgo/group_pinned_msg.go @@ -0,0 +1,115 @@ +// Copyright © 2026 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +package mgo + +import ( + "context" + "errors" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewGroupPinnedMsgMongo(db *mongo.Database) (database.GroupPinnedMsg, error) { + coll := db.Collection(database.GroupPinnedMsgName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{{Key: "group_id", Value: 1}}, + Options: options.Index().SetUnique(true), + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &groupPinnedMsgMgo{coll: coll}, nil +} + +type groupPinnedMsgMgo struct { + coll *mongo.Collection +} + +func (g *groupPinnedMsgMgo) get(ctx context.Context, groupID string) (*model.GroupPinnedMsg, error) { + doc, err := mongoutil.FindOne[*model.GroupPinnedMsg](ctx, g.coll, bson.M{"group_id": groupID}) + if err != nil { + if errs.ErrRecordNotFound.Is(err) || errors.Is(err, mongo.ErrNoDocuments) { + return &model.GroupPinnedMsg{GroupID: groupID}, nil + } + return nil, err + } + return doc, nil +} + +func (g *groupPinnedMsgMgo) Get(ctx context.Context, groupID string) ([]*model.GroupPinnedMessage, error) { + doc, err := g.get(ctx, groupID) + if err != nil { + return nil, err + } + return doc.PinnedMsgs, nil +} + +// Pin 置顶一条消息: +// - 若提供的 msg.PinID 为空,则自动生成 ObjectID().Hex() +// - 同 seq 的旧记录会被先移除避免重复 +// - 新记录 push 到数组首位,自动滚动保留最近 GroupPinnedMsgMaxKeep 条 +func (g *groupPinnedMsgMgo) Pin(ctx context.Context, groupID string, msg *model.GroupPinnedMessage) ([]*model.GroupPinnedMessage, error) { + if msg == nil { + return nil, errs.ErrArgs.WrapMsg("pin msg is nil") + } + if msg.PinID == "" { + msg.PinID = primitive.NewObjectID().Hex() + } + msg.GroupID = groupID + + if _, err := mongoutil.UpdateOneResult(ctx, g.coll, + bson.M{"group_id": groupID}, + bson.M{"$pull": bson.M{"pinned_msgs": bson.M{"seq": msg.Seq}}}, + ); err != nil { + return nil, err + } + filter := bson.M{"group_id": groupID} + update := bson.M{ + "$push": bson.M{ + "pinned_msgs": bson.M{ + "$each": bson.A{msg}, + "$position": 0, + "$slice": model.GroupPinnedMsgMaxKeep, + }, + }, + "$setOnInsert": bson.M{"group_id": groupID}, + } + opts := options.Update().SetUpsert(true) + if _, err := g.coll.UpdateOne(ctx, filter, update, opts); err != nil { + return nil, errs.Wrap(err) + } + return g.Get(ctx, groupID) +} + +// Unpin 取消置顶: +// - pinID 非空时按 pinID 精确删除(推荐) +// - 否则按 seq 删除 +// 返回更新后的置顶列表(可能为空数组) +func (g *groupPinnedMsgMgo) Unpin(ctx context.Context, groupID string, pinID string, seq int64) ([]*model.GroupPinnedMessage, error) { + if pinID == "" && seq <= 0 { + return nil, errs.ErrArgs.WrapMsg("either pinID or seq must be provided") + } + pull := bson.M{} + if pinID != "" { + pull["pin_id"] = pinID + } else { + pull["seq"] = seq + } + if _, err := mongoutil.UpdateOneResult(ctx, g.coll, + bson.M{"group_id": groupID}, + bson.M{"$pull": bson.M{"pinned_msgs": pull}}, + ); err != nil { + return nil, err + } + return g.Get(ctx, groupID) +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 100e6d112..0fd9b3b2e 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -12,6 +12,7 @@ const ( GroupJoinVersionName = "group_join_version" ConversationVersionName = "conversation_version" GroupRequestName = "group_request" + GroupPinnedMsgName = "group_pinned_msg" LogName = "log" ObjectName = "s3" UserName = "user" diff --git a/pkg/common/storage/model/group_pinned_msg.go b/pkg/common/storage/model/group_pinned_msg.go new file mode 100644 index 000000000..a80780875 --- /dev/null +++ b/pkg/common/storage/model/group_pinned_msg.go @@ -0,0 +1,69 @@ +// Copyright © 2026 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. + +package model + +// GroupPinnedMsgMaxKeep 群置顶消息最多保留的条数(最新置顶的在最前) +const GroupPinnedMsgMaxKeep = 3 + +// GroupPinnedOfflinePush 离线推送信息快照 +type GroupPinnedOfflinePush struct { + Title string `bson:"title"` + Desc string `bson:"desc"` + Ex string `bson:"ex"` + IOSPushSound string `bson:"ios_push_sound"` + IOSBadgeCount bool `bson:"ios_badge_count"` + SignalInfo string `bson:"signal_info"` +} + +// GroupPinnedMessage 一条群置顶消息的完整内容快照 +// 置顶时把消息整体快照入库,避免后续消息删除/撤回影响已置顶展示 +type GroupPinnedMessage struct { + // PinID 全局唯一 id,用于精准取消置顶(生产由 mongo ObjectID().Hex() 生成) + PinID string `bson:"pin_id"` + + // 会话 / 群信息 + ConversationID string `bson:"conversation_id"` + GroupID string `bson:"group_id"` + + // 消息标识 + Seq int64 `bson:"seq"` + ServerMsgID string `bson:"server_msg_id"` + ClientMsgID string `bson:"client_msg_id"` + + // 发送方信息 + SendID string `bson:"send_id"` + RecvID string `bson:"recv_id"` + SenderPlatformID int32 `bson:"sender_platform_id"` + SenderNickname string `bson:"sender_nickname"` + SenderFaceURL string `bson:"sender_face_url"` + + // 消息内容快照 + SessionType int32 `bson:"session_type"` + MsgFrom int32 `bson:"msg_from"` + ContentType int32 `bson:"content_type"` + Content string `bson:"content"` + AtUserIDList []string `bson:"at_user_id_list"` + Options map[string]bool `bson:"options"` + AttachedInfo string `bson:"attached_info"` + Ex string `bson:"ex"` + + OfflinePush *GroupPinnedOfflinePush `bson:"offline_push"` + + // 时间 + SendTime int64 `bson:"send_time"` + CreateTime int64 `bson:"create_time"` + Status int32 `bson:"status"` + + // 操作人 & 时间 + PinUserID string `bson:"pin_user_id"` + PinTime int64 `bson:"pin_time"` +} + +// GroupPinnedMsg 一个群的置顶消息文档,按 group_id 唯一 +type GroupPinnedMsg struct { + GroupID string `bson:"group_id"` + PinnedMsgs []*GroupPinnedMessage `bson:"pinned_msgs"` +} diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index e4d1ece6e..d439d0c12 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -75,6 +75,29 @@ func (x *MsgClient) GetActiveConversation(ctx context.Context, conversationIDs [ return extractField(ctx, x.MsgClient.GetActiveConversation, req, (*msg.GetActiveConversationResp).GetConversations) } +// GetSingleMsgBySeq 根据会话 ID 与 seq 拉取一条消息(不存在时返回 nil) +func (x *MsgClient) GetSingleMsgBySeq(ctx context.Context, conversationID string, seq int64) (*sdkws.MsgData, error) { + if conversationID == "" || seq <= 0 { + return nil, nil + } + req := &msg.GetMsgByConversationIDsReq{ + ConversationIDs: []string{conversationID}, + MaxSeqs: map[string]int64{conversationID: seq}, + } + resp, err := x.MsgClient.GetMsgByConversationIDs(ctx, req) + if err != nil { + return nil, err + } + m := resp.GetMsgDatas() + if len(m) == 0 { + return nil, nil + } + if v, ok := m[conversationID]; ok && v != nil && v.Seq == seq { + return v, nil + } + return nil, nil +} + func (x *MsgClient) GetSeqMessage(ctx context.Context, userID string, conversations []*msg.ConversationSeqs) (map[string]*sdkws.PullMsgs, error) { if len(conversations) == 0 { return nil, nil From bede0f275fc7ee7ed8d747c961298bae590a2a0c Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Fri, 1 May 2026 02:17:52 +0800 Subject: [PATCH 3/4] first name --- internal/api/router.go | 32 +++++++++++++++++++++++++++++--- internal/rpc/relation/friend.go | 8 ++++++-- pkg/common/convert/friend.go | 11 +++++++++-- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/internal/api/router.go b/internal/api/router.go index 026f327d7..52190ecce 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -12,6 +12,7 @@ import ( pbcrypto "github.com/openimsdk/protocol/crypto" "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/msg" + pbredpacket "github.com/openimsdk/protocol/redpacket" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/rtc" "github.com/openimsdk/protocol/third" @@ -117,6 +118,10 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co if err != nil { return nil, err } + redpacketConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.RedPacket) + if err != nil { + return nil, err + } gin.SetMode(gin.ReleaseMode) r := gin.New() if v, ok := binding.Validator.Engine().(*validator.Validate); ok { @@ -243,9 +248,6 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co groupRouterGroup.POST("/get_full_join_group_ids", g.GetFullJoinGroupIDs) groupRouterGroup.POST("/get_group_application_unhandled_count", g.GetGroupApplicationUnhandledCount) groupRouterGroup.POST("/get_common_groups_with_friend", g.GetCommonGroupsWithFriend) - groupRouterGroup.POST("/pin_group_message", g.PinGroupMessage) - groupRouterGroup.POST("/unpin_group_message", g.UnpinGroupMessage) - groupRouterGroup.POST("/get_group_pinned_messages", g.GetGroupPinnedMessages) } // certificate { @@ -366,6 +368,30 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co cryptoGroup.POST("/integrity_report", cr.IntegrityReport) } + // RedPacket + { + rp := NewRedPacketApi(pbredpacket.NewRedPacketClient(redpacketConn)) + redpacketGroup := r.Group("/redpacket") + redpacketGroup.POST("/create_order", rp.CreateOrder) + redpacketGroup.POST("/created_callback", rp.CreatedCallback) + redpacketGroup.POST("/detail", rp.GetDetail) + redpacketGroup.POST("/issue_claim_sign", rp.IssueClaimSign) + redpacketGroup.POST("/claim_result", rp.ClaimResult) + redpacketGroup.POST("/request_refund", rp.RequestRefund) + redpacketGroup.POST("/get_refund", rp.GetRefund) + redpacketGroup.POST("/wallet_bind/challenge", rp.IssueWalletBindChallenge) + redpacketGroup.POST("/wallet_bind/confirm", rp.ConfirmWalletBind) + redpacketGroup.POST("/wallet_bind/detail", rp.GetWalletBinding) + + adminGroup := redpacketGroup.Group("/admin") + adminGroup.POST("/set_signer", rp.AdminSetSigner) + adminGroup.POST("/set_token", rp.AdminSetToken) + adminGroup.POST("/set_expiry", rp.AdminSetExpiry) + adminGroup.POST("/set_allow_all_tokens", rp.AdminSetAllowAllTokens) + adminGroup.POST("/set_native_token_enabled", rp.AdminSetNativeTokenEnabled) + adminGroup.POST("/parse_tx_events", rp.AdminParseTxEvents) + } + { statisticsGroup := r.Group("/statistics") statisticsGroup.POST("/user/register", u.UserRegisterCount) diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index ef44566f2..d9ded4943 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -314,7 +314,11 @@ func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFrien if err != nil { return nil, err } - return &relation.GetFriendInfoResp{FriendInfos: convert.FriendOnlyDB2PbOnly(friends)}, nil + users, err := s.userClient.GetUsersInfoMap(ctx, req.FriendUserIDs) + if err != nil { + return nil, err + } + return &relation.GetFriendInfoResp{FriendInfos: convert.FriendOnlyDB2PbOnly(friends, users)}, nil } func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) { @@ -693,7 +697,7 @@ func (s *friendServer) AddOnewayFriend(ctx context.Context, req *relation.ApplyT if in1 { return nil, servererrs.ErrRelationshipAlready.WrapMsg("already in friend list") } - if err := s.db.BecomeOnewayFriend(ctx, req.FromUserID, req.ToUserID, becomeFriendByOneway, req.Remark); err != nil { + if err := s.db.BecomeOnewayFriend(ctx, req.FromUserID, req.ToUserID, becomeFriendByOneway); err != nil { return nil, err } // Notify only A (FromUserID) so incremental friend sync is triggered diff --git a/pkg/common/convert/friend.go b/pkg/common/convert/friend.go index 994c6d7d5..e530edca2 100644 --- a/pkg/common/convert/friend.go +++ b/pkg/common/convert/friend.go @@ -78,6 +78,8 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func( friendPb.FriendUser.Nickname = users[friend.FriendUserID].Nickname friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL friendPb.FriendUser.Ex = users[friend.FriendUserID].Ex + friendPb.FriendUser.FirstName = users[friend.FriendUserID].FirstName + friendPb.FriendUser.LastName = users[friend.FriendUserID].LastName friendPb.CreateTime = friend.CreateTime.Unix() friendPb.IsPinned = friend.IsPinned friendPb.IsMute = friend.IsMuted @@ -88,9 +90,9 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func( return friendsPb, nil } -func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly { +func FriendOnlyDB2PbOnly(friendsDB []*model.Friend, users map[string]*sdkws.UserInfo) []*relation.FriendInfoOnly { return datautil.Slice(friendsDB, func(f *model.Friend) *relation.FriendInfoOnly { - return &relation.FriendInfoOnly{ + info := &relation.FriendInfoOnly{ OwnerUserID: f.OwnerUserID, FriendUserID: f.FriendUserID, Remark: f.Remark, @@ -103,6 +105,11 @@ func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly { MuteDuration: f.MuteDuration, MuteEndTime: f.MuteEndTime, } + if u, ok := users[f.FriendUserID]; ok { + info.FirstName = u.FirstName + info.LastName = u.LastName + } + return info }) } From c93ce2f8e5b7ef32795b4de7bcafb8013f9459bd Mon Sep 17 00:00:00 2001 From: hawklin2017 <32898629+hawklin2017@users.noreply.github.com> Date: Fri, 1 May 2026 02:20:49 +0800 Subject: [PATCH 4/4] first name --- internal/rpc/relation/friend.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index d9ded4943..d39bb2c4e 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -702,13 +702,13 @@ func (s *friendServer) AddOnewayFriend(ctx context.Context, req *relation.ApplyT } // Notify only A (FromUserID) so incremental friend sync is triggered // without notifying B (ToUserID). - tips := sdkws.FriendApplicationApprovedTips{ - FromToUserID: &sdkws.FromToUserID{ - FromUserID: req.FromUserID, - ToUserID: req.ToUserID, - }, - } - s.notificationSender.Notification(ctx, req.FromUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips) + //tips := sdkws.FriendApplicationApprovedTips{ + // FromToUserID: &sdkws.FromToUserID{ + // FromUserID: req.FromUserID, + // ToUserID: req.ToUserID, + // }, + //} + //s.notificationSender.Notification(ctx, req.FromUserID, req.FromUserID, constant.FriendApplicationApprovedNotification, &tips) return &relation.ApplyToAddFriendResp{}, nil }