Merge pull request #3372 from withchao/pre-release-v3.8.4

feat: add rpc interface permission check
pull/3378/head
chao 4 months ago committed by GitHub
commit 2ec3708a27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,11 +7,11 @@ beforeSendSingleMsg:
# If not set, all contentType messages will through this filter.
deniedTypes: []
beforeUpdateUserInfoEx:
enable: false
enable: false
timeout: 5
failedContinue: true
afterUpdateUserInfoEx:
enable: false
enable: false
timeout: 5
afterSendSingleMsg:
enable: false
@ -181,3 +181,19 @@ afterImportFriends:
afterRemoveBlack:
enable: false
timeout: 5
beforeCreateSingleChatConversations:
enable: false
timeout: 5
failedContinue: false
afterCreateSingleChatConversations:
enable: false
timeout: 5
failedContinue: false
beforeCreateGroupChatConversations:
enable: false
timeout: 5
failedContinue: false
afterCreateGroupChatConversations:
enable: false
timeout: 5
failedContinue: false

@ -16,6 +16,7 @@ package api
import (
"github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/a2r"
)
@ -48,9 +49,9 @@ func (o *ConversationApi) SetConversations(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.SetConversations, o.Client)
}
func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client)
}
//func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) {
// a2r.Call(c, conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client)
//}
func (o *ConversationApi) GetFullOwnerConversationIDs(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.GetFullOwnerConversationIDs, o.Client)
@ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client)
}
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
}

@ -9,6 +9,8 @@ import (
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/openimsdk/open-im-server/v3/internal/api/jssdk"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -28,7 +30,6 @@ import (
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
@ -262,12 +263,13 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
conversationGroup.POST("/get_conversation", c.GetConversation)
conversationGroup.POST("/get_conversations", c.GetConversations)
conversationGroup.POST("/set_conversations", c.SetConversations)
conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
//conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs)
conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation)
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser)
}
{

@ -0,0 +1,117 @@
package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/utils/datautil"
)
func (c *conversationServer) webhookBeforeCreateSingleChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeCreateSingleChatConversationsReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateSingleChatConversationsCommand,
OwnerUserID: req.OwnerUserID,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
UserID: req.UserID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
resp := &callbackstruct.CallbackBeforeCreateSingleChatConversationsResp{}
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
datautil.NotNilReplace(&req.Ex, resp.Ex)
return nil
})
}
func (c *conversationServer) webhookAfterCreateSingleChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
cbReq := &callbackstruct.CallbackAfterCreateSingleChatConversationsReq{
CallbackCommand: callbackstruct.CallbackAfterCreateSingleChatConversationsCommand,
OwnerUserID: req.OwnerUserID,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
UserID: req.UserID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateSingleChatConversationsResp{}, after)
return nil
}
func (c *conversationServer) webhookBeforeCreateGroupChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeCreateGroupChatConversationsReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateGroupChatConversationsCommand,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
GroupID: req.GroupID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
resp := &callbackstruct.CallbackBeforeCreateGroupChatConversationsResp{}
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
datautil.NotNilReplace(&req.Ex, resp.Ex)
return nil
})
}
func (c *conversationServer) webhookAfterCreateGroupChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
cbReq := &callbackstruct.CallbackAfterCreateGroupChatConversationsReq{
CallbackCommand: callbackstruct.CallbackAfterCreateGroupChatConversationsCommand,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
GroupID: req.GroupID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupChatConversationsResp{}, after)
return nil
}

