commit
908e812aa4
@@ -0,0 +1,19 @@
name: 'issue-translator'
on:
  issue_comment:
    types: [created]
  issues:
    types: [opened]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: usthe/issues-translate-action@v2.7
        with:
          BOT_GITHUB_TOKEN: ${{ secrets.BOT_TOKEN }}
          IS_MODIFY_TITLE: true
          # Not required; defaults to false. Decides whether to modify the issue title.
          # If true, the bot account @Issues-translate-bot must have modification permissions: invite @Issues-translate-bot to your project or use your own custom bot.
          CUSTOM_BOT_NOTE: Bot detected that the issue body's language is not English, so it has been translated automatically. 👯👭🏻🧑🤝🧑👫🧑🏿🤝🧑🏻👩🏾🤝👨🏿👬🏿
          # Not required. Customizes the translation bot's prefix message.
File diff suppressed because it is too large.
@@ -1,3 +1,3 @@
-cronExecuteTime: "0 2 * * *"
+cronExecuteTime: 0 2 * * *
 retainChatRecords: 365
 fileExpireTime: 90
@@ -1,46 +1,41 @@
 rpc:
   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
-  registerIP: ''
+  registerIP:
   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
   listenIP: 0.0.0.0
   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
-  ports: [ 10170 ]
+  ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ]

 prometheus:
   # Enable or disable Prometheus monitoring
   enable: true
   # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
-  ports: [ 20107 ]
+  ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12181, 12182, 12183, 12184, 12185 ]

 maxConcurrentWorkers: 3
-#"Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified."
-enable: "geTui"
+# Use geTui for offline push notifications, or choose fcm or jpns; the corresponding configuration settings must be specified.
+enable: geTui
 geTui:
-  pushUrl: "https://restapi.getui.com/v2/$appId"
-  masterSecret: ''
-  appKey: ''
-  intent: ''
-  channelID: ''
-  channelName: ''
+  pushUrl: https://restapi.getui.com/v2/$appId
+  masterSecret:
+  appKey:
+  intent:
+  channelID:
+  channelName:
 fcm:
   # Prioritize the file path. If the file path is empty, use the URL
-  filePath: "" # The file path is concatenated from the parameter passed in through -c (`mage` passes in `config/` by default) and filePath.
-  authURL: "" # Must start with https or http.
+  filePath: # The file path is concatenated from the parameter passed in through -c (`mage` passes in `config/` by default) and filePath.
+  authURL: # Must start with https or http.
 jpns:
-  appKey: ''
-  masterSecret: ''
-  pushURL: ''
-  pushIntent: ''
+  appKey:
+  masterSecret:
+  pushURL:
+  pushIntent:

 # iOS system push sound and badge count
 iosPush:
-  pushSound: "xxx"
+  pushSound: xxx
   badgeCount: true
   production: false

+fullUserCache: true
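As an aside on the fcm filePath comment in the hunk above: the credentials file is resolved by joining the directory passed via -c (which `mage` passes as `config/` by default) with filePath; if filePath is empty, authURL is used instead. A minimal standalone sketch of that resolution follows; the helper name resolveFCMCredentials and the file name fcm-service-account.json are invented for illustration, not part of OpenIM.

package main

import (
    "fmt"
    "path/filepath"
)

// resolveFCMCredentials is a hypothetical helper showing how the fcm filePath
// from the config would be joined with the -c config directory.
func resolveFCMCredentials(configDir, filePath string) string {
    if filePath == "" {
        return "" // fall back to authURL when no file path is configured
    }
    return filepath.Join(configDir, filePath)
}

func main() {
    // With `mage` passing -c config/ and filePath: fcm-service-account.json,
    // the credentials would be read from config/fcm-service-account.json.
    fmt.Println(resolveFCMCredentials("config/", "fcm-service-account.json"))
}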
@@ -1,13 +1,13 @@
 rpc:
   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
-  registerIP: ''
+  registerIP:
   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
   listenIP: 0.0.0.0
   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
-  ports: [ 10180 ]
+  ports: [ 10220 ]

 prometheus:
   # Enable or disable Prometheus monitoring
   enable: true
   # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
-  ports: [ 20105 ]
+  ports: [ 12220 ]
@@ -1,13 +1,13 @@
 rpc:
   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
-  registerIP: ''
+  registerIP:
   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
   listenIP: 0.0.0.0
   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
-  ports: [ 10120 ]
+  ports: [ 10240 ]

 prometheus:
   # Enable or disable Prometheus monitoring
   enable: true
   # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
-  ports: [ 20104 ]
+  ports: [ 12240 ]
@@ -1,13 +1,16 @@
 rpc:
   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
-  registerIP: ''
+  registerIP:
   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
   listenIP: 0.0.0.0
   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
-  ports: [ 10150 ]
+  ports: [ 10260 ]

 prometheus:
   # Enable or disable Prometheus monitoring
   enable: true
   # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
-  ports: [ 20103 ]
+  ports: [ 12260 ]
+
+enableHistoryForNewMembers: true
@@ -1,40 +1,40 @@
 rpc:
   # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
-  registerIP: ''
+  registerIP:
   # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
   listenIP: 0.0.0.0
   # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports
-  ports: [ 10190 ]
+  ports: [ 10300 ]

 prometheus:
   # Enable or disable Prometheus monitoring
   enable: true
   # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
-  ports: [ 20101 ]
+  ports: [ 12300 ]

 object:
   # Use MinIO as object storage, or set to "cos", "oss", "kodo", "aws", while also configuring the corresponding settings
-  enable: "minio"
+  enable: minio
   cos:
     bucketURL: https://temp-1252357374.cos.ap-chengdu.myqcloud.com
-    secretID: ''
-    secretKey: ''
-    sessionToken: ''
+    secretID:
+    secretKey:
+    sessionToken:
     publicRead: false
   oss:
-    endpoint: "https://oss-cn-chengdu.aliyuncs.com"
-    bucket: "demo-9999999"
-    bucketURL: "https://demo-9999999.oss-cn-chengdu.aliyuncs.com"
-    accessKeyID: ''
-    accessKeySecret: ''
-    sessionToken: ''
+    endpoint: https://oss-cn-chengdu.aliyuncs.com
+    bucket: demo-9999999
+    bucketURL: https://demo-9999999.oss-cn-chengdu.aliyuncs.com
+    accessKeyID:
+    accessKeySecret:
+    sessionToken:
     publicRead: false
   kodo:
-    endpoint: "http://s3.cn-south-1.qiniucs.com"
-    bucket: "kodo-bucket-test"
-    bucketURL: "http://kodo-bucket-test-oetobfb.qiniudns.com"
-    accessKeyID: ''
-    accessKeySecret: ''
-    sessionToken: ''
-    publicRead: false
+    endpoint: http://s3.cn-south-1.qiniucs.com
+    bucket: kodo-bucket-test
+    bucketURL: http://kodo-bucket-test-oetobfb.qiniudns.com
+    accessKeyID:
+    accessKeySecret:
+    sessionToken:
+    publicRead: false
@@ -1,17 +1,13 @@
 rpc:
   # API or other RPCs can access this RPC service through this IP; if left blank, the internal network IP is obtained by default
-  registerIP: ''
+  registerIP:
   # Listening IP; 0.0.0.0 listens on both internal and external IPs. If left blank, the internal network IP is obtained automatically by default
   listenIP: 0.0.0.0
   # Listening ports; if multiple are configured, multiple instances will be launched, and the count must be consistent with the number of prometheus.ports
-  ports: [ 10110 ]
+  ports: [ 10320 ]

 prometheus:
   # Whether to enable prometheus
   enable: true
   # Prometheus listening ports; must be consistent with the number of rpc.ports
-  ports: [ 20100 ]
+  ports: [ 12320 ]
@@ -1,6 +1,7 @@
 address: [ localhost:16379 ]
-username: ''
+username:
 password: openIM123
 clusterMode: false
 db: 0
 maxRetry: 10
+poolSize: 100
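For orientation, the fields in this redis block correspond closely to go-redis universal-client options, including the newly added poolSize. A rough sketch assuming go-redis v9; OpenIM's redisutil may wire these fields differently, so treat the mapping as illustrative.

package main

import "github.com/redis/go-redis/v9"

// newClient builds a client from values mirroring the config hunk above.
func newClient() redis.UniversalClient {
    return redis.NewUniversalClient(&redis.UniversalOptions{
        Addrs:      []string{"localhost:16379"},
        Username:   "",
        Password:   "openIM123",
        DB:         0,
        MaxRetries: 10,
        PoolSize:   100, // the poolSize knob added in this commit
    })
}

func main() {
    _ = newClient()
}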
@@ -0,0 +1,29 @@
package push

import (
    "testing"

    "github.com/openimsdk/protocol/sdkws"
)

// TestName floods readCh with read receipts whose HasReadSeq periodically
// resets to 1 and relies on loopRead to drain them. It never returns
// (select {}), so it is only meant to be run by hand.
func TestName(t *testing.T) {
    var c ConsumerHandler
    c.readCh = make(chan *sdkws.MarkAsReadTips)

    go c.loopRead()

    go func() {
        for i := 0; ; i++ {
            seq := int64(i + 1)
            if seq%3 == 0 {
                seq = 1
            }
            c.readCh <- &sdkws.MarkAsReadTips{
                ConversationID:   "c100",
                MarkAsReadUserID: "u100",
                HasReadSeq:       seq,
            }
        }
    }()

    select {}
}
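The test above deliberately sends HasReadSeq values that regress to 1 every third message, leaving it to loopRead to cope. As a rough, standalone illustration of the kind of coalescing such a loop needs (this is not OpenIM's loopRead; the type and helper below are invented for the sketch), only the highest seq per conversation and user should survive:

package main

import "fmt"

// MarkAsReadTips mirrors the fields the test sends; it is a stand-in for
// sdkws.MarkAsReadTips, not the real protobuf type.
type MarkAsReadTips struct {
    ConversationID   string
    MarkAsReadUserID string
    HasReadSeq       int64
}

// drainReadSeqs keeps only the highest read seq seen per conversation/user,
// ignoring regressions such as the periodic reset to 1 in the test.
func drainReadSeqs(ch <-chan *MarkAsReadTips) map[string]int64 {
    maxSeq := make(map[string]int64)
    for tips := range ch {
        key := tips.ConversationID + ":" + tips.MarkAsReadUserID
        if tips.HasReadSeq > maxSeq[key] {
            maxSeq[key] = tips.HasReadSeq
        }
    }
    return maxSeq
}

func main() {
    ch := make(chan *MarkAsReadTips, 3)
    ch <- &MarkAsReadTips{ConversationID: "c100", MarkAsReadUserID: "u100", HasReadSeq: 1}
    ch <- &MarkAsReadTips{ConversationID: "c100", MarkAsReadUserID: "u100", HasReadSeq: 2}
    ch <- &MarkAsReadTips{ConversationID: "c100", MarkAsReadUserID: "u100", HasReadSeq: 1} // regression, dropped
    close(ch)
    fmt.Println(drainReadSeqs(ch)) // map[c100:u100:2]
}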
@@ -0,0 +1,122 @@
package push

import (
    "context"

    "github.com/IBM/sarama"
    "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
    "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
    "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
    "github.com/openimsdk/protocol/constant"
    pbpush "github.com/openimsdk/protocol/push"
    "github.com/openimsdk/protocol/sdkws"
    "github.com/openimsdk/tools/errs"
    "github.com/openimsdk/tools/log"
    "github.com/openimsdk/tools/mq/kafka"
    "github.com/openimsdk/tools/utils/jsonutil"
    "google.golang.org/protobuf/proto"
)

// OfflinePushConsumerHandler consumes offline-push messages from the Kafka
// offline-push topic and forwards them to the configured offline pusher.
type OfflinePushConsumerHandler struct {
    OfflinePushConsumerGroup *kafka.MConsumerGroup
    offlinePusher            offlinepush.OfflinePusher
}

func NewOfflinePushConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher) (*OfflinePushConsumerHandler, error) {
    var offlinePushConsumerHandler OfflinePushConsumerHandler
    var err error
    offlinePushConsumerHandler.offlinePusher = offlinePusher
    offlinePushConsumerHandler.OfflinePushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToOfflineGroupID,
        []string{config.KafkaConfig.ToOfflinePushTopic}, true)
    if err != nil {
        return nil, err
    }
    return &offlinePushConsumerHandler, nil
}

func (*OfflinePushConsumerHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (*OfflinePushConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (o *OfflinePushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        ctx := o.OfflinePushConsumerGroup.GetContextFromMsg(msg)
        o.handleMsg2OfflinePush(ctx, msg.Value)
        sess.MarkMessage(msg, "")
    }
    return nil
}

// handleMsg2OfflinePush unmarshals a PushMsgReq from Kafka and hands it to the offline pusher.
func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
    offlinePushMsg := pbpush.PushMsgReq{}
    if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
        log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
        return
    }
    if offlinePushMsg.MsgData == nil || offlinePushMsg.UserIDs == nil {
        log.ZError(ctx, "offline push msg is empty", errs.New("offlinePushMsg is empty"), "userIDs", offlinePushMsg.UserIDs, "msg", offlinePushMsg.MsgData)
        return
    }
    log.ZInfo(ctx, "receive to OfflinePush MQ", "userIDs", offlinePushMsg.UserIDs, "msg", offlinePushMsg.MsgData)

    err := o.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
    if err != nil {
        log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
    }
}

func (o *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {
    type AtTextElem struct {
        Text       string   `json:"text,omitempty"`
        AtUserList []string `json:"atUserList,omitempty"`
        IsAtSelf   bool     `json:"isAtSelf"`
    }

    opts = &options.Opts{Signal: &options.Signal{}}
    if msg.OfflinePushInfo != nil {
        opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
        opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
        opts.Ex = msg.OfflinePushInfo.Ex
    }

    if msg.OfflinePushInfo != nil {
        title = msg.OfflinePushInfo.Title
        content = msg.OfflinePushInfo.Desc
    }
    if title == "" {
        switch msg.ContentType {
        case constant.Text:
            fallthrough
        case constant.Picture:
            fallthrough
        case constant.Voice:
            fallthrough
        case constant.Video:
            fallthrough
        case constant.File:
            title = constant.ContentType2PushContent[int64(msg.ContentType)]
        case constant.AtText:
            // Note: the element is parsed here, but no title is set for AtText.
            ac := AtTextElem{}
            _ = jsonutil.JsonStringToStruct(string(msg.Content), &ac)
        case constant.SignalingNotification:
            title = constant.ContentType2PushContent[constant.SignalMsg]
        default:
            title = constant.ContentType2PushContent[constant.Common]
        }
    }
    if content == "" {
        content = title
    }
    return
}

func (o *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
    title, content, opts, err := o.getOfflinePushInfos(msg)
    if err != nil {
        return err
    }
    err = o.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
    if err != nil {
        prommetrics.MsgOfflinePushFailedCounter.Inc()
        return err
    }
    return nil
}
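getOfflinePushInfos above derives a push title from the content type when OfflinePushInfo does not supply one, and reuses the title as the body when the body is empty. The following standalone sketch shows that fallback order only; the constants and map values are placeholders, not OpenIM's actual constant.ContentType2PushContent entries.

package main

import "fmt"

const (
    Text    int64 = 101
    Picture int64 = 102
    Common  int64 = 200
)

// defaultPushContent stands in for constant.ContentType2PushContent.
var defaultPushContent = map[int64]string{
    Text:    "you received a text message",
    Picture: "you received a picture",
    Common:  "you received a message",
}

// pushTitleContent mirrors the fallback order used by getOfflinePushInfos:
// explicit OfflinePushInfo first, then a content-type default, then title as content.
func pushTitleContent(title, content string, contentType int64) (string, string) {
    if title == "" {
        if t, ok := defaultPushContent[contentType]; ok {
            title = t
        } else {
            title = defaultPushContent[Common]
        }
    }
    if content == "" {
        content = title
    }
    return title, content
}

func main() {
    fmt.Println(pushTitleContent("", "", Picture)) // both fall back to the picture default
}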
File diff suppressed because it is too large.
@@ -0,0 +1 @@
package apistruct
@@ -0,0 +1,51 @@
package redis

import (
    "context"
    "testing"
    "time"

    "github.com/openimsdk/open-im-server/v3/pkg/common/config"
    "github.com/openimsdk/tools/db/redisutil"
)

/*
address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ]
username:
password: passwd123
clusterMode: true
db: 0
maxRetry: 10
*/
func TestName111111(t *testing.T) {
    conf := config.Redis{
        Address: []string{
            "172.16.8.124:7001",
            "172.16.8.124:7002",
            "172.16.8.124:7003",
            "172.16.8.124:7004",
            "172.16.8.124:7005",
            "172.16.8.124:7006",
        },
        ClusterMode: true,
        Password:    "passwd123",
        //Address: []string{"localhost:16379"},
        //Password: "openIM123",
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
    defer cancel()
    rdb, err := redisutil.NewRedisClient(ctx, conf.Build())
    if err != nil {
        panic(err)
    }
    online := NewUserOnline(rdb)

    userID := "a123456"
    t.Log(online.GetOnline(ctx, userID))
    t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil))
    t.Log(online.GetOnline(ctx, userID))
}

func TestName111(t *testing.T) {
}
@@ -0,0 +1,286 @@
package controller

import (
    "context"

    "github.com/openimsdk/open-im-server/v3/pkg/common/config"
    "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
    "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
    "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
    "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
    pbmsg "github.com/openimsdk/protocol/msg"
    "github.com/openimsdk/protocol/sdkws"
    "github.com/openimsdk/tools/errs"
    "github.com/openimsdk/tools/log"
    "github.com/openimsdk/tools/mq/kafka"
    "go.mongodb.org/mongo-driver/mongo"
)

type MsgTransferDatabase interface {
    // BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation.
    BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
    // DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
    DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error

    // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
    BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
    SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error

    // to MQ
    MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
    MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
}

func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) {
    conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
    if err != nil {
        return nil, err
    }
    producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
    if err != nil {
        return nil, err
    }
    producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
    if err != nil {
        return nil, err
    }
    return &msgTransferDatabase{
        msgDocDatabase:  msgDocModel,
        msg:             msg,
        seqUser:         seqUser,
        seqConversation: seqConversation,
        producerToMongo: producerToMongo,
        producerToPush:  producerToPush,
    }, nil
}

type msgTransferDatabase struct {
    msgDocDatabase  database.Msg
    msgTable        model.MsgDocModel
    msg             cache.MsgCache
    seqConversation cache.SeqConversationCache
    seqUser         cache.SeqUser
    producerToMongo *kafka.Producer
    producerToPush  *kafka.Producer
}

func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
    if len(msgList) == 0 {
        return errs.ErrArgs.WrapMsg("msgList is empty")
    }
    msgs := make([]any, len(msgList))
    for i, msg := range msgList {
        if msg == nil {
            continue
        }
        var offlinePushModel *model.OfflinePushModel
        if msg.OfflinePushInfo != nil {
            offlinePushModel = &model.OfflinePushModel{
                Title:         msg.OfflinePushInfo.Title,
                Desc:          msg.OfflinePushInfo.Desc,
                Ex:            msg.OfflinePushInfo.Ex,
                IOSPushSound:  msg.OfflinePushInfo.IOSPushSound,
                IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
            }
        }
        msgs[i] = &model.MsgDataModel{
            SendID:           msg.SendID,
            RecvID:           msg.RecvID,
            GroupID:          msg.GroupID,
            ClientMsgID:      msg.ClientMsgID,
            ServerMsgID:      msg.ServerMsgID,
            SenderPlatformID: msg.SenderPlatformID,
            SenderNickname:   msg.SenderNickname,
            SenderFaceURL:    msg.SenderFaceURL,
            SessionType:      msg.SessionType,
            MsgFrom:          msg.MsgFrom,
            ContentType:      msg.ContentType,
            Content:          string(msg.Content),
            Seq:              msg.Seq,
            SendTime:         msg.SendTime,
            CreateTime:       msg.CreateTime,
            Status:           msg.Status,
            Options:          msg.Options,
            OfflinePush:      offlinePushModel,
            AtUserIDList:     msg.AtUserIDList,
            AttachedInfo:     msg.AttachedInfo,
            Ex:               msg.Ex,
        }
    }
    return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
}

func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
    if len(fields) == 0 {
        return nil
    }
    num := db.msgTable.GetSingleGocMsgNum()
    // num = 100
    for i, field := range fields { // Check the type of each field
        var ok bool
        switch key {
        case updateKeyMsg:
            var msg *model.MsgDataModel
            msg, ok = field.(*model.MsgDataModel)
            if msg != nil && msg.Seq != firstSeq+int64(i) {
                return errs.ErrInternalServer.WrapMsg("seq is invalid")
            }
        case updateKeyRevoke:
            _, ok = field.(*model.RevokeModel)
        default:
            return errs.ErrInternalServer.WrapMsg("key is invalid")
        }
        if !ok {
            return errs.ErrInternalServer.WrapMsg("field type is invalid")
        }
    }
    // Returns true if the document exists in the database, false if it does not
    updateMsgModel := func(seq int64, i int) (bool, error) {
        var (
            res *mongo.UpdateResult
            err error
        )
        docID := db.msgTable.GetDocID(conversationID, seq)
        index := db.msgTable.GetMsgIndex(seq)
        field := fields[i]
        switch key {
        case updateKeyMsg:
            res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
        case updateKeyRevoke:
            res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
        }
        if err != nil {
            return false, err
        }
        return res.MatchedCount > 0, nil
    }
    tryUpdate := true
    for i := 0; i < len(fields); i++ {
        seq := firstSeq + int64(i) // Current sequence number
        if tryUpdate {
            matched, err := updateMsgModel(seq, i)
            if err != nil {
                return err
            }
            if matched {
                continue // The current item has already been updated; skip it
            }
        }
        doc := model.MsgDocModel{
            DocID: db.msgTable.GetDocID(conversationID, seq),
            Msg:   make([]*model.MsgInfoModel, num),
        }
        var insert int // Number of items inserted into this document
        for j := i; j < len(fields); j++ {
            seq = firstSeq + int64(j)
            if db.msgTable.GetDocID(conversationID, seq) != doc.DocID {
                break
            }
            insert++
            switch key {
            case updateKeyMsg:
                doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
                    Msg: fields[j].(*model.MsgDataModel),
                }
            case updateKeyRevoke:
                doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
                    Revoke: fields[j].(*model.RevokeModel),
                }
            }
        }
        for i, msgInfo := range doc.Msg {
            if msgInfo == nil {
                msgInfo = &model.MsgInfoModel{}
                doc.Msg[i] = msgInfo
            }
            if msgInfo.DelList == nil {
                doc.Msg[i].DelList = []string{}
            }
        }
        if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
            if mongo.IsDuplicateKeyError(err) {
                i--              // already inserted
                tryUpdate = true // fall back to update mode for the next block
                continue
            }
            return err
        }
        tryUpdate = false // The current block was inserted successfully; prefer insert for the next block
        i += insert - 1   // Skip the items that were just inserted
    }
    return nil
}

func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
    return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
}

func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
    lenList := len(msgs)
    if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
        return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
    }
    if lenList < 1 {
        return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
    }
    currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
    if err != nil {
        log.ZError(ctx, "storage.seq.Malloc", err)
        return 0, false, err
    }
    isNew = currentMaxSeq == 0
    lastMaxSeq := currentMaxSeq
    userSeqMap := make(map[string]int64)
    for _, m := range msgs {
        currentMaxSeq++
        m.Seq = currentMaxSeq
        userSeqMap[m.SendID] = m.Seq
    }

    failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
    if err != nil {
        prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
        log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
    } else {
        prommetrics.MsgInsertRedisSuccessCounter.Inc()
    }
    err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
    if err != nil {
        log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
        prommetrics.SeqSetFailedCounter.Inc()
    }
    return lastMaxSeq, isNew, errs.Wrap(err)
}

func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
    for userID, seq := range userSeqMap {
        if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
            return err
        }
    }
    return nil
}

func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
    return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}

func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
    partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
    if err != nil {
        log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
        return 0, 0, err
    }
    return partition, offset, nil
}

func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
    if len(messages) > 0 {
        _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
        if err != nil {
            log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq)
            return err
        }
    }
    return nil
}
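To make BatchInsertBlock's update-then-insert loop easier to follow: messages are stored in fixed-size blocks of GetSingleGocMsgNum() entries per Mongo document, so a document ID and an in-document index can be derived from the sequence number. The sketch below is illustrative only; the block size and the exact docID/index formulas of model.MsgDocModel may differ.

package main

import "fmt"

const singleGocMsgNum = 100 // illustrative block size; the real value comes from GetSingleGocMsgNum

// docID groups consecutive seqs into one Mongo document per block.
func docID(conversationID string, seq int64) string {
    return fmt.Sprintf("%s:%d", conversationID, seq/singleGocMsgNum)
}

// msgIndex is the message's slot inside its document.
func msgIndex(seq int64) int64 {
    return seq % singleGocMsgNum
}

func main() {
    // Seqs in the same block share a document, so Create can write a whole
    // block at once; a duplicate-key error means the document already exists,
    // which is why the loop flips tryUpdate back to true and retries.
    fmt.Println(docID("si_u1_u2", 99), msgIndex(99))   // si_u1_u2:0 99
    fmt.Println(docID("si_u1_u2", 100), msgIndex(100)) // si_u1_u2:1 0
}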
Some files were not shown because too many files have changed in this diff.