From 3b6463de35477db5b1c5106c3e73ebab5500dd92 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 26 Aug 2024 14:55:09 +0800 Subject: [PATCH] feat: online status supports redis cluster --- go.mod | 2 +- go.sum | 4 +-- internal/push/push_handler.go | 4 +-- internal/rpc/conversation/conversaion.go | 5 ++++ pkg/common/storage/cache/redis/online.go | 26 +++++++++++----- pkg/common/storage/cache/redis/online_test.go | 30 ++++++++++++------- 6 files changed, 48 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index abfb4bab3..bc1b976b2 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.6 + github.com/openimsdk/protocol v0.0.72-alpha.7 github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 55490ed6a..9b347423a 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.6 h1:FzSzXJtSyXYSAewt7vPaQf0DuU8T7QS6ePy68F7bXk8= -github.com/openimsdk/protocol v0.0.72-alpha.6/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.7 h1:OJTJ+FT/ujOg3+zGRt3ivebIJ9NTptLOGzW4QvwvWjs= +github.com/openimsdk/protocol v0.0.72-alpha.7/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index bac09729d..8979214db 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -151,7 +151,7 @@ func (c *ConsumerHandler) loopRead() { maxSeq := make(map[markKey]*markSeq, 1024*8) queue := memamq.NewMemoryQueue(32, 1024) defer queue.Stop() - ticker := time.NewTicker(time.Second * 1) + ticker := time.NewTicker(time.Second * 10) defer ticker.Stop() for { select { @@ -179,7 +179,7 @@ func (c *ConsumerHandler) loopRead() { go func() { for i := range markSeqs { seq := markSeqs[i] - queue.PushCtx(ctx, func() { + _ = queue.PushCtx(ctx, func() { ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10)) _, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{ ConversationID: seq.ConversationID, diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index f00c970e6..e03b96900 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -52,6 +52,11 @@ type conversationServer struct { config *Config } +func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { + //TODO implement me + panic("implement me") +} + type Config struct { RpcConfig config.Conversation RedisConfig config.Redis diff --git a/pkg/common/storage/cache/redis/online.go b/pkg/common/storage/cache/redis/online.go index a012e1cd2..713e84e5c 100644 --- a/pkg/common/storage/cache/redis/online.go +++ b/pkg/common/storage/cache/redis/online.go @@ -8,6 +8,7 @@ import ( "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" "strconv" + "strings" "time" ) @@ -66,11 +67,10 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o local change = (num1 ~= num2) or (num2 ~= num3) if change then local members = redis.call("ZRANGE", key, 0, -1) - table.insert(members, KEYS[2]) - redis.call("PUBLISH", KEYS[3], table.concat(members, ":")) - return 1 + table.insert(members, "1") + return members else - return 0 + return {"0"} end ` now := time.Now() @@ -82,12 +82,24 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o for _, platformID := range online { argv = append(argv, platformID) } - keys := []string{s.getUserOnlineKey(userID), userID, s.channelName} - status, err := s.rdb.Eval(ctx, script, keys, argv).Result() + keys := []string{s.getUserOnlineKey(userID), userID} + platformIDs, err := s.rdb.Eval(ctx, script, keys, argv).StringSlice() if err != nil { log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline) return err } - log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status) + if len(platformIDs) == 0 { + return errs.ErrInternalServer.WrapMsg("SetUserOnline redis lua invalid return value") + } + if platformIDs[len(platformIDs)-1] != "0" { + log.ZDebug(ctx, "redis SetUserOnline push", "userID", userID, "online", online, "offline", offline, "platformIDs", platformIDs[:len(platformIDs)-1]) + platformIDs[len(platformIDs)-1] = userID + msg := strings.Join(platformIDs, ":") + if err := s.rdb.Publish(ctx, s.channelName, msg).Err(); err != nil { + return errs.Wrap(err) + } + } else { + log.ZDebug(ctx, "redis SetUserOnline not push", "userID", userID, "online", online, "offline", offline) + } return nil } diff --git a/pkg/common/storage/cache/redis/online_test.go b/pkg/common/storage/cache/redis/online_test.go index 2ce943252..2bda46582 100644 --- a/pkg/common/storage/cache/redis/online_test.go +++ b/pkg/common/storage/cache/redis/online_test.go @@ -18,18 +18,20 @@ maxRetry: 10 */ func TestName111111(t *testing.T) { conf := config.Redis{ - Address: []string{ - "172.16.8.48:7001", - "172.16.8.48:7002", - "172.16.8.48:7003", - "172.16.8.48:7004", - "172.16.8.48:7005", - "172.16.8.48:7006", - }, - ClusterMode: true, - Password: "passwd123", + //Address: []string{ + // "172.16.8.48:7001", + // "172.16.8.48:7002", + // "172.16.8.48:7003", + // "172.16.8.48:7004", + // "172.16.8.48:7005", + // "172.16.8.48:7006", + //}, + //ClusterMode: true, + //Password: "passwd123", + Address: []string{"localhost:16379"}, + Password: "openIM123", } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) defer cancel() rdb, err := redisutil.NewRedisClient(ctx, conf.Build()) if err != nil { @@ -39,5 +41,11 @@ func TestName111111(t *testing.T) { userID := "123456" t.Log(online.GetOnline(ctx, userID)) + t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil)) + t.Log(online.GetOnline(ctx, userID)) + +} + +func TestName111(t *testing.T) { }