You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/internal/push/push_rpc_server.go

75 lines
2.4 KiB

package push
import (
"context"
"sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"google.golang.org/grpc"
)
type pushServer struct {
pusher *Pusher
}
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb)
offlinePusher := NewOfflinePusher(cacheModel)
database := controller.NewPushDatabase(cacheModel)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(&groupRpcClient), localcache.NewConversationLocalCache(&conversationRpcClient), &conversationRpcClient, &groupRpcClient, &msgRpcClient)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
pbPush.RegisterPushMsgServiceServer(server, &pushServer{
pusher: pusher,
})
}()
go func() {
defer wg.Done()
consumer := NewConsumer(pusher)
consumer.initPrometheus()
consumer.Start()
}()
wg.Wait()
return nil
}
func (r *pushServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) {
switch pbData.MsgData.SessionType {
case constant.SuperGroupChatType:
err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default:
err = r.pusher.Push2User(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID}, pbData.MsgData)
}
if err != nil {
if err != errNoOfflinePusher {
return nil, err
} else {
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
}
}
return &pbPush.PushMsgResp{}, nil
}
func (r *pushServer) DelUserPushToken(ctx context.Context, req *pbPush.DelUserPushTokenReq) (resp *pbPush.DelUserPushTokenResp, err error) {
if err = r.pusher.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil {
return nil, err
}
return &pbPush.DelUserPushTokenResp{}, nil
}