diff --git a/config/webhooks.yml b/config/webhooks.yml index c7839d4f2..11a85ba0c 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -13,6 +13,9 @@ afterUpdateUserInfoEx: afterSendSingleMsg: enable: false timeout: 5 + # Only the senID/recvID specified in attentionIds will send the callback + # if not set, all user messages will be callback + attentionIds: [] beforeSendGroupMsg: enable: false timeout: 5 diff --git a/internal/api/msg.go b/internal/api/msg.go index 180342e59..ba63fbb66 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -101,6 +101,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) SendTime: params.SendTime, Options: options, OfflinePushInfo: params.OfflinePushInfo, + Ex: params.Ex, }, } return &pbData diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 320fb1d52..804375e4f 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg userRpcClient: &userRpcClient, RegisterCenter: client, authDatabase: controller.NewAuthDatabase( - redis2.NewTokenCacheModel(rdb), + redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire), config.Share.Secret, config.RpcConfig.TokenPolicy.Expire, ), diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index ee4f00ad3..6843c20e2 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -25,7 +25,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" @@ -36,6 +38,12 @@ import ( "github.com/openimsdk/tools/utils/stringutil" ) +// GroupApplicationReceiver +const ( + applicantReceiver = iota + adminReceiver +) + func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender { return &GroupNotificationSender{ NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)), @@ -418,15 +426,17 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte if err != nil { return } - tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg} - if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + + var opUser *sdkws.GroupMemberFullInfo + if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil { return } for _, userID := range append(userIDs, req.FromUserID) { + tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg} if userID == req.FromUserID { - tips.ReceiverAs = 0 + tips.ReceiverAs = applicantReceiver } else { - tips.ReceiverAs = 1 + tips.ReceiverAs = adminReceiver } g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips) } @@ -449,15 +459,17 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte if err != nil { return } - tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg} - if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + + var opUser *sdkws.GroupMemberFullInfo + if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil { return } for _, userID := range append(userIDs, req.FromUserID) { + tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg} if userID == req.FromUserID { - tips.ReceiverAs = 0 + tips.ReceiverAs = applicantReceiver } else { - tips.ReceiverAs = 1 + tips.ReceiverAs = adminReceiver } g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips) } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 10404675e..be58d7504 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -83,6 +83,11 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config if msg.MsgData.ContentType == constant.Typing { return } + // According to the attentionIds configuration, only some users are sent + attentionIds := after.AttentionIds + if attentionIds != nil && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) && !datautil.Contain(msg.MsgData.SendID, attentionIds...) { + return + } cbReq := &cbapi.CallbackAfterSendSingleMsgReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index e79b47722..6ea6a29ed 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -55,6 +55,9 @@ type SendMsg struct { // OfflinePushInfo contains information for offline push notifications. OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` + + // Ex stores extended fields + Ex string `json:"ex"` } // SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat. diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 6260dc00f..0b6176fb7 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -339,8 +339,9 @@ type BeforeConfig struct { } type AfterConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` + Enable bool `mapstructure:"enable"` + Timeout int `mapstructure:"timeout"` + AttentionIds []string `mapstructure:"attentionIds"` } type Share struct { diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index 6098a666c..b82259658 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -21,22 +21,36 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" + "time" ) type tokenCache struct { - rdb redis.UniversalClient + rdb redis.UniversalClient + accessExpire time.Duration } -func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel { - return &tokenCache{ - rdb: rdb, - } +func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { + c := &tokenCache{rdb: rdb} + c.accessExpire = c.getExpireTime(accessExpire) + return c } -func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { +func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) } +// SetTokenFlagEx set token and flag with expire time +func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error { + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil { + return errs.Wrap(err) + } + if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { + return errs.Wrap(err) + } + return nil +} + func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) { m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result() if err != nil { @@ -61,3 +75,7 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) } + +func (c *tokenCache) getExpireTime(t int64) time.Duration { + return time.Hour * 24 * time.Duration(t) +} diff --git a/pkg/common/storage/cache/token.go b/pkg/common/storage/cache/token.go index 55b3321ef..4a0fee087 100644 --- a/pkg/common/storage/cache/token.go +++ b/pkg/common/storage/cache/token.go @@ -5,7 +5,9 @@ import ( ) type TokenModel interface { - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + // SetTokenFlagEx set token and flag with expire time + SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 321583743..fbfe30836 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -55,6 +55,7 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p // Create Token. func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { + isCreate := true // flag is create or update tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) if err != nil { return "", err @@ -65,6 +66,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI if err != nil || v != constant.NormalToken { deleteTokenKey = append(deleteTokenKey, k) } + if v == constant.NormalToken { + isCreate = false + } } if len(deleteTokenKey) != 0 { err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) @@ -79,5 +83,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI if err != nil { return "", errs.WrapMsg(err, "token.SignedString") } - return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken) + + if isCreate { + // should create,should specify expiration time + if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { + return "", err + } + } else { + // should update + if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { + return "", err + } + } + return tokenString, nil }