diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index c0d5a5999..146565561 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -23,7 +23,6 @@ import ( "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" - "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -66,15 +65,13 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, proPort int, longConnServer LongConnServer) *Server { +func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *config.GlobalConfig) *Server { s := &Server{ -func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, config *config.GlobalConfig) *Server { - return &Server{ rpcPort: rpcPort, prometheusPort: proPort, LongConnServer: longConnServer, pushTerminal: make(map[int]struct{}), - config: config, + config: conf, } s.pushTerminal[constant.IOSPlatformID] = struct{}{} s.pushTerminal[constant.AndroidPlatformID] = struct{}{} @@ -155,7 +152,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga } userPlatform := &msggateway.SingleMsgToUserPlatform{ - PlatFormID: int32(client.PlatformID), + RecvPlatFormID: int32(client.PlatformID), } if !client.IsBackground || (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { diff --git a/internal/push/callback.go b/internal/push/callback.go index b8830cf9f..6415d63d6 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -16,7 +16,6 @@ package push import ( "context" - "encoding/json" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/sdkws" @@ -136,14 +135,3 @@ func callbackBeforeSuperGroupOnlinePush( } return nil } -func GetContent(msg *sdkws.MsgData) string { - if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { - var notification sdkws.NotificationElem - if err := json.Unmarshal(msg.Content, ¬ification); err != nil { - return "" - } - return notification.Detail - } else { - return string(msg.Content) - } -} diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index e69de29bb..351b63f46 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -0,0 +1,41 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package push + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" +) + +type Consumer struct { + pushCh ConsumerHandler + // successCount is unused + // successCount uint64 +} + +func NewConsumer(config *config.GlobalConfig, pusher *Pusher) (*Consumer, error) { + c, err := NewConsumerHandler(config, pusher) + if err != nil { + return nil, err + } + return &Consumer{ + pushCh: *c, + }, nil +} + +func (c *Consumer) Start() { + go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh) +} diff --git a/internal/push/offlinepush/dummy/push.go b/internal/push/offlinepush/dummy/push.go index 395c2f45e..f147886d9 100644 --- a/internal/push/offlinepush/dummy/push.go +++ b/internal/push/offlinepush/dummy/push.go @@ -16,16 +16,17 @@ package dummy import ( "context" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" ) -func NewDummy() *Dummy { +func NewClient() *Dummy { return &Dummy{} } type Dummy struct { } -func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { +func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { return nil } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index 6882a37c4..ed65a5af6 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -16,14 +16,12 @@ package fcm import ( "context" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "path/filepath" firebase "firebase.google.com/go" "firebase.google.com/go/messaging" "github.com/OpenIMSDK/protocol/constant" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/redis/go-redis/v9" @@ -58,7 +56,7 @@ func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm { return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache} } -func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { +func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { // accounts->registrationToken allTokens := make(map[string][]string, 0) for _, account := range userIDs { diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index ba80038f7..67f6292db 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -19,7 +19,6 @@ import ( "crypto/sha256" "encoding/hex" "errors" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "strconv" "sync" "time" @@ -29,7 +28,6 @@ import ( "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils/splitter" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http" @@ -53,7 +51,7 @@ const ( taskIDTTL = 1000 * 60 * 60 * 24 ) -type GeTui struct { +type Client struct { cache cache.MsgModel tokenExpireTime int64 taskIDTTL int64 diff --git a/internal/push/offlinepush/jpush/push.go b/internal/push/offlinepush/jpush/push.go index 55c4aea39..2ced4bfd3 100644 --- a/internal/push/offlinepush/jpush/push.go +++ b/internal/push/offlinepush/jpush/push.go @@ -18,8 +18,8 @@ import ( "context" "encoding/base64" "fmt" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush/body" "github.com/openimsdk/open-im-server/v3/pkg/common/config" http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http" @@ -48,7 +48,7 @@ func (j *JPush) getAuthorization(appKey string, masterSecret string) string { return Authorization } -func (j *JPush) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { +func (j *JPush) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { var pf body.Platform pf.SetAll() var au body.Audience diff --git a/internal/push/offlinepush/offlinepush_interface.go b/internal/push/offlinepush/offlinepush_interface.go new file mode 100644 index 000000000..a5d4051f9 --- /dev/null +++ b/internal/push/offlinepush/offlinepush_interface.go @@ -0,0 +1,37 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offlinepush + +import ( + "context" +) + +// OfflinePusher Offline Pusher. +type OfflinePusher interface { + Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error +} + +// Opts opts. +type Opts struct { + Signal *Signal + IOSPushSound string + IOSBadgeCount bool + Ex string +} + +// Signal message id. +type Signal struct { + ClientMsgID string +} diff --git a/internal/push/offlinepush/offlinepusher.go b/internal/push/offlinepush/offlinepusher.go deleted file mode 100644 index 83bf8e66e..000000000 --- a/internal/push/offlinepush/offlinepusher.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package offlinepush - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" -) - -const ( - GETUI = "getui" - FIREBASE = "fcm" - JPUSH = "jpush" -) - -// OfflinePusher Offline Pusher. -type OfflinePusher interface { - Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error -} - -func NewOfflinePusher(cache cache.MsgModel) OfflinePusher { - var offlinePusher OfflinePusher - switch config.Config.Push.Enable { - case GETUI: - offlinePusher = getui.NewGeTui(cache) - case FIREBASE: - offlinePusher = fcm.NewFcm(cache) - case JPUSH: - offlinePusher = jpush.NewJPush() - default: - offlinePusher = dummy.NewDummy() - } - return offlinePusher -} diff --git a/internal/push/offlinepush/options/options.go b/internal/push/offlinepush/options/options.go deleted file mode 100644 index 056f6b711..000000000 --- a/internal/push/offlinepush/options/options.go +++ /dev/null @@ -1,14 +0,0 @@ -package options - -// Opts opts. -type Opts struct { - Signal *Signal - IOSPushSound string - IOSBadgeCount bool - Ex string -} - -// Signal message id. -type Signal struct { - ClientMsgID string -} diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go deleted file mode 100644 index 35b9a97b7..000000000 --- a/internal/push/onlinepusher.go +++ /dev/null @@ -1,211 +0,0 @@ -package push - -import ( - "context" - "github.com/OpenIMSDK/protocol/msggateway" - "github.com/OpenIMSDK/protocol/sdkws" - "github.com/OpenIMSDK/tools/discoveryregistry" - "github.com/OpenIMSDK/tools/log" - "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "os" - "sync" -) - -const ( - ENVNAME = "ENVS_DISCOVERY" - KUBERNETES = "k8s" - ZOOKEEPER = "zookeeper" -) - -type OnlinePusher interface { - GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, - pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) - GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults, - pushToUserIDs *[]string) []string -} - -type emptyOnlinePUsher struct{} - -func newEmptyOnlinePUsher() *emptyOnlinePUsher { - return &emptyOnlinePUsher{} -} - -func (emptyOnlinePUsher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, - pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - log.ZWarn(ctx, "emptyOnlinePUsher GetConnsAndOnlinePush", nil) - return nil, nil -} -func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, - wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { - log.ZWarn(ctx, "emptyOnlinePUsher GetOnlinePushFailedUserIDs", nil) - return nil -} - -func NewOnlinePusher(disCov discoveryregistry.SvcDiscoveryRegistry) OnlinePusher { - var envType string - if value := os.Getenv(ENVNAME); value != "" { - envType = os.Getenv(ENVNAME) - } else { - envType = config.Config.Envs.Discovery - } - switch envType { - case KUBERNETES: - return NewK8sStaticConsistentHash(disCov) - case ZOOKEEPER: - return NewDefaultAllNode(disCov) - default: - return newEmptyOnlinePUsher() - } -} - -type DefaultAllNode struct { - disCov discoveryregistry.SvcDiscoveryRegistry -} - -func NewDefaultAllNode(disCov discoveryregistry.SvcDiscoveryRegistry) *DefaultAllNode { - return &DefaultAllNode{disCov: disCov} -} - -func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, - pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - conns, err := d.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) - log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) - if err != nil { - return nil, err - } - - var ( - mu sync.Mutex - wg = errgroup.Group{} - input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} - maxWorkers = config.Config.Push.MaxConcurrentWorkers - ) - - if maxWorkers < 3 { - maxWorkers = 3 - } - - wg.SetLimit(maxWorkers) - - // Online push message - for _, conn := range conns { - conn := conn // loop var safe - wg.Go(func() error { - msgClient := msggateway.NewMsgGatewayClient(conn) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) - if err != nil { - return nil - } - - log.ZDebug(ctx, "push result", "reply", reply) - if reply != nil && reply.SinglePushResult != nil { - mu.Lock() - wsResults = append(wsResults, reply.SinglePushResult...) - mu.Unlock() - } - - return nil - }) - } - - _ = wg.Wait() - - // always return nil - return wsResults, nil -} - -func (d *DefaultAllNode) GetOnlinePushFailedUserIDs(_ context.Context, msg *sdkws.MsgData, - wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { - - onlineSuccessUserIDs := []string{msg.SendID} - for _, v := range wsResults { - //message sender do not need offline push - if msg.SendID == v.UserID { - continue - } - // mobile online push success - if v.OnlinePush { - onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) - } - - } - - return utils.SliceSub(*pushToUserIDs, onlineSuccessUserIDs) -} - -type K8sStaticConsistentHash struct { - disCov discoveryregistry.SvcDiscoveryRegistry -} - -func NewK8sStaticConsistentHash(disCov discoveryregistry.SvcDiscoveryRegistry) *K8sStaticConsistentHash { - return &K8sStaticConsistentHash{disCov: disCov} -} - -func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, - pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - - var usersHost = make(map[string][]string) - for _, v := range pushToUserIDs { - tHost, err := k.disCov.GetUserIdHashGatewayHost(ctx, v) - if err != nil { - log.ZError(ctx, "get msg gateway hash error", err) - return nil, err - } - tUsers, tbl := usersHost[tHost] - if tbl { - tUsers = append(tUsers, v) - usersHost[tHost] = tUsers - } else { - usersHost[tHost] = []string{v} - } - } - log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) - var usersConns = make(map[*grpc.ClientConn][]string) - for host, userIds := range usersHost { - tconn, _ := k.disCov.GetConn(ctx, host) - usersConns[tconn] = userIds - } - var ( - mu sync.Mutex - wg = errgroup.Group{} - maxWorkers = config.Config.Push.MaxConcurrentWorkers - ) - if maxWorkers < 3 { - maxWorkers = 3 - } - wg.SetLimit(maxWorkers) - for conn, userIds := range usersConns { - tcon := conn - tuserIds := userIds - wg.Go(func() error { - input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds} - msgClient := msggateway.NewMsgGatewayClient(tcon) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) - if err != nil { - return nil - } - log.ZDebug(ctx, "push result", "reply", reply) - if reply != nil && reply.SinglePushResult != nil { - mu.Lock() - wsResults = append(wsResults, reply.SinglePushResult...) - mu.Unlock() - } - return nil - }) - } - _ = wg.Wait() - return wsResults, nil -} -func (k *K8sStaticConsistentHash) GetOnlinePushFailedUserIDs(_ context.Context, _ *sdkws.MsgData, - wsResults []*msggateway.SingleMsgToUserResults, _ *[]string) []string { - var needOfflinePushUserIDs []string - for _, v := range wsResults { - if !v.OnlinePush { - needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) - } - } - return needOfflinePushUserIDs -} diff --git a/internal/push/push.go b/internal/push/push.go deleted file mode 100644 index 90e62ae03..000000000 --- a/internal/push/push.go +++ /dev/null @@ -1,51 +0,0 @@ -package push - -import ( - "context" - pbpush "github.com/OpenIMSDK/protocol/push" - "github.com/OpenIMSDK/tools/discoveryregistry" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "google.golang.org/grpc" -) - -type pushServer struct { - database controller.PushDatabase - disCov discoveryregistry.SvcDiscoveryRegistry - offlinePusher offlinepush.OfflinePusher - pushCh *ConsumerHandler -} - -func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { - //todo reserved Interface - return nil, nil -} - -func (p pushServer) DelUserPushToken(ctx context.Context, - req *pbpush.DelUserPushTokenReq) (resp *pbpush.DelUserPushTokenResp, err error) { - if err = p.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil { - return nil, err - } - return &pbpush.DelUserPushTokenResp{}, nil -} - -func Start(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() - if err != nil { - return err - } - cacheModel := cache.NewMsgCacheModel(rdb) - offlinePusher := offlinepush.NewOfflinePusher(cacheModel) - database := controller.NewPushDatabase(cacheModel) - - consumer := NewConsumerHandler(offlinePusher, rdb, disCov) - pbpush.RegisterPushMsgServiceServer(server, &pushServer{ - database: database, - disCov: disCov, - offlinePusher: offlinePusher, - pushCh: consumer, - }) - go consumer.pushConsumerGroup.RegisterHandleAndConsumer(consumer) - return nil -} diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 61106e1f1..0e68e76b3 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -16,17 +16,6 @@ package push import ( "context" - "encoding/json" - "github.com/OpenIMSDK/protocol/sdkws" - "github.com/OpenIMSDK/tools/discoveryregistry" - "github.com/OpenIMSDK/tools/mcontext" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/redis/go-redis/v9" "github.com/IBM/sarama" "github.com/OpenIMSDK/protocol/constant" @@ -40,14 +29,8 @@ import ( ) type ConsumerHandler struct { - pushConsumerGroup *kfk.MConsumerGroup - offlinePusher offlinepush.OfflinePusher - onlinePusher OnlinePusher - groupLocalCache *rpccache.GroupLocalCache - conversationLocalCache *rpccache.ConversationLocalCache - msgRpcClient rpcclient.MessageRpcClient - conversationRpcClient rpcclient.ConversationRpcClient - groupRpcClient rpcclient.GroupRpcClient + pushConsumerGroup *kfk.MConsumerGroup + pusher *Pusher } func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerHandler, error) { @@ -98,7 +81,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { var err error switch msgFromMQ.MsgData.SessionType { case constant.SuperGroupChatType: - err = c.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) + err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) default: var pushUserIDList []string isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) @@ -107,14 +90,18 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } else { pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) } - err = c.Push2User(ctx, pushUserIDList, pbData.MsgData) + err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData) } if err != nil { - log.ZError(ctx, "push failed", err, "msg", pbData.String()) + if err == errNoOfflinePusher { + log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String()) + } else { + log.ZError(ctx, "push failed", err, "msg", pbData.String()) + } } } -func (*ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (*ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, ) error { @@ -125,243 +112,3 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, } return nil } - -// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType. -func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { - log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) - if err := callbackOnlinePush(ctx, userIDs, msg); err != nil { - return err - } - - wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, userIDs) - if err != nil { - return err - } - - log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs) - - if !c.shouldPushOffline(ctx, msg) { - return nil - } - - for _, v := range wsResults { - //message sender do not need offline push - if msg.SendID == v.UserID { - continue - } - //receiver online push success - if v.OnlinePush { - return nil - } - } - offlinePUshUserID := []string{msg.RecvID} - //receiver offline push - if err = callbackOfflinePush(ctx, offlinePUshUserID, msg, nil); err != nil { - return err - } - - err = c.offlinePushMsg(ctx, msg, offlinePUshUserID) - if err != nil { - return err - } - - return nil -} - -func (c *ConsumerHandler) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { - log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) - var pushToUserIDs []string - if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { - return err - } - - err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg) - if err != nil { - return err - } - - wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) - if err != nil { - return err - } - - log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg) - - if !c.shouldPushOffline(ctx, msg) { - return nil - } - needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs) - - //filter some user, like don not disturb or don't need offline push etc. - needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs) - if err != nil { - return err - } - // Use offline push messaging - if len(needOfflinePushUserIDs) > 0 { - var offlinePushUserIDs []string - err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) - if err != nil { - return err - } - - if len(offlinePushUserIDs) > 0 { - needOfflinePushUserIDs = offlinePushUserIDs - } - - err = c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs) - if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) - return err - } - - } - - return nil -} - -func (c *ConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error { - title, content, opts, err := c.getOfflinePushInfos(msg) - if err != nil { - return err - } - err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) - if err != nil { - prommetrics.MsgOfflinePushFailedCounter.Inc() - return err - } - return nil -} - -func (c *ConsumerHandler) filterGroupMessageOfflinePush(ctx context.Context, groupID string, msg *sdkws.MsgData, - offlinePushUserIDs []string) (userIDs []string, err error) { - - //todo local cache Obtain the difference set through local comparison. - needOfflinePushUserIDs, err := c.conversationRpcClient.GetConversationOfflinePushUserIDs( - ctx, utils.GenGroupConversationID(groupID), offlinePushUserIDs) - if err != nil { - return nil, err - } - return needOfflinePushUserIDs, nil -} - -func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) { - type AtTextElem struct { - Text string `json:"text,omitempty"` - AtUserList []string `json:"atUserList,omitempty"` - IsAtSelf bool `json:"isAtSelf"` - } - - opts = &options.Opts{Signal: &options.Signal{}} - if msg.OfflinePushInfo != nil { - opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount - opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound - opts.Ex = msg.OfflinePushInfo.Ex - } - - if msg.OfflinePushInfo != nil { - title = msg.OfflinePushInfo.Title - content = msg.OfflinePushInfo.Desc - } - if title == "" { - switch msg.ContentType { - case constant.Text: - fallthrough - case constant.Picture: - fallthrough - case constant.Voice: - fallthrough - case constant.Video: - fallthrough - case constant.File: - title = constant.ContentType2PushContent[int64(msg.ContentType)] - case constant.AtText: - ac := AtTextElem{} - _ = utils.JsonStringToStruct(string(msg.Content), &ac) - case constant.SignalingNotification: - title = constant.ContentType2PushContent[constant.SignalMsg] - default: - title = constant.ContentType2PushContent[constant.Common] - } - } - if content == "" { - content = title - } - return -} -func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) { - if len(*pushToUserIDs) == 0 { - *pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) - if err != nil { - return err - } - switch msg.ContentType { - case constant.MemberQuitNotification: - var tips sdkws.MemberQuitTips - if unmarshalNotificationElem(msg.Content, &tips) != nil { - return err - } - if err = c.DeleteMemberAndSetConversationSeq(ctx, groupID, []string{tips.QuitUser.UserID}); err != nil { - log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userID", tips.QuitUser.UserID) - } - *pushToUserIDs = append(*pushToUserIDs, tips.QuitUser.UserID) - case constant.MemberKickedNotification: - var tips sdkws.MemberKickedTips - if unmarshalNotificationElem(msg.Content, &tips) != nil { - return err - } - kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) - if err = c.DeleteMemberAndSetConversationSeq(ctx, groupID, kickedUsers); err != nil { - log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", kickedUsers) - } - - *pushToUserIDs = append(*pushToUserIDs, kickedUsers...) - case constant.GroupDismissedNotification: - if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到 - var tips sdkws.GroupDismissedTips - if unmarshalNotificationElem(msg.Content, &tips) != nil { - return err - } - log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs) - if len(config.Config.Manager.UserID) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0]) - } - defer func(groupID string) { - if err = c.groupRpcClient.DismissGroup(ctx, groupID); err != nil { - log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) - } - }(groupID) - } - } - } - return err -} - -func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { - conversationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) - maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) - if err != nil { - return err - } - return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) -} - -func unmarshalNotificationElem(bytes []byte, t any) error { - var notification sdkws.NotificationElem - if err := json.Unmarshal(bytes, ¬ification); err != nil { - return err - } - - return json.Unmarshal([]byte(notification.Detail), t) -} - -func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgData) bool { - isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if !isOfflinePush { - return false - } - if msg.ContentType == constant.SignalingNotification { - return false - } - return true -} diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index e69de29bb..d5d4c9242 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -0,0 +1,108 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package push + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + + "github.com/OpenIMSDK/protocol/constant" + pbpush "github.com/OpenIMSDK/protocol/push" + "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "google.golang.org/grpc" +) + +type pushServer struct { + pusher *Pusher + config *config.GlobalConfig +} + +func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := cache.NewRedis(config) + if err != nil { + return err + } + cacheModel := cache.NewMsgCacheModel(rdb, config) + offlinePusher := NewOfflinePusher(config, cacheModel) + database := controller.NewPushDatabase(cacheModel) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config) + pusher := NewPusher( + config, + client, + offlinePusher, + database, + rpccache.NewGroupLocalCache(groupRpcClient, rdb), + rpccache.NewConversationLocalCache(conversationRpcClient, rdb), + &conversationRpcClient, + &groupRpcClient, + &msgRpcClient, + ) + + pbpush.RegisterPushMsgServiceServer(server, &pushServer{ + pusher: pusher, + config: config, + }) + + consumer, err := NewConsumer(config, pusher) + if err != nil { + return err + } + + consumer.Start() + + return nil +} + +func (r *pushServer) PushMsg(ctx context.Context, pbData *pbpush.PushMsgReq) (resp *pbpush.PushMsgResp, err error) { + switch pbData.MsgData.SessionType { + case constant.SuperGroupChatType: + err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) + default: + var pushUserIDList []string + isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) + if !isSenderSync { + pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) + } else { + pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) + } + err = r.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData) + } + if err != nil { + if err != errNoOfflinePusher { + return nil, err + } else { + log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String()) + } + } + return &pbpush.PushMsgResp{}, nil +} + +func (r *pushServer) DelUserPushToken( + ctx context.Context, + req *pbpush.DelUserPushTokenReq, +) (resp *pbpush.DelUserPushTokenResp, err error) { + if err = r.pusher.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil { + return nil, err + } + return &pbpush.DelUserPushTokenResp{}, nil +} diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index e69de29bb..2bf17eaf9 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -0,0 +1,522 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package push + +import ( + "context" + "encoding/json" + "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + "sync" + + "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/protocol/conversation" + "github.com/OpenIMSDK/protocol/msggateway" + "github.com/OpenIMSDK/protocol/sdkws" + "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/tools/mcontext" + "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +type Pusher struct { + config *config.GlobalConfig + database controller.PushDatabase + discov discoveryregistry.SvcDiscoveryRegistry + offlinePusher offlinepush.OfflinePusher + groupLocalCache *rpccache.GroupLocalCache + conversationLocalCache *rpccache.ConversationLocalCache + msgRpcClient *rpcclient.MessageRpcClient + conversationRpcClient *rpcclient.ConversationRpcClient + groupRpcClient *rpcclient.GroupRpcClient +} + +var errNoOfflinePusher = errors.New("no offlinePusher is configured") + +func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, + groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, +) *Pusher { + return &Pusher{ + config: config, + discov: discov, + database: database, + offlinePusher: offlinePusher, + groupLocalCache: groupLocalCache, + conversationLocalCache: conversationLocalCache, + msgRpcClient: msgRpcClient, + conversationRpcClient: conversationRpcClient, + groupRpcClient: groupRpcClient, + } +} + +func NewOfflinePusher(config *config.GlobalConfig, cache cache.MsgModel) offlinepush.OfflinePusher { + var offlinePusher offlinepush.OfflinePusher + switch config.Push.Enable { + case "getui": + offlinePusher = getui.NewClient(config, cache) + case "fcm": + offlinePusher = fcm.NewClient(config, cache) + case "jpush": + offlinePusher = jpush.NewClient(config) + default: + offlinePusher = dummy.NewClient() + } + return offlinePusher +} + +func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { + conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) + maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) + if err != nil { + return err + } + return p.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) +} + +func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { + log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) + if err := callbackOnlinePush(ctx, p.config, userIDs, msg); err != nil { + return err + } + // push + wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs) + if err != nil { + return err + } + + isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs) + + if !isOfflinePush { + return nil + } + + if len(wsResults) == 0 { + return nil + } + onlinePushSuccUserIDSet := utils.SliceSet(utils.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { + return e.UserID, e.OnlinePush && e.UserID != "" + })) + offlinePushUserIDList := utils.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { + _, exist := onlinePushSuccUserIDSet[e.UserID] + return e.UserID, !exist && e.UserID != "" && e.UserID != msg.SendID + }) + + if len(offlinePushUserIDList) > 0 { + if err = callbackOfflinePush(ctx, p.config, offlinePushUserIDList, msg, &[]string{}); err != nil { + return err + } + err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList) + if err != nil { + return err + } + } + return nil +} + +func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error { + var notification sdkws.NotificationElem + if err := json.Unmarshal(bytes, ¬ification); err != nil { + return err + } + + return json.Unmarshal([]byte(notification.Detail), t) +} + +/* +k8s deployment,offline push group messages function. +*/ +func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error { + + var needOfflinePushUserIDs []string + for _, v := range wsResults { + if !v.OnlinePush { + needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) + } + } + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err := callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + if msg.ContentType != constant.SignalingNotification { + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) + if err != nil { + return err + } + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + } + } + + } + return nil +} +func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { + log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) + var pushToUserIDs []string + if err = callbackBeforeSuperGroupOnlinePush(ctx, p.config, groupID, msg, &pushToUserIDs); err != nil { + return err + } + + if len(pushToUserIDs) == 0 { + pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return err + } + + switch msg.ContentType { + case constant.MemberQuitNotification: + var tips sdkws.MemberQuitTips + if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + defer func(groupID string, userIDs []string) { + if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { + log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) + } + }(groupID, []string{tips.QuitUser.UserID}) + pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID) + case constant.MemberKickedNotification: + var tips sdkws.MemberKickedTips + if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) + defer func(groupID string, userIDs []string) { + if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { + log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) + } + }(groupID, kickedUsers) + pushToUserIDs = append(pushToUserIDs, kickedUsers...) + case constant.GroupDismissedNotification: + // Messages arrive first, notifications arrive later + if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { + var tips sdkws.GroupDismissedTips + if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs) + if len(p.config.Manager.UserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, p.config.Manager.UserID[0]) + } + if len(p.config.Manager.UserID) == 0 && len(p.config.IMAdmin.UserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, p.config.IMAdmin.UserID[0]) + } + defer func(groupID string) { + if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { + log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) + } + }(groupID) + } + } + } + + wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) + if err != nil { + return err + } + + log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) + isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + if isOfflinePush && p.config.Envs.Discovery == "k8s" { + return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) + } + if isOfflinePush && p.config.Envs.Discovery == "zookeeper" { + var ( + onlineSuccessUserIDs = []string{msg.SendID} + webAndPcBackgroundUserIDs []string + ) + + for _, v := range wsResults { + if v.OnlinePush && v.UserID != msg.SendID { + onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) + } + + if v.OnlinePush { + continue + } + + if len(v.Resp) == 0 { + continue + } + + for _, singleResult := range v.Resp { + if singleResult.ResultCode != -2 { + continue + } + + isPC := constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC + isWebID := singleResult.RecvPlatFormID == constant.WebPlatformID + + if isPC || isWebID { + webAndPcBackgroundUserIDs = append(webAndPcBackgroundUserIDs, v.UserID) + } + } + } + + needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) + + // Use offline push messaging + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err = callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + if msg.ContentType != constant.SignalingNotification { + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) + if err != nil { + return err + } + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) + return err + } + } + } + + } + } + return nil +} + +func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + var usersHost = make(map[string][]string) + for _, v := range pushToUserIDs { + tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v) + if err != nil { + log.ZError(ctx, "get msggateway hash error", err) + return nil, err + } + tUsers, tbl := usersHost[tHost] + if tbl { + tUsers = append(tUsers, v) + usersHost[tHost] = tUsers + } else { + usersHost[tHost] = []string{v} + } + } + log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) + var usersConns = make(map[*grpc.ClientConn][]string) + for host, userIds := range usersHost { + tconn, _ := p.discov.GetConn(ctx, host) + usersConns[tconn] = userIds + } + var ( + mu sync.Mutex + wg = errgroup.Group{} + maxWorkers = p.config.Push.MaxConcurrentWorkers + ) + if maxWorkers < 3 { + maxWorkers = 3 + } + wg.SetLimit(maxWorkers) + for conn, userIds := range usersConns { + tcon := conn + tuserIds := userIds + wg.Go(func() error { + input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds} + msgClient := msggateway.NewMsgGatewayClient(tcon) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + return nil + }) + } + _ = wg.Wait() + return wsResults, nil +} +func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + if p.config.Envs.Discovery == "k8s" { + return p.k8sOnlinePush(ctx, msg, pushToUserIDs) + } + conns, err := p.discov.GetConns(ctx, p.config.RpcRegisterName.OpenImMessageGatewayName) + log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) + if err != nil { + return nil, err + } + + var ( + mu sync.Mutex + wg = errgroup.Group{} + input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} + maxWorkers = p.config.Push.MaxConcurrentWorkers + ) + + if maxWorkers < 3 { + maxWorkers = 3 + } + + wg.SetLimit(maxWorkers) + + // Online push message + for _, conn := range conns { + conn := conn // loop var safe + wg.Go(func() error { + msgClient := msggateway.NewMsgGatewayClient(conn) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + + return nil + }) + } + + _ = wg.Wait() + + // always return nil + return wsResults, nil +} + +func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error { + title, content, opts, err := p.getOfflinePushInfos(conversationID, msg) + if err != nil { + return err + } + err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) + if err != nil { + prommetrics.MsgOfflinePushFailedCounter.Inc() + return err + } + return nil +} + +func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts, err error) { + opts = &offlinepush.Opts{Signal: &offlinepush.Signal{}} + // if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd { + // req := &sdkws.SignalReq{} + // if err := proto.Unmarshal(msg.Content, req); err != nil { + // return nil, utils.Wrap(err, "") + // } + // switch req.Payload.(type) { + // case *sdkws.SignalReq_Invite, *sdkws.SignalReq_InviteInGroup: + // opts.Signal = &offlinepush.Signal{ClientMsgID: msg.ClientMsgID} + // } + // } + if msg.OfflinePushInfo != nil { + opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount + opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound + opts.Ex = msg.OfflinePushInfo.Ex + } + return opts, nil +} + +func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) { + if p.offlinePusher == nil { + err = errNoOfflinePusher + return + } + + type atContent struct { + Text string `json:"text"` + AtUserList []string `json:"atUserList"` + IsAtSelf bool `json:"isAtSelf"` + } + + opts, err = p.GetOfflinePushOpts(msg) + if err != nil { + return + } + + if msg.OfflinePushInfo != nil { + title = msg.OfflinePushInfo.Title + content = msg.OfflinePushInfo.Desc + } + if title == "" { + switch msg.ContentType { + case constant.Text: + fallthrough + case constant.Picture: + fallthrough + case constant.Voice: + fallthrough + case constant.Video: + fallthrough + case constant.File: + title = constant.ContentType2PushContent[int64(msg.ContentType)] + case constant.AtText: + ac := atContent{} + _ = utils.JsonStringToStruct(string(msg.Content), &ac) + if utils.IsContain(conversationID, ac.AtUserList) { + title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + } else { + title = constant.ContentType2PushContent[constant.GroupMsg] + } + case constant.SignalingNotification: + title = constant.ContentType2PushContent[constant.SignalMsg] + default: + title = constant.ContentType2PushContent[constant.Common] + } + } + if content == "" { + content = title + } + return +} diff --git a/internal/push/tools.go b/internal/push/tools.go new file mode 100644 index 000000000..3242767b1 --- /dev/null +++ b/internal/push/tools.go @@ -0,0 +1,32 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package push + +import ( + "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/protocol/sdkws" + "google.golang.org/protobuf/proto" +) + +func GetContent(msg *sdkws.MsgData) string { + if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { + var tips sdkws.TipsComm + _ = proto.Unmarshal(msg.Content, &tips) + content := tips.JsonDetail + return content + } else { + return string(msg.Content) + } +} diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 9ae32cd8e..81b4d9d45 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -48,14 +48,6 @@ type conversationServer struct { config *config.GlobalConfig } -func (c *conversationServer) GetConversationNotReceiveMessageUserIDs( - ctx context.Context, - req *pbconversation.GetConversationNotReceiveMessageUserIDsReq, -) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) { - //TODO implement me - panic("implement me") -} - func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis(config) if err != nil { diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 75a3bc1d8..9466c5224 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -31,23 +31,16 @@ import ( "github.com/OpenIMSDK/protocol/constant" pbfriend "github.com/OpenIMSDK/protocol/friend" - "github.com/OpenIMSDK/protocol/sdkws" registry "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" - "github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" - "google.golang.org/grpc" ) type friendServer struct { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 726cc3420..6212dea03 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -15,11 +15,8 @@ package msg import ( - "context" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - "google.golang.org/grpc" - "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/msg" diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index b964eca9b..318464cf8 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -67,7 +67,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if black { return errs.ErrBlockedByPeer.Wrap() } - if *config.Config.MessageVerify.FriendVerify { + if m.config.MessageVerify.FriendVerify != nil && *m.config.MessageVerify.FriendVerify { friend, err := m.FriendLocalCache.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID) if err != nil { return err diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 8aee6ded3..ef725642c 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -16,7 +16,6 @@ package cache import ( "context" - "errors" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 00f715785..eb79fdb70 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -17,7 +17,6 @@ package cache import ( "context" "errors" - "github.com/dtm-labs/rockscache" "strconv" "time" diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 607ea74ed..ec7aab695 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -26,7 +26,6 @@ import ( "github.com/OpenIMSDK/tools/utils" "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" - "google.golang.org/grpc" ) type Group struct { @@ -41,7 +40,7 @@ func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, config *config.Glob util.ExitWithError(err) } client := group.NewGroupClient(conn) - return &Group{discov: discov, conn: conn, Client: client, Config: config} + return &Group{discov: discov, Client: client, Config: config} } type GroupRpcClient Group