Merge branch 'openimsdk:main' into main

pull/2386/head
chao 1 year ago committed by GitHub
commit 51567959db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -23,7 +23,9 @@ geTui:
channelID: '' channelID: ''
channelName: '' channelName: ''
fcm: fcm:
serviceAccount: "x.json" # Prioritize using file paths. If the file path is empty, use URL
filePath: "" # File path is concatenated with the parameters passed in through - c(`mage` default pass in `config/`) and filePath.
authURL: "" # Must start with https or http.
jpns: jpns:
appKey: '' appKey: ''
masterSecret: '' masterSecret: ''

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.65 github.com/openimsdk/protocol v0.0.66-alpha.1
github.com/openimsdk/tools v0.0.49-alpha.19 github.com/openimsdk/tools v0.0.49-alpha.19
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0

@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.66-alpha.1 h1:/8y+aXQeX6+IgfFxujHbRgJylqJRkwF5gMrwNhWMsiU=
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.66-alpha.1/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs= github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs=
github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -16,8 +16,11 @@ package fcm
import ( import (
"context" "context"
"fmt"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/tools/utils/httputil"
"path/filepath" "path/filepath"
"strings"
firebase "firebase.google.com/go" firebase "firebase.google.com/go"
"firebase.google.com/go/messaging" "firebase.google.com/go/messaging"
@ -40,13 +43,25 @@ type Fcm struct {
// NewClient initializes a new FCM client using the Firebase Admin SDK. // NewClient initializes a new FCM client using the Firebase Admin SDK.
// It requires the FCM service account credentials file located within the project's configuration directory. // It requires the FCM service account credentials file located within the project's configuration directory.
func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) { func NewClient(pushConf *config.Push, cache cache.ThirdCache, fcmConfigPath string) (*Fcm, error) {
projectRoot, err := config.GetProjectRoot() var opt option.ClientOption
if err != nil { switch {
return nil, err case len(pushConf.FCM.FilePath) != 0:
// with file path
credentialsFilePath := filepath.Join(fcmConfigPath, pushConf.FCM.FilePath)
opt = option.WithCredentialsFile(credentialsFilePath)
case len(pushConf.FCM.AuthURL) != 0:
// with authentication URL
client := httputil.NewHTTPClient(httputil.NewClientConfig())
resp, err := client.Get(pushConf.FCM.AuthURL)
if err != nil {
return nil, err
}
opt = option.WithCredentialsJSON(resp)
default:
return nil, errs.New("no FCM config").Wrap()
} }
credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.FCM.ServiceAccount)
opt := option.WithCredentialsFile(credentialsFilePath)
fcmApp, err := firebase.NewApp(context.Background(), nil, opt) fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -56,7 +71,6 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) {
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}, nil return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}, nil
} }
@ -79,6 +93,8 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
notification.Body = content notification.Body = content
notification.Title = title notification.Title = title
var messages []*messaging.Message var messages []*messaging.Message
var sendErrBuilder strings.Builder
var msgErrBuilder strings.Builder
for userID, personTokens := range allTokens { for userID, personTokens := range allTokens {
apns := &messaging.APNSConfig{Payload: &messaging.APNSPayload{Aps: &messaging.Aps{Sound: opts.IOSPushSound}}} apns := &messaging.APNSConfig{Payload: &messaging.APNSPayload{Aps: &messaging.Aps{Sound: opts.IOSPushSound}}}
messageCount := len(messages) messageCount := len(messages)
@ -86,9 +102,21 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
response, err := f.fcmMsgCli.SendAll(ctx, messages) response, err := f.fcmMsgCli.SendAll(ctx, messages)
if err != nil { if err != nil {
Fail = Fail + messageCount Fail = Fail + messageCount
// Record push error
sendErrBuilder.WriteString(err.Error())
sendErrBuilder.WriteByte('.')
} else { } else {
Success = Success + response.SuccessCount Success = Success + response.SuccessCount
Fail = Fail + response.FailureCount Fail = Fail + response.FailureCount
if response.FailureCount != 0 {
// Record message error
for i := range response.Responses {
if !response.Responses[i].Success {
msgErrBuilder.WriteString(response.Responses[i].Error.Error())
msgErrBuilder.WriteByte('.')
}
}
}
} }
messages = messages[0:0] messages = messages[0:0]
} }
@ -134,5 +162,9 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
Fail = Fail + response.FailureCount Fail = Fail + response.FailureCount
} }
} }
if Fail != 0 {
return errs.New(fmt.Sprintf("%d message send failed;send err:%s;message err:%s",
Fail, sendErrBuilder.String(), msgErrBuilder.String())).Wrap()
}
return nil return nil
} }

@ -36,13 +36,13 @@ type OfflinePusher interface {
Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error
} }
func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache) (OfflinePusher, error) { func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache, fcmConfigPath string) (OfflinePusher, error) {
var offlinePusher OfflinePusher var offlinePusher OfflinePusher
switch pushConf.Enable { switch pushConf.Enable {
case geTUI: case geTUI:
offlinePusher = getui.NewClient(pushConf, cache) offlinePusher = getui.NewClient(pushConf, cache)
case firebase: case firebase:
return fcm.NewClient(pushConf, cache) return fcm.NewClient(pushConf, cache, fcmConfigPath)
case jPush: case jPush:
offlinePusher = jpush.NewClient(pushConf) offlinePusher = jpush.NewClient(pushConf)
default: default:

@ -29,6 +29,7 @@ type Config struct {
WebhooksConfig config.Webhooks WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache LocalCacheConfig config.LocalCache
Discovery config.Discovery Discovery config.Discovery
FcmConfigPath string
} }
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
@ -50,7 +51,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
return err return err
} }
cacheModel := redis.NewThirdCache(rdb) cacheModel := redis.NewThirdCache(rdb)
offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel) offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel, config.FcmConfigPath)
if err != nil { if err != nil {
return err return err
} }

@ -16,6 +16,7 @@ package auth
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/db/redisutil"
@ -32,7 +33,6 @@ import (
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/tokenverify" "github.com/openimsdk/tools/tokenverify"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -153,21 +153,19 @@ func (s *authServer) ForceLogout(ctx context.Context, req *pbauth.ForceLogoutReq
if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err return nil, err
} }
if err := s.forceKickOff(ctx, req.UserID, req.PlatformID, mcontext.GetOperationID(ctx)); err != nil { if err := s.forceKickOff(ctx, req.UserID, req.PlatformID); err != nil {
return nil, err return nil, err
} }
return &pbauth.ForceLogoutResp{}, nil return &pbauth.ForceLogoutResp{}, nil
} }
func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32) error {
conns, err := s.RegisterCenter.GetConns(ctx, s.config.Share.RpcRegisterName.MessageGateway) conns, err := s.RegisterCenter.GetConns(ctx, s.config.Share.RpcRegisterName.MessageGateway)
if err != nil { if err != nil {
return err return err
} }
for _, v := range conns { for _, v := range conns {
log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) log.ZDebug(ctx, "forceKickOff", "conn", v.Target())
}
for _, v := range conns {
client := msggateway.NewMsgGatewayClient(v) client := msggateway.NewMsgGatewayClient(v)
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
_, err := client.KickUserOffline(ctx, kickReq) _, err := client.KickUserOffline(ctx, kickReq)
@ -175,8 +173,24 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
log.ZError(ctx, "forceKickOff", err, "kickReq", kickReq) log.ZError(ctx, "forceKickOff", err, "kickReq", kickReq)
} }
} }
m, err := s.authDatabase.GetTokensWithoutError(ctx, userID, int(platformID))
if err != nil && err != redis.Nil {
return err
}
for k := range m {
m[k] = constant.KickedToken
log.ZDebug(ctx, "set token map is ", "token map", m, "userID",
userID, "token", k)
err = s.authDatabase.SetTokenMapByUidPid(ctx, userID, int(platformID), m)
if err != nil {
return err
}
}
return nil return nil
} }
func (s *authServer) InvalidateToken(ctx context.Context, req *pbauth.InvalidateTokenReq) (*pbauth.InvalidateTokenResp, error) { func (s *authServer) InvalidateToken(ctx context.Context, req *pbauth.InvalidateTokenReq) (*pbauth.InvalidateTokenResp, error) {
m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID)) m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID))
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {

@ -47,6 +47,7 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version) ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error { ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
ret.pushConfig.FcmConfigPath = ret.ConfigPath()
return ret.runE() return ret.runE()
} }
return ret return ret

@ -31,6 +31,11 @@ type RootCmd struct {
prometheusPort int prometheusPort int
log config.Log log config.Log
index int index int
configPath string
}
func (r *RootCmd) ConfigPath() string {
return r.configPath
} }
func (r *RootCmd) Index() int { func (r *RootCmd) Index() int {
@ -153,6 +158,7 @@ func (r *RootCmd) getFlag(cmd *cobra.Command) (string, int, error) {
if err != nil { if err != nil {
return "", 0, errs.Wrap(err) return "", 0, errs.Wrap(err)
} }
r.configPath = configDirectory
index, err := cmd.Flags().GetInt(FlagTransferIndex) index, err := cmd.Flags().GetInt(FlagTransferIndex)
if err != nil { if err != nil {
return "", 0, errs.Wrap(err) return "", 0, errs.Wrap(err)

@ -202,7 +202,8 @@ type Push struct {
ChannelName string `mapstructure:"channelName"` ChannelName string `mapstructure:"channelName"`
} `mapstructure:"geTui"` } `mapstructure:"geTui"`
FCM struct { FCM struct {
ServiceAccount string `mapstructure:"serviceAccount"` FilePath string `mapstructure:"filePath"`
AuthURL string `mapstructure:"authURL"`
} `mapstructure:"fcm"` } `mapstructure:"fcm"`
JPNS struct { JPNS struct {
AppKey string `mapstructure:"appKey"` AppKey string `mapstructure:"appKey"`

Loading…
Cancel
Save