@ -19,9 +19,12 @@ import (
"sort"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@ -30,6 +33,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
@ -39,7 +43,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
)
type conversationServer struct {
@ -49,9 +52,10 @@ type conversationServer struct {
conversationNotificationSender *ConversationNotificationSender
config *Config
userClient *rpcli.UserClient
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
webhookClient *webhook.Client
userClient *rpcli.UserClient
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
}
type Config struct {
@ -60,6 +64,7 @@ type Config struct {
MongodbConfig config.Mongo
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
@ -90,20 +95,32 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil {
return err
}
msgClient := rpcli.NewMsgClient(msgConn)
cs := conversationServer{
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
}
cs.conversationNotificationSender = NewConversationNotificationSender(&config.NotificationConfig, msgClient)
cs.conversationDatabase = controller.NewConversationDatabase(
conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB),
mgocli.GetTx())
localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &conversationServer{
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
})
pbconversation.RegisterConversationServer(server, &cs)
return nil
}
func (c *conversationServer) GetConversation(ctx context.Context, req *pbconversation.GetConversationReq) (*pbconversation.GetConversationResp, error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
if err != nil {
return nil, err
@ -117,7 +134,9 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
}
func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) {
log.ZDebug(ctx, "GetSortedConversationList", "seqs", req, "userID", req.UserID)
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
var conversationIDs []string
if len(req.ConversationIDs) == 0 {
conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
@ -190,6 +209,9 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil {
return nil, err
@ -200,6 +222,9 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon
}
func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil {
return nil, err
@ -220,6 +245,9 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
}
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
if err := authverify.CheckAccess(ctx, req.GetConversation().GetUserID()); err != nil {
return nil, err
}
var conversation dbModel.Conversation
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
@ -234,8 +262,10 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
}
func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) {
if req.Conversation == nil {
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
for _, userID := range req.UserIDs {
if err := authverify.CheckAccess(ctx, userID); err != nil {
return nil, err
}
}
if req.Conversation.ConversationType == constant.WriteGroupChatType {
groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
@ -271,109 +301,29 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
conversation.UserID = req.Conversation.UserID
conversation.GroupID = req.Conversation.GroupID
m := make(map[string]any)
setConversationFieldsFunc := func() {
if req.Conversation.RecvMsgOpt != nil {
conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
}
if req.Conversation.AttachedInfo != nil {
conversation.AttachedInfo = req.Conversation.AttachedInfo.Value
m["attached_info"] = req.Conversation.AttachedInfo.Value
}
if req.Conversation.Ex != nil {
conversation.Ex = req.Conversation.Ex.Value
m["ex"] = req.Conversation.Ex.Value
}
if req.Conversation.IsPinned != nil {
conversation.IsPinned = req.Conversation.IsPinned.Value
m["is_pinned"] = req.Conversation.IsPinned.Value
}
if req.Conversation.GroupAtType != nil {
conversation.GroupAtType = req.Conversation.GroupAtType.Value
m["group_at_type"] = req.Conversation.GroupAtType.Value
}
if req.Conversation.MsgDestructTime != nil {
conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
}
if req.Conversation.IsMsgDestruct != nil {
conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value
m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value
}
if req.Conversation.BurnDuration != nil {
conversation.BurnDuration = req.Conversation.BurnDuration.Value
m["burn_duration"] = req.Conversation.BurnDuration.Value
}
m, conversation, err := UpdateConversationsMap(ctx, req)
if err != nil {
return nil, err
}
// set need set field in conversation
setConversationFieldsFunc()
for userID := range conversationMap {
unequal := len(m)
if req.Conversation.RecvMsgOpt != nil {
if req.Conversation.RecvMsgOpt.Value == conversationMap[userID].RecvMsgOpt {
unequal--
}
}
if req.Conversation.AttachedInfo != nil {
if req.Conversation.AttachedInfo.Value == conversationMap[userID].AttachedInfo {
unequal--
}
}
if req.Conversation.Ex != nil {
if req.Conversation.Ex.Value == conversationMap[userID].Ex {
unequal--
}
}
if req.Conversation.IsPinned != nil {
if req.Conversation.IsPinned.Value == conversationMap[userID].IsPinned {
unequal--
}
}
if req.Conversation.GroupAtType != nil {
if req.Conversation.GroupAtType.Value == conversationMap[userID].GroupAtType {
unequal--
}
}
if req.Conversation.MsgDestructTime != nil {
if req.Conversation.MsgDestructTime.Value == conversationMap[userID].MsgDestructTime {
unequal--
}
}
unequal := UserUpdateCheckMap(ctx, userID, req.Conversation, conversationMap[userID])
if req.Conversation.IsMsgDestruct != nil {
if req.Conversation.IsMsgDestruct.Value == conversationMap[userID].IsMsgDestruct {
unequal--
}
}
if req.Conversation.BurnDuration != nil {
if req.Conversation.BurnDuration.Value == conversationMap[userID].BurnDuration {
unequal--
}
}
if unequal > 0 {
if unequal {
needUpdateUsersList = append(needUpdateUsersList, userID)
}
}
if len(m) != 0 && len(needUpdateUsersList) != 0 {
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, needUpdateUsersList, &conversation, m); err != nil {
return nil, err
}
for _, v := range needUpdateUsersList {
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
for _, userID := range needUpdateUsersList {
c.conversationNotificationSender.ConversationChangeNotification(ctx, userID, []string{req.Conversation.ConversationID})
}
}
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
var conversations []*dbModel.Conversation
for _, ownerUserID := range req.UserIDs {
@ -396,58 +346,94 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return &pbconversation.SetConversationsResp{}, nil
}
// Get user IDs with "Do Not Disturb" enabled in super large groups.
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) {
return nil, errs.New("deprecated")
func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
m := make(map[string]any)
if req.Ex != nil {
m["ex"] = req.Ex.Value
}
if len(m) > 0 {
if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil {
return nil, err
}
}
return &pbconversation.UpdateConversationsByUserResp{}, nil
}
// create conversation without notification for msg redis transfer.
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
req *pbconversation.CreateSingleChatConversationsReq,
) (*pbconversation.CreateSingleChatConversationsResp, error) {
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbconversation.CreateSingleChatConversationsReq) (*pbconversation.CreateSingleChatConversationsResp, error) {
var conversation dbModel.Conversation
switch req.ConversationType {
case constant.SingleChatType:
var conversation dbModel.Conversation
// sendUser create
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.SendID
conversation.UserID = req.RecvID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
}
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
// recvUser create
conversation2 := conversation
conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation2)
case constant.NotificationChatType:
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
}
return &pbconversation.CreateSingleChatConversationsResp{}, nil
}
func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) {
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs)
if err != nil {
var conversation dbModel.Conversation
conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
conversation.GroupID = req.GroupID
conversation.ConversationType = constant.ReadGroupChatType
if err := c.webhookBeforeCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateGroupChatConversations, &conversation); err != nil {
return nil, err
}
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs, &conversation)
if err != nil {
return nil, err
}
c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation)
return &pbconversation.CreateGroupChatConversationsResp{}, nil
}
@ -480,6 +466,9 @@ func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbc
}
func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
@ -488,6 +477,9 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconv
}
func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req *pbconversation.GetUserConversationIDsHashReq) (*pbconversation.GetUserConversationIDsHashResp, error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
hash, err := c.conversationDatabase.GetUserConversationIDsHash(ctx, req.OwnerUserID)
if err != nil {
return nil, err
@ -495,10 +487,7 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
}
func (c *conversationServer) GetConversationsByConversationID(
ctx context.Context,
req *pbconversation.GetConversationsByConversationIDReq,
) (*pbconversation.GetConversationsByConversationIDResp, error) {
func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) {
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
if err != nil {
return nil, err
@ -552,10 +541,7 @@ func (c *conversationServer) conversationSort(conversations map[int64]string, re
resp.ConversationElems = append(resp.ConversationElems, cons...)
}
func (c *conversationServer) getConversationInfo(
ctx context.Context,
chatLogs map[string]*sdkws.MsgData,
userID string) (map[string]*pbconversation.ConversationElem, error) {
func (c *conversationServer) getConversationInfo(ctx context.Context, chatLogs map[string]*sdkws.MsgData, userID string) (map[string]*pbconversation.ConversationElem, error) {
var (
sendIDs []string
groupIDs []string
@ -641,6 +627,11 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context
}
func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconversation.UpdateConversationReq) (*pbconversation.UpdateConversationResp, error) {
for _, userID := range req.UserIDs {
if err := authverify.CheckAccess(ctx, userID); err != nil {
return nil, err
}
}
m := make(map[string]any)
if req.RecvMsgOpt != nil {
m["recv_msg_opt"] = req.RecvMsgOpt.Value
@ -687,6 +678,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
}
func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbconversation.GetOwnerConversationReq) (*pbconversation.GetOwnerConversationResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
total, conversations, err := c.conversationDatabase.GetOwnerConversation(ctx, req.UserID, req.Pagination)
if err != nil {
return nil, err
@ -748,6 +742,9 @@ func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _
}
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
@ -756,6 +753,9 @@ func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, re
}
func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err

@ -0,0 +1,85 @@
package conversation
import (
"context"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/conversation"
)
func UpdateConversationsMap(ctx context.Context, req *conversation.SetConversationsReq) (m map[string]any, conversation dbModel.Conversation, err error) {
m = make(map[string]any)
conversation.ConversationID = req.Conversation.ConversationID
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
conversation.GroupID = req.Conversation.GroupID
if req.Conversation.RecvMsgOpt != nil {
conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
}
if req.Conversation.AttachedInfo != nil {
conversation.AttachedInfo = req.Conversation.AttachedInfo.Value
m["attached_info"] = req.Conversation.AttachedInfo.Value
}
if req.Conversation.Ex != nil {
conversation.Ex = req.Conversation.Ex.Value
m["ex"] = req.Conversation.Ex.Value
}
if req.Conversation.IsPinned != nil {
conversation.IsPinned = req.Conversation.IsPinned.Value
m["is_pinned"] = req.Conversation.IsPinned.Value
}
if req.Conversation.GroupAtType != nil {
conversation.GroupAtType = req.Conversation.GroupAtType.Value
m["group_at_type"] = req.Conversation.GroupAtType.Value
}
if req.Conversation.MsgDestructTime != nil {
conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
}
if req.Conversation.IsMsgDestruct != nil {
conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value
m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value
}
if req.Conversation.BurnDuration != nil {
conversation.BurnDuration = req.Conversation.BurnDuration.Value
m["burn_duration"] = req.Conversation.BurnDuration.Value
}
return m, conversation, nil
}
func UserUpdateCheckMap(ctx context.Context, userID string, req *conversation.ConversationReq, conversation *dbModel.Conversation) (unequal bool) {
unequal = false
if req.RecvMsgOpt != nil && conversation.RecvMsgOpt != req.RecvMsgOpt.Value {
unequal = true
}
if req.AttachedInfo != nil && conversation.AttachedInfo != req.AttachedInfo.Value {
unequal = true
}
if req.Ex != nil && conversation.Ex != req.Ex.Value {
unequal = true
}
if req.IsPinned != nil && conversation.IsPinned != req.IsPinned.Value {
unequal = true
}
if req.GroupAtType != nil && conversation.GroupAtType != req.GroupAtType.Value {
unequal = true
}
if req.MsgDestructTime != nil && conversation.MsgDestructTime != req.MsgDestructTime.Value {
unequal = true
}
if req.IsMsgDestruct != nil && conversation.IsMsgDestruct != req.IsMsgDestruct.Value {
unequal = true
}
if req.BurnDuration != nil && conversation.BurnDuration != req.BurnDuration.Value {
unequal = true
}
return unequal
}

@ -17,11 +17,11 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
)

@ -35,6 +35,9 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re
}
func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{
Ctx: ctx,
VersionKey: req.UserID,

@ -17,13 +17,14 @@ package group
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbgroup "github.com/openimsdk/protocol/group"
)
// GetGroupInfoCache get group info from cache.
func (s *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGroupInfoCacheReq) (*pbgroup.GetGroupInfoCacheResp, error) {
group, err := s.db.TakeGroup(ctx, req.GroupID)
func (g *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGroupInfoCacheReq) (*pbgroup.GetGroupInfoCacheResp, error) {
group, err := g.db.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
@ -32,8 +33,11 @@ func (s *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGro
}, nil
}
func (s *groupServer) GetGroupMemberCache(ctx context.Context, req *pbgroup.GetGroupMemberCacheReq) (*pbgroup.GetGroupMemberCacheResp, error) {
members, err := s.db.TakeGroupMember(ctx, req.GroupID, req.GroupMemberID)
func (g *groupServer) GetGroupMemberCache(ctx context.Context, req *pbgroup.GetGroupMemberCacheReq) (*pbgroup.GetGroupMemberCacheResp, error) {
if err := authverify.CheckAccess(ctx, req.GroupMemberID); err != nil {
return nil, err
}
members, err := g.db.TakeGroupMember(ctx, req.GroupID, req.GroupMemberID)
if err != nil {
return nil, err
}

@ -476,6 +476,19 @@ func (g *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro
if err != nil {
return nil, err
}
if !authverify.IsAdmin(ctx) {
var inGroup bool
opUserID := mcontext.GetOpUserID(ctx)
for _, member := range members {
if member.UserID == opUserID {
inGroup = true
break
}
}
if !inGroup {
return nil, errs.ErrNoPermission.WrapMsg("opuser not in group")
}
}
if err := g.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
@ -486,11 +499,24 @@ func (g *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro
return &resp, nil
}
func (g *groupServer) checkAdminOrInGroup(ctx context.Context, groupID string) error {
if authverify.IsAdmin(ctx) {
return nil
}
opUserID := mcontext.GetOpUserID(ctx)
members, err := g.db.FindGroupMembers(ctx, groupID, []string{opUserID})
if err != nil {
return err
}
if len(members) == 0 {
return errs.ErrNoPermission.WrapMsg("op user not in group")
}
return nil
}
func (g *groupServer) GetGroupMemberList(ctx context.Context, req *pbgroup.GetGroupMemberListReq) (*pbgroup.GetGroupMemberListResp, error) {
if opUserID := mcontext.GetOpUserID(ctx); !datautil.Contain(opUserID, g.config.Share.IMAdminUserID...) {
if _, err := g.db.TakeGroupMember(ctx, req.GroupID, opUserID); err != nil {
return nil, err
}
if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
var (
total int64
@ -631,6 +657,9 @@ func (g *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbgroup.GetG
if req.GroupID == "" {
return nil, errs.ErrArgs.WrapMsg("groupID empty")
}
if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
members, err := g.getGroupMembersInfo(ctx, req.GroupID, req.UserIDs)
if err != nil {
return nil, err
@ -658,6 +687,9 @@ func (g *groupServer) getGroupMembersInfo(ctx context.Context, groupID string, u
// GetGroupApplicationList handles functions that get a list of group requests.
func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.GetGroupApplicationListReq) (*pbgroup.GetGroupApplicationListResp, error) {
if err := authverify.CheckAccess(ctx, req.FromUserID); err != nil {
return nil, err
}
groupIDs, err := g.db.FindUserManagedGroupID(ctx, req.FromUserID)
if err != nil {
return nil, err
@ -1652,6 +1684,11 @@ func (g *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbgroup.Get
if datautil.Duplicate(req.GroupIDs) {
return nil, errs.ErrArgs.WrapMsg("groupIDs duplicate")
}
for _, groupID := range req.GroupIDs {
if err := g.checkAdminOrInGroup(ctx, groupID); err != nil {
return nil, err
}
}
groups, err := g.db.FindGroup(ctx, req.GroupIDs)
if err != nil {
return nil, err
@ -1699,6 +1736,9 @@ func (g *groupServer) GetGroupMemberUserIDs(ctx context.Context, req *pbgroup.Ge
if err != nil {
return nil, err
}
if err := authverify.CheckAccessIn(ctx, userIDs...); err != nil {
return nil, err
}
return &pbgroup.GetGroupMemberUserIDsResp{
UserIDs: userIDs,
}, nil

@ -18,24 +18,28 @@ import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/tools/errs"
)
func (s *groupServer) GroupCreateCount(ctx context.Context, req *group.GroupCreateCountReq) (*group.GroupCreateCountResp, error) {
func (g *groupServer) GroupCreateCount(ctx context.Context, req *group.GroupCreateCountReq) (*group.GroupCreateCountResp, error) {
if req.Start > req.End {
return nil, errs.ErrArgs.WrapMsg("start > end: %d > %d", req.Start, req.End)
}
total, err := s.db.CountTotal(ctx, nil)
if err := authverify.CheckAdmin(ctx); err != nil {
return nil, err
}
total, err := g.db.CountTotal(ctx, nil)
if err != nil {
return nil, err
}
start := time.UnixMilli(req.Start)
before, err := s.db.CountTotal(ctx, &start)
before, err := g.db.CountTotal(ctx, &start)
if err != nil {
return nil, err
}
count, err := s.db.CountRangeEverydayTotal(ctx, start, time.UnixMilli(req.End))
count, err := g.db.CountRangeEverydayTotal(ctx, start, time.UnixMilli(req.End))
if err != nil {
return nil, err
}

@ -11,9 +11,6 @@ import (
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
)
const versionSyncLimit = 500
@ -23,10 +20,8 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou
if err != nil {
return nil, err
}
if opUserID := mcontext.GetOpUserID(ctx); !datautil.Contain(opUserID, g.config.Share.IMAdminUserID...) {
if !datautil.Contain(opUserID, userIDs...) {
return nil, errs.ErrNoPermission.WrapMsg("user not in group")
}
if err := authverify.CheckAccessIn(ctx, userIDs...); err != nil {
return nil, err
}
vl, err := g.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
if err != nil {
@ -69,6 +64,9 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF
}
func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) {
if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil {
return nil, err
}
group, err := g.db.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
@ -76,9 +74,6 @@ func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
if group.Status == constant.GroupStatusDismissed {
return nil, servererrs.ErrDismissedAlready.Wrap()
}
if _, err := g.db.TakeGroupMember(ctx, req.GroupID, mcontext.GetOpUserID(ctx)); err != nil {
return nil, err
}
var (
hasGroupUpdate bool
sortVersion uint64

@ -18,6 +18,7 @@ import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
@ -29,6 +30,9 @@ import (
)
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (*msg.GetConversationsHasReadAndMaxSeqResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
var conversationIDs []string
if len(req.ConversationIDs) == 0 {
var err error
@ -82,6 +86,9 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
}
func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetConversationHasReadSeqReq) (*msg.SetConversationHasReadSeqResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
if err != nil {
return nil, err
@ -97,8 +104,8 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC
}
func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (*msg.MarkMsgsAsReadResp, error) {
if len(req.Seqs) < 1 {
return nil, errs.ErrArgs.WrapMsg("seqs must not be empty")
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
if err != nil {
@ -139,6 +146,9 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
}
func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (*msg.MarkConversationAsReadResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID)
if err != nil {
return nil, err
@ -216,5 +226,4 @@ func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversation
HasReadSeq: hasReadSeq,
}
m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
}

@ -94,6 +94,9 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
}
func (m *msgServer) DeleteMsgPhysicalBySeq(ctx context.Context, req *msg.DeleteMsgPhysicalBySeqReq) (*msg.DeleteMsgPhysicalBySeqResp, error) {
if err := authverify.CheckAdmin(ctx); err != nil {
return nil, err
}
err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs)
if err != nil {
return nil, err

@ -17,11 +17,14 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
pbconv "github.com/openimsdk/protocol/conversation"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/wrapperspb"
@ -29,13 +32,15 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/protobuf/proto"
)
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
if req.MsgData == nil {
return nil, errs.ErrArgs.WrapMsg("msgData is nil")
}
if err := authverify.CheckAccess(ctx, req.MsgData.SendID); err != nil {
return nil, err
}
before := new(*sdkws.MsgData)
resp, err := m.sendMsg(ctx, req, before)
if err != nil {
@ -104,7 +109,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
var atUserID []string
conversation := &pbconversation.ConversationReq{
conversation := &pbconv.ConversationReq{
ConversationID: msgprocessor.GetConversationIDByMsg(msg),
ConversationType: msg.SessionType,
GroupID: msg.GroupID,
@ -171,13 +176,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
isSend := true
isNotification := msgprocessor.IsNotificationByMsg(req.MsgData)
if !isNotification {
isSend, err = m.modifyMessageByUserMessageReceiveOpt(
ctx,
req.MsgData.RecvID,
conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID),
constant.SingleChatType,
req,
)
isSend, err = m.modifyMessageByUserMessageReceiveOpt(authverify.WithTempAdmin(ctx), req.MsgData.RecvID, conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID), constant.SingleChatType, req)
if err != nil {
return nil, err
}

@ -17,9 +17,10 @@ package msg
import (
"context"
"errors"
"sort"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/redis/go-redis/v9"
"sort"
)
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) {

@ -16,15 +16,20 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/utils/datautil"
)
func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq) (*msg.GetActiveUserResp, error) {
if err := authverify.CheckAdmin(ctx); err != nil {
return nil, err
}
msgCount, userCount, users, dateCount, err := m.MsgDatabase.RangeUserSendCount(ctx, time.UnixMilli(req.Start), time.UnixMilli(req.End), req.Group, req.Ase, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
@ -60,6 +65,9 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq
}
func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupReq) (*msg.GetActiveGroupResp, error) {
if err := authverify.CheckAdmin(ctx); err != nil {
return nil, err
}
msgCount, groupCount, groups, dateCount, err := m.MsgDatabase.RangeGroupSendCount(ctx, time.UnixMilli(req.Start), time.UnixMilli(req.End), req.Ase, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err

@ -29,6 +29,9 @@ import (
)
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
resp := &sdkws.PullMessageBySeqsResp{}
resp.Msgs = make(map[string]*sdkws.PullMsgs)
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)

@ -47,6 +47,9 @@ func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *relation.Ge
}
func (s *friendServer) IsBlack(ctx context.Context, req *relation.IsBlackReq) (*relation.IsBlackResp, error) {
if err := authverify.CheckAccessIn(ctx, req.UserID1, req.UserID2); err != nil {
return nil, err
}
in1, in2, err := s.blackDatabase.CheckIn(ctx, req.UserID1, req.UserID2)
if err != nil {
return nil, err

@ -280,6 +280,9 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFri
}
func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFriendInfoReq) (*relation.GetFriendInfoResp, error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
friends, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err
@ -288,6 +291,9 @@ func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFrien
}
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
resp = &relation.GetDesignatedFriendsResp{}
if datautil.Duplicate(req.FriendUserIDs) {
return nil, errs.ErrArgs.WrapMsg("friend userID repeated")
@ -313,9 +319,10 @@ func (s *friendServer) getFriend(ctx context.Context, ownerUserID string, friend
}
// Get the list of friend requests sent out proactively.
func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context,
req *relation.GetDesignatedFriendsApplyReq,
) (resp *relation.GetDesignatedFriendsApplyResp, err error) {
func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, req *relation.GetDesignatedFriendsApplyReq) (resp *relation.GetDesignatedFriendsApplyResp, err error) {
if err := authverify.CheckAccessIn(ctx, req.FromUserID, req.ToUserID); err != nil {
return nil, err
}
friendRequests, err := s.db.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID)
if err != nil {
return nil, err
@ -374,6 +381,9 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *r
// ok.
func (s *friendServer) IsFriend(ctx context.Context, req *relation.IsFriendReq) (resp *relation.IsFriendResp, err error) {
if err := authverify.CheckAccessIn(ctx, req.UserID1, req.UserID2); err != nil {
return nil, err
}
resp = &relation.IsFriendResp{}
resp.InUser1Friends, resp.InUser2Friends, err = s.db.CheckIn(ctx, req.UserID1, req.UserID2)
if err != nil {
@ -426,6 +436,9 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio
return nil, errs.ErrArgs.WrapMsg("userIDList repeated")
}
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
userMap, err := s.userClient.GetUsersInfoMap(ctx, req.UserIDList)
if err != nil {
return nil, err
@ -494,10 +507,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio
return resp, nil
}
func (s *friendServer) UpdateFriends(
ctx context.Context,
req *relation.UpdateFriendsReq,
) (*relation.UpdateFriendsResp, error) {
func (s *friendServer) UpdateFriends(ctx context.Context, req *relation.UpdateFriendsReq) (*relation.UpdateFriendsResp, error) {
if len(req.FriendUserIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("friendIDList is empty")
}
@ -505,6 +515,10 @@ func (s *friendServer) UpdateFriends(
return nil, errs.ErrArgs.WrapMsg("friendIDList repeated")
}
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
_, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err

@ -25,6 +25,7 @@ import (
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
)
@ -45,7 +46,7 @@ func genLogID() string {
func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) (*third.UploadLogsResp, error) {
var dbLogs []*relationtb.Log
userID := ctx.Value(constant.OpUserID).(string)
userID := mcontext.GetOpUserID(ctx)
platform := constant.PlatformID2Name[int(req.Platform)]
for _, fileURL := range req.FileURLs {
log := relationtb.Log{

@ -19,6 +19,7 @@ import (
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
@ -148,6 +149,9 @@ func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTo
}
func (t *thirdServer) SetAppBadge(ctx context.Context, req *third.SetAppBadgeReq) (resp *third.SetAppBadgeResp, err error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
}
err = t.thirdDatabase.SetAppBadge(ctx, req.UserID, int(req.AppUnreadCount))
if err != nil {
return nil, err

@ -46,7 +46,7 @@ func TestName(t *testing.T) {
srv := &cronServer{
ctx: ctx,
config: &CronTaskConfig{
config: &Config{
CronTask: config.CronTask{
RetainChatRecords: 1,
FileExpireTime: 1,

@ -64,16 +64,57 @@ func GetIMAdminUserIDs(ctx context.Context) []string {
}
func IsAdmin(ctx context.Context) bool {
return datautil.Contain(mcontext.GetOpUserID(ctx), GetIMAdminUserIDs(ctx)...)
return IsTempAdmin(ctx) || IsSystemAdmin(ctx)
}
func CheckAccess(ctx context.Context, ownerUserID string) error {
opUserID := mcontext.GetOpUserID(ctx)
if opUserID == ownerUserID {
if mcontext.GetOpUserID(ctx) == ownerUserID {
return nil
}
if datautil.Contain(mcontext.GetOpUserID(ctx), GetIMAdminUserIDs(ctx)...) {
if IsAdmin(ctx) {
return nil
}
return servererrs.ErrNoPermission.WrapMsg("ownerUserID", ownerUserID)
}
func CheckAccessIn(ctx context.Context, ownerUserIDs ...string) error {
opUserID := mcontext.GetOpUserID(ctx)
for _, userID := range ownerUserIDs {
if opUserID == userID {
return nil
}
}
if IsAdmin(ctx) {
return nil
}
return servererrs.ErrNoPermission.WrapMsg("opUser in ownerUserIDs")
}
var tempAdminValue = []string{"1"}
const ctxTempAdminKey = "ctxImTempAdminKey"
func WithTempAdmin(ctx context.Context) context.Context {
keys, _ := ctx.Value(constant.RpcCustomHeader).([]string)
if datautil.Contain(ctxTempAdminKey, keys...) {
return ctx
}
if len(keys) > 0 {
temp := make([]string, 0, len(keys)+1)
temp = append(temp, keys...)
keys = append(temp, ctxTempAdminKey)
} else {
keys = []string{ctxTempAdminKey}
}
ctx = context.WithValue(ctx, constant.RpcCustomHeader, keys)
return context.WithValue(ctx, ctxTempAdminKey, tempAdminValue)
}
func IsTempAdmin(ctx context.Context) bool {
values, _ := ctx.Value(ctxTempAdminKey).([]string)
return datautil.Equal(tempAdminValue, values)
}
func IsSystemAdmin(ctx context.Context) bool {
return datautil.Contain(mcontext.GetOpUserID(ctx), GetIMAdminUserIDs(ctx)...)
}

@ -15,51 +15,55 @@
package callbackstruct
const (
CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand"
CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand"
CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand"
CallbackAfterSetGroupInfoExCommand = "callbackAfterSetGroupInfoExCommand"
CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand"
CallbackBeforeSetGroupInfoExCommand = "callbackBeforeSetGroupInfoExCommand"
CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand"
CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand"
CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand"
CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand"
CallbackAfterAddFriendAgreeCommand = "callbackAfterAddFriendAgreeCommand"
CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand"
CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand"
CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand"
CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand"
CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand"
CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand"
CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand"
CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand"
CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand"
CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand"
CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand"
CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand"
CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand"
CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand"
CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand"
CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand"
CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand"
CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand"
CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand"
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand"
CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand"
CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand"
CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand"
CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand"
CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand"
CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand"
CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand"
CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand"
CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand"
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand"
CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand"
CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand"
CallbackAfterSetGroupInfoExCommand = "callbackAfterSetGroupInfoExCommand"
CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand"
CallbackBeforeSetGroupInfoExCommand = "callbackBeforeSetGroupInfoExCommand"
CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand"
CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand"
CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand"
CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand"
CallbackAfterAddFriendAgreeCommand = "callbackAfterAddFriendAgreeCommand"
CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand"
CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand"
CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand"
CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand"
CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand"
CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand"
CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand"
CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand"
CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand"
CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand"
CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand"
CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand"
CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand"
CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand"
CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand"
CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand"
CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand"
CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand"
CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand"
CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand"
CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand"
CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand"
CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand"
CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand"
CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand"
CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand"
CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand"
CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand"
CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand"
CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand"
CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand"
CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand"
CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand"
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
CallbackBeforeCreateSingleChatConversationsCommand = "callbackBeforeCreateSingleChatConversationsCommand"
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
)

@ -0,0 +1,91 @@
package callbackstruct
type CallbackBeforeCreateSingleChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
UserID string `json:"user_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackBeforeCreateSingleChatConversationsResp struct {
CommonCallbackResp
RecvMsgOpt *int32 `json:"recv_msg_opt"`
IsPinned *bool `json:"is_pinned"`
IsPrivateChat *bool `json:"is_private_chat"`
BurnDuration *int32 `json:"burn_duration"`
GroupAtType *int32 `json:"group_at_type"`
AttachedInfo *string `json:"attached_info"`
Ex *string `json:"ex"`
}
type CallbackAfterCreateSingleChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
UserID string `json:"user_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackAfterCreateSingleChatConversationsResp struct {
CommonCallbackResp
}
type CallbackBeforeCreateGroupChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
GroupID string `json:"group_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackBeforeCreateGroupChatConversationsResp struct {
CommonCallbackResp
RecvMsgOpt *int32 `json:"recv_msg_opt"`
IsPinned *bool `json:"is_pinned"`
IsPrivateChat *bool `json:"is_private_chat"`
BurnDuration *int32 `json:"burn_duration"`
GroupAtType *int32 `json:"group_at_type"`
AttachedInfo *string `json:"attached_info"`
Ex *string `json:"ex"`
}
type CallbackAfterCreateGroupChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
GroupID string `json:"group_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackAfterCreateGroupChatConversationsResp struct {
CommonCallbackResp
}

@ -41,6 +41,7 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
config.MongodbConfigFileName: &conversationConfig.MongodbConfig,
config.ShareFileName: &conversationConfig.Share,
config.NotificationFileName: &conversationConfig.NotificationConfig,
config.WebhooksConfigFileName: &conversationConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &conversationConfig.Discovery,
}
@ -67,6 +68,7 @@ func (a *ConversationRpcCmd) runE() error {
a.conversationConfig.NotificationConfig.GetConfigFileName(),
a.conversationConfig.Share.GetConfigFileName(),
a.conversationConfig.LocalCacheConfig.GetConfigFileName(),
a.conversationConfig.WebhooksConfig.GetConfigFileName(),
a.conversationConfig.Discovery.GetConfigFileName(),
}, nil,
conversation.Start)

@ -22,7 +22,6 @@ import (
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx"
@ -47,8 +46,11 @@ type ConversationDatabase interface {
// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is
// transactional.
SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error
// UpdateUserConversations updates all conversations related to a specified user.
// This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user).
UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error
// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs.
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error
// GetConversationIDs retrieves conversation IDs for a given user.
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetUserConversationIDsHash gets the hash of conversation IDs for a given user.
@ -146,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
})
}
func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error {
conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
for _, conversation := range conversations {
cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID)
}
return cache.ChainExecDel(ctx)
}
func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error {
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
if err != nil {
@ -298,10 +312,10 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
// return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
//}
func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversation *relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache()
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
conversationID := conversation.ConversationID
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID})
if err != nil {
return err
@ -309,7 +323,15 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
notExistUserIDs := stringutil.DifferenceString(userIDs, existConversationUserIDs)
var conversations []*relationtb.Conversation
for _, v := range notExistUserIDs {
conversation := relationtb.Conversation{ConversationType: constant.ReadGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversation := relationtb.Conversation{
ConversationType: conversation.ConversationType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID,
// the parameters have default value
RecvMsgOpt: conversation.RecvMsgOpt, IsPinned: conversation.IsPinned, IsPrivateChat: conversation.IsPrivateChat,
BurnDuration: conversation.BurnDuration, GroupAtType: conversation.GroupAtType, AttachedInfo: conversation.AttachedInfo,
Ex: conversation.Ex, MaxSeq: conversation.MaxSeq, MinSeq: conversation.MinSeq, CreateTime: conversation.CreateTime,
MsgDestructTime: conversation.MsgDestructTime, IsMsgDestruct: conversation.IsMsgDestruct, LatestMsgDestructTime: conversation.LatestMsgDestructTime,
}
conversations = append(conversations, &conversation)
cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID)
}
@ -320,7 +342,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
return err
}
}
_, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": 0})
_, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": conversation.MaxSeq})
if err != nil {
return err
}

@ -24,6 +24,7 @@ import (
type Conversation interface {
Create(ctx context.Context, conversations []*model.Conversation) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error)
Update(ctx context.Context, conversation *model.Conversation) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)

@ -21,23 +21,32 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
coll := db.Collection(database.ConversationName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "owner_user_id", Value: 1},
{Key: "conversation_id", Value: 1},
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "owner_user_id", Value: 1},
{Key: "conversation_id", Value: 1},
},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "user_id", Value: 1},
},
Options: options.Index(),
},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
@ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con
return rows, nil
}
func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) {
if len(args) == 0 {
return nil, nil
}
filter := bson.M{
"user_id": userID,
}
conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1}))
if err != nil {
return nil, err
}
err = mongoutil.IncrVersion(func() error {
_, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
if err != nil {
return err
}
return nil
}, func() error {
for _, conversation := range conversations {
if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
return conversations, nil
}
func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) {
return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/idutil"
)
@ -253,13 +254,14 @@ func (b *Batcher[T]) distributeMessage(messages map[string][]*T, totalCount int,
func (b *Batcher[T]) run(channelID int, ch <-chan *Msg[T]) {
defer b.wait.Done()
ctx := authverify.WithTempAdmin(context.Background())
for {
select {
case messages, ok := <-ch:
if !ok {
return
}
b.Do(context.Background(), channelID, messages)
b.Do(ctx, channelID, messages)
if b.config.syncWait {
b.counter.Done()
}

Loading…
Cancel
Save