diff --git a/go.mod b/go.mod index fba1499fe..abfb4bab3 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69 + github.com/openimsdk/protocol v0.0.72-alpha.6 github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 @@ -41,6 +41,7 @@ require ( github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 go.uber.org/automaxprocs v1.5.3 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.6.0 ) @@ -74,7 +75,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/chai2010/webp v1.1.1 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/clbanning/mxj v1.8.4 // indirect github.com/coreos/go-semver v0.3.0 // indirect @@ -170,7 +170,6 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.3.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/image v0.15.0 // indirect golang.org/x/net v0.22.0 // indirect golang.org/x/oauth2 v0.17.0 // indirect diff --git a/go.sum b/go.sum index 1a8e1d76d..55490ed6a 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,6 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chai2010/webp v1.1.1 h1:jTRmEccAJ4MGrhFOrPMpNGIJ/eybIgwKpcACsrTEapk= -github.com/chai2010/webp v1.1.1/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= @@ -321,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo= -github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.6 h1:FzSzXJtSyXYSAewt7vPaQf0DuU8T7QS6ePy68F7bXk8= +github.com/openimsdk/protocol v0.0.72-alpha.6/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/push/a_test.go b/internal/push/a_test.go new file mode 100644 index 000000000..8b2d86407 --- /dev/null +++ b/internal/push/a_test.go @@ -0,0 +1,29 @@ +package push + +import ( + "github.com/openimsdk/protocol/sdkws" + "testing" +) + +func TestName(t *testing.T) { + var c ConsumerHandler + c.readCh = make(chan *sdkws.MarkAsReadTips) + + go c.loopRead() + + go func() { + for i := 0; ; i++ { + seq := int64(i + 1) + if seq%3 == 0 { + seq = 1 + } + c.readCh <- &sdkws.MarkAsReadTips{ + ConversationID: "c100", + MarkAsReadUserID: "u100", + HasReadSeq: seq, + } + } + }() + + select {} +} diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 8ecb3dad1..bac09729d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -17,6 +17,7 @@ package push import ( "context" "encoding/json" + "fmt" "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" @@ -35,11 +36,17 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" + "math/rand" + "os" + "strconv" + "sync/atomic" + "time" ) type ConsumerHandler struct { @@ -54,6 +61,7 @@ type ConsumerHandler struct { groupRpcClient rpcclient.GroupRpcClient webhookClient *webhook.Client config *Config + readCh chan *sdkws.MarkAsReadTips } func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, @@ -76,6 +84,8 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil) + consumerHandler.readCh = make(chan *sdkws.MarkAsReadTips, 1024*8) + go consumerHandler.loopRead() return &consumerHandler, nil } @@ -89,6 +99,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { MsgData: msgFromMQ.MsgData, ConversationID: msgFromMQ.ConversationID, } + c.handlerConversationRead(ctx, pbData.MsgData) sec := msgFromMQ.MsgData.SendTime / 1000 nowSec := timeutil.GetCurrentTimestampBySecond() log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec) @@ -118,6 +129,98 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } +func (c *ConsumerHandler) loopRead() { + type markKey struct { + ConversationID string + UserID string + } + type markSeq struct { + ReadSeq int64 + MarkSeq int64 + Count int64 + } + type asyncRequest struct { + ConversationID string + UserID string + ReadSeq int64 + } + ctx := context.Background() + ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0]) + opIDPrefix := fmt.Sprintf("mark_read_%d_%d_", os.Getpid(), rand.Uint32()) + var incr atomic.Uint64 + maxSeq := make(map[markKey]*markSeq, 1024*8) + queue := memamq.NewMemoryQueue(32, 1024) + defer queue.Stop() + ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + for { + select { + case <-ticker.C: + var markSeqs []asyncRequest + for key, seq := range maxSeq { + if seq.MarkSeq >= seq.ReadSeq { + seq.Count++ + if seq.Count > 6 { + delete(maxSeq, key) + } + continue + } + seq.Count = 0 + seq.MarkSeq = seq.ReadSeq + markSeqs = append(markSeqs, asyncRequest{ + ConversationID: key.ConversationID, + UserID: key.UserID, + ReadSeq: seq.ReadSeq, + }) + } + if len(markSeqs) == 0 { + continue + } + go func() { + for i := range markSeqs { + seq := markSeqs[i] + queue.PushCtx(ctx, func() { + ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10)) + _, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{ + ConversationID: seq.ConversationID, + UserID: seq.UserID, + HasReadSeq: seq.ReadSeq, + NoNotification: true, + }) + if err != nil { + log.ZError(ctx, "ConsumerHandler SetConversationHasReadSeq", err, "conversationID", seq.ConversationID, "userID", seq.UserID, "readSeq", seq.ReadSeq) + } + }) + } + }() + + case tips, ok := <-c.readCh: + if !ok { + return + } + if tips.HasReadSeq <= 0 { + continue + } + key := markKey{ + ConversationID: tips.ConversationID, + UserID: tips.MarkAsReadUserID, + } + ms, ok := maxSeq[key] + if ok { + if ms.ReadSeq < tips.HasReadSeq { + ms.ReadSeq = tips.HasReadSeq + } + } else { + ms = &markSeq{ + ReadSeq: tips.HasReadSeq, + MarkSeq: 0, + } + maxSeq[key] = ms + } + } + } +} + func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) @@ -215,6 +318,39 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws. return result, nil } +func (c *ConsumerHandler) handlerConversationRead(ctx context.Context, msg *sdkws.MsgData) { + if msg.ContentType != constant.HasReadReceipt { + return + } + var elem sdkws.NotificationElem + if err := json.Unmarshal(msg.Content, &elem); err != nil { + log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + return + } + var tips sdkws.MarkAsReadTips + if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { + log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + return + } + if len(tips.Seqs) > 0 { + for _, seq := range tips.Seqs { + if tips.HasReadSeq < seq { + tips.HasReadSeq = seq + } + } + clear(tips.Seqs) + tips.Seqs = nil + } + if tips.HasReadSeq < 0 { + return + } + select { + case c.readCh <- &tips: + default: + log.ZWarn(ctx, "handlerConversationRead readCh is full", nil, "markAsReadTips", &tips) + } +} + func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index aa12c9d0f..837b57f8d 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -66,6 +66,11 @@ type groupServer struct { webhookClient *webhook.Client } +func (s *groupServer) SetGroupInfoEX(ctx context.Context, req *pbgroup.SetGroupInfoEXReq) (*pbgroup.SetGroupInfoEXResp, error) { + //TODO implement me + panic("implement me") +} + type Config struct { RpcConfig config.Group RedisConfig config.Redis diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index bfba4824f..9a3174e8e 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -80,10 +80,16 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC if req.HasReadSeq > maxSeq { return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq") } - if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { - return nil, err + if req.NoNotification { + if err := m.MsgDatabase.SetHasReadSeqToDB(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { + return nil, err + } + } else { + if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { + return nil, err + } + m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq) } - m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq) return &msg.SetConversationHasReadSeqResp{}, nil } diff --git a/pkg/apistruct/msg_test.go b/pkg/apistruct/msg_test.go new file mode 100644 index 000000000..28f878a9f --- /dev/null +++ b/pkg/apistruct/msg_test.go @@ -0,0 +1 @@ +package apistruct diff --git a/pkg/common/storage/cache/redis/online_test.go b/pkg/common/storage/cache/redis/online_test.go new file mode 100644 index 000000000..2ce943252 --- /dev/null +++ b/pkg/common/storage/cache/redis/online_test.go @@ -0,0 +1,43 @@ +package redis + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/redisutil" + "testing" + "time" +) + +/* +address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ] +username: +password: passwd123 +clusterMode: true +db: 0 +maxRetry: 10 +*/ +func TestName111111(t *testing.T) { + conf := config.Redis{ + Address: []string{ + "172.16.8.48:7001", + "172.16.8.48:7002", + "172.16.8.48:7003", + "172.16.8.48:7004", + "172.16.8.48:7005", + "172.16.8.48:7006", + }, + ClusterMode: true, + Password: "passwd123", + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + rdb, err := redisutil.NewRedisClient(ctx, conf.Build()) + if err != nil { + panic(err) + } + online := NewUserOnline(rdb) + + userID := "123456" + t.Log(online.GetOnline(ctx, userID)) + +} diff --git a/pkg/common/storage/cache/redis/seq_user.go b/pkg/common/storage/cache/redis/seq_user.go index edbc66b21..5458b6594 100644 --- a/pkg/common/storage/cache/redis/seq_user.go +++ b/pkg/common/storage/cache/redis/seq_user.go @@ -73,15 +73,15 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s }) } -func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { - if seq%s.readSeqWriteRatio == 0 { +func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error { + if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { + return errs.Wrap(err) + } + if writeDB { if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { return err } } - if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { - return errs.Wrap(err) - } return nil } diff --git a/pkg/common/storage/cache/seq_user.go b/pkg/common/storage/cache/seq_user.go index 61dbc0ab4..5bb9cad04 100644 --- a/pkg/common/storage/cache/seq_user.go +++ b/pkg/common/storage/cache/seq_user.go @@ -8,7 +8,7 @@ type SeqUser interface { GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) - SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error + SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 8eb417f93..6a75afcb1 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -77,6 +77,7 @@ type CommonMsgDatabase interface { SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error @@ -337,7 +338,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { + if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq, false); err != nil { return err } } @@ -805,7 +806,11 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq) + return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, false) +} + +func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { + return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, true) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {