Merge branch 'openimsdk:main' into main

pull/2386/head
chao 1 year ago committed by GitHub
commit 4781cf4043
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -13,6 +13,9 @@ afterUpdateUserInfoEx:
afterSendSingleMsg: afterSendSingleMsg:
enable: false enable: false
timeout: 5 timeout: 5
# Only the senID/recvID specified in attentionIds will send the callback
# if not set, all user messages will be callback
attentionIds: []
beforeSendGroupMsg: beforeSendGroupMsg:
enable: false enable: false
timeout: 5 timeout: 5

@ -101,6 +101,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
SendTime: params.SendTime, SendTime: params.SendTime,
Options: options, Options: options,
OfflinePushInfo: params.OfflinePushInfo, OfflinePushInfo: params.OfflinePushInfo,
Ex: params.Ex,
}, },
} }
return &pbData return &pbData

@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
userRpcClient: &userRpcClient, userRpcClient: &userRpcClient,
RegisterCenter: client, RegisterCenter: client,
authDatabase: controller.NewAuthDatabase( authDatabase: controller.NewAuthDatabase(
redis2.NewTokenCacheModel(rdb), redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire),
config.Share.Secret, config.Share.Secret,
config.RpcConfig.TokenPolicy.Expire, config.RpcConfig.TokenPolicy.Expire,
), ),

@ -17,13 +17,13 @@ package group
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group" pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
@ -34,6 +34,12 @@ import (
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
) )
// GroupApplicationReceiver
const (
applicantReceiver = iota
adminReceiver
)
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender { func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
return &GroupNotificationSender{ return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)), NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
@ -400,15 +406,17 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
if err != nil { if err != nil {
return return
} }
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { var opUser *sdkws.GroupMemberFullInfo
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
return return
} }
for _, userID := range append(userIDs, req.FromUserID) { for _, userID := range append(userIDs, req.FromUserID) {
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
if userID == req.FromUserID { if userID == req.FromUserID {
tips.ReceiverAs = 0 tips.ReceiverAs = applicantReceiver
} else { } else {
tips.ReceiverAs = 1 tips.ReceiverAs = adminReceiver
} }
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
} }
@ -431,15 +439,17 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
if err != nil { if err != nil {
return return
} }
tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg}
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { var opUser *sdkws.GroupMemberFullInfo
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
return return
} }
for _, userID := range append(userIDs, req.FromUserID) { for _, userID := range append(userIDs, req.FromUserID) {
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
if userID == req.FromUserID { if userID == req.FromUserID {
tips.ReceiverAs = 0 tips.ReceiverAs = applicantReceiver
} else { } else {
tips.ReceiverAs = 1 tips.ReceiverAs = adminReceiver
} }
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips) g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
} }

@ -83,6 +83,11 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config
if msg.MsgData.ContentType == constant.Typing { if msg.MsgData.ContentType == constant.Typing {
return return
} }
// According to the attentionIds configuration, only some users are sent
attentionIds := after.AttentionIds
if attentionIds != nil && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) && !datautil.Contain(msg.MsgData.SendID, attentionIds...) {
return
}
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
RecvID: msg.MsgData.RecvID, RecvID: msg.MsgData.RecvID,

@ -55,6 +55,9 @@ type SendMsg struct {
// OfflinePushInfo contains information for offline push notifications. // OfflinePushInfo contains information for offline push notifications.
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
// Ex stores extended fields
Ex string `json:"ex"`
} }
// SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat. // SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat.

@ -339,8 +339,9 @@ type BeforeConfig struct {
} }
type AfterConfig struct { type AfterConfig struct {
Enable bool `mapstructure:"enable"` Enable bool `mapstructure:"enable"`
Timeout int `mapstructure:"timeout"` Timeout int `mapstructure:"timeout"`
AttentionIds []string `mapstructure:"attentionIds"`
} }
type Share struct { type Share struct {

@ -21,22 +21,36 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time"
) )
type tokenCache struct { type tokenCache struct {
rdb redis.UniversalClient rdb redis.UniversalClient
accessExpire time.Duration
} }
func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel { func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel {
return &tokenCache{ c := &tokenCache{rdb: rdb}
rdb: rdb, c.accessExpire = c.getExpireTime(accessExpire)
} return c
} }
func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
} }
// SetTokenFlagEx set token and flag with expire time
func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil {
return errs.Wrap(err)
}
return nil
}
func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) { func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result() m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result()
if err != nil { if err != nil {
@ -61,3 +75,7 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
} }
func (c *tokenCache) getExpireTime(t int64) time.Duration {
return time.Hour * 24 * time.Duration(t)
}

@ -5,7 +5,9 @@ import (
) )
type TokenModel interface { type TokenModel interface {
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
// SetTokenFlagEx set token and flag with expire time
SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error

@ -55,6 +55,7 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p
// Create Token. // Create Token.
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
isCreate := true // flag is create or update
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
if err != nil { if err != nil {
return "", err return "", err
@ -65,6 +66,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
if err != nil || v != constant.NormalToken { if err != nil || v != constant.NormalToken {
deleteTokenKey = append(deleteTokenKey, k) deleteTokenKey = append(deleteTokenKey, k)
} }
if v == constant.NormalToken {
isCreate = false
}
} }
if len(deleteTokenKey) != 0 { if len(deleteTokenKey) != 0 {
err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
@ -79,5 +83,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
if err != nil { if err != nil {
return "", errs.WrapMsg(err, "token.SignedString") return "", errs.WrapMsg(err, "token.SignedString")
} }
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
if isCreate {
// should create,should specify expiration time
if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
} else {
// should update
if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err
}
}
return tokenString, nil
} }

Loading…
Cancel
Save