fix: read seq is written to mongo, online status redis cluster is supported (#2558)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* update gomake version

* update gomake version

* fix: seq conversion bug

* fix: redis pipe exec

* fix: ImportFriends

* fix: A large number of logs keysAndValues ​​length is not even

* feat: mark read aggregate write

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* feat: online status supports redis cluster

* merge

* merge

* read seq is written to mongo

* read seq is written to mongo

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
pull/2564/head
chao 5 months ago committed by GitHub
parent 86a325f309
commit 85614da36f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -319,8 +319,7 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.8 h1:MhxSsdxXx2ZaeSLQk4uFftsB5L2rPh1Qup+dURQNzXQ=
github.com/openimsdk/protocol v0.0.72-alpha.8/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A=
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=

@ -16,6 +16,7 @@ package msgtransfer
import (
"context"
"encoding/json"
"errors"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup
return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
type seqKey struct {
conversationID string
userID string
}
var readSeq map[seqKey]int64
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
continue
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue
}
if readSeq == nil {
readSeq = make(map[seqKey]int64)
}
key := seqKey{
conversationID: tips.ConversationID,
userID: tips.MarkAsReadUserID,
}
if readSeq[key] > tips.HasReadSeq {
continue
}
readSeq[key] = tips.HasReadSeq
}
if readSeq == nil {
return
}
for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
}
}
}
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {

@ -16,7 +16,6 @@ package msgtransfer
import (
"context"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"

@ -0,0 +1,29 @@
package push
import (
"github.com/openimsdk/protocol/sdkws"
"testing"
)
func TestName(t *testing.T) {
var c ConsumerHandler
c.readCh = make(chan *sdkws.MarkAsReadTips)
go c.loopRead()
go func() {
for i := 0; ; i++ {
seq := int64(i + 1)
if seq%3 == 0 {
seq = 1
}
c.readCh <- &sdkws.MarkAsReadTips{
ConversationID: "c100",
MarkAsReadUserID: "u100",
HasReadSeq: seq,
}
}
}()
select {}
}

@ -0,0 +1 @@
package apistruct

@ -8,6 +8,7 @@ import (
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
"time"
)
@ -66,11 +67,10 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
local change = (num1 ~= num2) or (num2 ~= num3)
if change then
local members = redis.call("ZRANGE", key, 0, -1)
table.insert(members, KEYS[2])
redis.call("PUBLISH", KEYS[3], table.concat(members, ":"))
return 1
table.insert(members, "1")
return members
else
return 0
return {"0"}
end
`
now := time.Now()
@ -82,12 +82,24 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
for _, platformID := range online {
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
keys := []string{s.getUserOnlineKey(userID)}
platformIDs, err := s.rdb.Eval(ctx, script, keys, argv).StringSlice()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err
}
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
if len(platformIDs) == 0 {
return errs.ErrInternalServer.WrapMsg("SetUserOnline redis lua invalid return value")
}
if platformIDs[len(platformIDs)-1] != "0" {
log.ZDebug(ctx, "redis SetUserOnline push", "userID", userID, "online", online, "offline", offline, "platformIDs", platformIDs[:len(platformIDs)-1])
platformIDs[len(platformIDs)-1] = userID
msg := strings.Join(platformIDs, ":")
if err := s.rdb.Publish(ctx, s.channelName, msg).Err(); err != nil {
return errs.Wrap(err)
}
} else {
log.ZDebug(ctx, "redis SetUserOnline not push", "userID", userID, "online", online, "offline", offline)
}
return nil
}

@ -0,0 +1,51 @@
package redis
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"testing"
"time"
)
/*
address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ]
username:
password: passwd123
clusterMode: true
db: 0
maxRetry: 10
*/
func TestName111111(t *testing.T) {
conf := config.Redis{
Address: []string{
"172.16.8.124:7001",
"172.16.8.124:7002",
"172.16.8.124:7003",
"172.16.8.124:7004",
"172.16.8.124:7005",
"172.16.8.124:7006",
},
ClusterMode: true,
Password: "passwd123",
//Address: []string{"localhost:16379"},
//Password: "openIM123",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
defer cancel()
rdb, err := redisutil.NewRedisClient(ctx, conf.Build())
if err != nil {
panic(err)
}
online := NewUserOnline(rdb)
userID := "a123456"
t.Log(online.GetOnline(ctx, userID))
t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil))
t.Log(online.GetOnline(ctx, userID))
}
func TestName111(t *testing.T) {
}

@ -74,17 +74,22 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
}
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq)
}
func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string,
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
}
return nil
}

@ -9,6 +9,7 @@ type SeqUser interface {
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)

@ -77,6 +77,7 @@ type CommonMsgDatabase interface {
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
@ -808,6 +809,10 @@ func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, c
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
}

@ -115,5 +115,12 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
}
func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if dbSeq > seq {
return nil
}
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}

Loading…
Cancel
Save