feat: mark read aggregate write

pull/2558/head
withchao 1 year ago
parent c14dcb7e49
commit 6cf385bb31

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69
github.com/openimsdk/protocol v0.0.72-alpha.6
github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
@ -41,6 +41,7 @@ require (
github.com/spf13/viper v1.18.2
github.com/stathat/consistent v1.0.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/sync v0.6.0
)
@ -74,7 +75,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chai2010/webp v1.1.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
@ -170,7 +170,6 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/image v0.15.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect

@ -71,8 +71,6 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.1 h1:jTRmEccAJ4MGrhFOrPMpNGIJ/eybIgwKpcACsrTEapk=
github.com/chai2010/webp v1.1.1/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
@ -321,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo=
github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.6 h1:FzSzXJtSyXYSAewt7vPaQf0DuU8T7QS6ePy68F7bXk8=
github.com/openimsdk/protocol v0.0.72-alpha.6/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -0,0 +1,29 @@
package push
import (
"github.com/openimsdk/protocol/sdkws"
"testing"
)
func TestName(t *testing.T) {
var c ConsumerHandler
c.readCh = make(chan *sdkws.MarkAsReadTips)
go c.loopRead()
go func() {
for i := 0; ; i++ {
seq := int64(i + 1)
if seq%3 == 0 {
seq = 1
}
c.readCh <- &sdkws.MarkAsReadTips{
ConversationID: "c100",
MarkAsReadUserID: "u100",
HasReadSeq: seq,
}
}
}()
select {}
}

@ -17,6 +17,7 @@ package push
import (
"context"
"encoding/json"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
@ -35,11 +36,17 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"math/rand"
"os"
"strconv"
"sync/atomic"
"time"
)
type ConsumerHandler struct {
@ -54,6 +61,7 @@ type ConsumerHandler struct {
groupRpcClient rpcclient.GroupRpcClient
webhookClient *webhook.Client
config *Config
readCh chan *sdkws.MarkAsReadTips
}
func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
@ -76,6 +84,8 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
consumerHandler.readCh = make(chan *sdkws.MarkAsReadTips, 1024*8)
go consumerHandler.loopRead()
return &consumerHandler, nil
}
@ -89,6 +99,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
MsgData: msgFromMQ.MsgData,
ConversationID: msgFromMQ.ConversationID,
}
c.handlerConversationRead(ctx, pbData.MsgData)
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := timeutil.GetCurrentTimestampBySecond()
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
@ -118,6 +129,98 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) loopRead() {
type markKey struct {
ConversationID string
UserID string
}
type markSeq struct {
ReadSeq int64
MarkSeq int64
Count int64
}
type asyncRequest struct {
ConversationID string
UserID string
ReadSeq int64
}
ctx := context.Background()
ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0])
opIDPrefix := fmt.Sprintf("mark_read_%d_%d_", os.Getpid(), rand.Uint32())
var incr atomic.Uint64
maxSeq := make(map[markKey]*markSeq, 1024*8)
queue := memamq.NewMemoryQueue(32, 1024)
defer queue.Stop()
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
for {
select {
case <-ticker.C:
var markSeqs []asyncRequest
for key, seq := range maxSeq {
if seq.MarkSeq >= seq.ReadSeq {
seq.Count++
if seq.Count > 6 {
delete(maxSeq, key)
}
continue
}
seq.Count = 0
seq.MarkSeq = seq.ReadSeq
markSeqs = append(markSeqs, asyncRequest{
ConversationID: key.ConversationID,
UserID: key.UserID,
ReadSeq: seq.ReadSeq,
})
}
if len(markSeqs) == 0 {
continue
}
go func() {
for i := range markSeqs {
seq := markSeqs[i]
queue.PushCtx(ctx, func() {
ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10))
_, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{
ConversationID: seq.ConversationID,
UserID: seq.UserID,
HasReadSeq: seq.ReadSeq,
NoNotification: true,
})
if err != nil {
log.ZError(ctx, "ConsumerHandler SetConversationHasReadSeq", err, "conversationID", seq.ConversationID, "userID", seq.UserID, "readSeq", seq.ReadSeq)
}
})
}
}()
case tips, ok := <-c.readCh:
if !ok {
return
}
if tips.HasReadSeq <= 0 {
continue
}
key := markKey{
ConversationID: tips.ConversationID,
UserID: tips.MarkAsReadUserID,
}
ms, ok := maxSeq[key]
if ok {
if ms.ReadSeq < tips.HasReadSeq {
ms.ReadSeq = tips.HasReadSeq
}
} else {
ms = &markSeq{
ReadSeq: tips.HasReadSeq,
MarkSeq: 0,
}
maxSeq[key] = ms
}
}
}
}
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
@ -215,6 +318,39 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
return result, nil
}
func (c *ConsumerHandler) handlerConversationRead(ctx context.Context, msg *sdkws.MsgData) {
if msg.ContentType != constant.HasReadReceipt {
return
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
return
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
return
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
return
}
select {
case c.readCh <- &tips:
default:
log.ZWarn(ctx, "handlerConversationRead readCh is full", nil, "markAsReadTips", &tips)
}
}
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string

@ -66,6 +66,11 @@ type groupServer struct {
webhookClient *webhook.Client
}
func (s *groupServer) SetGroupInfoEX(ctx context.Context, req *pbgroup.SetGroupInfoEXReq) (*pbgroup.SetGroupInfoEXResp, error) {
//TODO implement me
panic("implement me")
}
type Config struct {
RpcConfig config.Group
RedisConfig config.Redis

@ -80,10 +80,16 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC
if req.HasReadSeq > maxSeq {
return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq")
}
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
if req.NoNotification {
if err := m.MsgDatabase.SetHasReadSeqToDB(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
} else {
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
return &msg.SetConversationHasReadSeqResp{}, nil
}

@ -0,0 +1 @@
package apistruct

@ -0,0 +1,43 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"testing"
"time"
)
/*
address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ]
username:
password: passwd123
clusterMode: true
db: 0
maxRetry: 10
*/
func TestName111111(t *testing.T) {
conf := config.Redis{
Address: []string{
"172.16.8.48:7001",
"172.16.8.48:7002",
"172.16.8.48:7003",
"172.16.8.48:7004",
"172.16.8.48:7005",
"172.16.8.48:7006",
},
ClusterMode: true,
Password: "passwd123",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
rdb, err := redisutil.NewRedisClient(ctx, conf.Build())
if err != nil {
panic(err)
}
online := NewUserOnline(rdb)
userID := "123456"
t.Log(online.GetOnline(ctx, userID))
}

@ -73,15 +73,15 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
})
}
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
if writeDB {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
return nil
}

@ -8,7 +8,7 @@ type SeqUser interface {
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)

@ -77,6 +77,7 @@ type CommonMsgDatabase interface {
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
@ -337,7 +338,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq, false); err != nil {
return err
}
}
@ -805,7 +806,11 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri
}
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, false)
}
func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, true)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {

Loading…
Cancel
Save