feat: implement offline push.

pull/2600/head
Monet Lee 1 year ago
parent 6e841f8b69
commit 302a6dba99

@ -14,6 +14,8 @@ toRedisTopic: toRedis
toMongoTopic: toMongo
# Kafka topic for push notifications
toPushTopic: toPush
# Kafka topic for offline push notifications
toOfflinePushTopic: toOfflinePush
# Consumer group ID for Redis topic
toRedisGroupID: redis
# Consumer group ID for MongoDB topic

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.13
github.com/openimsdk/protocol v0.0.72-alpha.16
github.com/openimsdk/tools v0.0.50-alpha.11
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.13 h1:ILpvuxWGrVJMVCPRodOQcrSMFKUBzLahBPb8GkITWSc=
github.com/openimsdk/protocol v0.0.72-alpha.13/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.16 h1:lqEPykX7AQoymxvCy1Je+iPjG3ifuQ/0QMupK/B+fTo=
github.com/openimsdk/protocol v0.0.72-alpha.16/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -108,6 +108,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
msgTransfer := &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,

@ -2,11 +2,14 @@ package push
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
@ -46,6 +49,10 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
@ -56,8 +63,28 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
return err
}
database := controller.NewPushDatabase(cacheModel)
msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil {
return err
}
consumer, err := NewConsumerHandler(config, offlinePusher, rdb, client)
consumer, err := NewConsumerHandler(config, msgDatabase, offlinePusher, rdb, client)
if err != nil {
return err
}

@ -1,33 +1,20 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package push
import (
"context"
"encoding/json"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/constant"
pbchat "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/msggateway"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
@ -46,6 +33,7 @@ type ConsumerHandler struct {
pushConsumerGroup *kafka.MConsumerGroup
offlinePusher offlinepush.OfflinePusher
onlinePusher OnlinePusher
msgDatabase controller.CommonMsgDatabase
onlineCache *rpccache.OnlineCache
groupLocalCache *rpccache.GroupLocalCache
conversationLocalCache *rpccache.ConversationLocalCache
@ -56,7 +44,7 @@ type ConsumerHandler struct {
config *Config
}
func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
func NewConsumerHandler(config *Config, database controller.CommonMsgDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
var err error
@ -75,43 +63,51 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config
consumerHandler.msgDatabase = database
//
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
return &consumerHandler, nil
}
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
msgFromMQ := pbchat.PushMsgDataToMQ{}
msgFromMQ := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg))
return
}
pbData := &pbpush.PushMsgReq{
MsgData: msgFromMQ.MsgData,
ConversationID: msgFromMQ.ConversationID,
}
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := timeutil.GetCurrentTimestampBySecond()
if nowSec-sec > 10 {
prommetrics.MsgLoneTimePushCounter.Inc()
log.ZWarn(ctx, "its been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
log.ZWarn(ctx, "its been a while since the message was sent", nil, "msg", msgFromMQ.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
}
var err error
if len(msgFromMQ.GetUserIDs()) > 0 {
err := c.offlinePushMsg(ctx, msgFromMQ.MsgData, msgFromMQ.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", msgFromMQ.String())
}
return
}
switch msgFromMQ.MsgData.SessionType {
case constant.ReadGroupChatType:
err = c.Push2Group(ctx, pbData.MsgData.GroupID, pbData.MsgData)
err = c.Push2Group(ctx, msgFromMQ.MsgData.GroupID, msgFromMQ.MsgData)
default:
var pushUserIDList []string
isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
isSenderSync := datautil.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
if !isSenderSync || msgFromMQ.MsgData.SendID == msgFromMQ.MsgData.RecvID {
pushUserIDList = append(pushUserIDList, msgFromMQ.MsgData.RecvID)
} else {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
pushUserIDList = append(pushUserIDList, msgFromMQ.MsgData.RecvID, msgFromMQ.MsgData.SendID)
}
err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
err = c.Push2User(ctx, pushUserIDList, msgFromMQ.MsgData)
}
if err != nil {
log.ZWarn(ctx, "push failed", err, "msg", pbData.String())
log.ZWarn(ctx, "push failed", err, "msg", msgFromMQ.String())
}
}
@ -246,28 +242,38 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
if err != nil {
return err
}
// Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
c.asyncOfflinePush(ctx, needOfflinePushUserIDs, msg)
}
return nil
}
func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushUserIDs []string, msg *sdkws.MsgData) {
var offlinePushUserIDs []string
err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err := c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
log.ZWarn(ctx, "webhookBeforeOfflinePush failed", err, "msg", msg)
return
}
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
err = c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs)
if err != nil {
log.ZWarn(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return nil
if err := c.msgDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return
}
}
return nil
func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, needOfflinePushUserIDs []string, msg *sdkws.MsgData) {
if err := c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs); err != nil {
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserIDs", needOfflinePushUserIDs, "msg", msg)
}
}
func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
if len(*pushToUserIDs) == 0 {
*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)

@ -81,6 +81,7 @@ type Kafka struct {
ToRedisTopic string `mapstructure:"toRedisTopic"`
ToMongoTopic string `mapstructure:"toMongoTopic"`
ToPushTopic string `mapstructure:"toPushTopic"`
ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"`
ToRedisGroupID string `mapstructure:"toRedisGroupID"`
ToMongoGroupID string `mapstructure:"toMongoGroupID"`
ToPushGroupID string `mapstructure:"toPushGroupID"`

@ -30,6 +30,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@ -92,6 +93,7 @@ type CommonMsgDatabase interface {
// to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error
MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
@ -122,6 +124,10 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
if err != nil {
return nil, err
}
producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic)
if err != nil {
return nil, err
}
return &commonMsgDatabase{
msgDocDatabase: msgDocModel,
msg: msg,
@ -130,6 +136,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
producer: producerToRedis,
producerToMongo: producerToMongo,
producerToPush: producerToPush,
producerToOfflinePush: producerToOfflinePush,
}, nil
}
@ -142,6 +149,7 @@ type commonMsgDatabase struct {
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
producerToOfflinePush *kafka.Producer
}
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
@ -158,6 +166,11 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI
return partition, offset, nil
}
func (db *commonMsgDatabase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error {
_, _, err := db.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs})
return err
}
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
if len(messages) > 0 {
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})

@ -35,7 +35,7 @@ done
echo "Kafka is ready. Creating topics..."
topics=("toRedis" "toMongo" "toPush")
topics=("toRedis" "toMongo" "toPush" "toOfflinePush")
partitions=8
replicationFactor=1

Loading…
Cancel
Save