feat: online status supports redis cluster

pull/2558/head
withchao 1 year ago
parent 6cf385bb31
commit 3b6463de35

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.6 github.com/openimsdk/protocol v0.0.72-alpha.7
github.com/openimsdk/tools v0.0.49-alpha.55 github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0

@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.6 h1:FzSzXJtSyXYSAewt7vPaQf0DuU8T7QS6ePy68F7bXk8= github.com/openimsdk/protocol v0.0.72-alpha.7 h1:OJTJ+FT/ujOg3+zGRt3ivebIJ9NTptLOGzW4QvwvWjs=
github.com/openimsdk/protocol v0.0.72-alpha.6/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.72-alpha.7/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -151,7 +151,7 @@ func (c *ConsumerHandler) loopRead() {
maxSeq := make(map[markKey]*markSeq, 1024*8) maxSeq := make(map[markKey]*markSeq, 1024*8)
queue := memamq.NewMemoryQueue(32, 1024) queue := memamq.NewMemoryQueue(32, 1024)
defer queue.Stop() defer queue.Stop()
ticker := time.NewTicker(time.Second * 1) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
@ -179,7 +179,7 @@ func (c *ConsumerHandler) loopRead() {
go func() { go func() {
for i := range markSeqs { for i := range markSeqs {
seq := markSeqs[i] seq := markSeqs[i]
queue.PushCtx(ctx, func() { _ = queue.PushCtx(ctx, func() {
ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10)) ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10))
_, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{ _, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{
ConversationID: seq.ConversationID, ConversationID: seq.ConversationID,

@ -52,6 +52,11 @@ type conversationServer struct {
config *Config config *Config
} }
func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
//TODO implement me
panic("implement me")
}
type Config struct { type Config struct {
RpcConfig config.Conversation RpcConfig config.Conversation
RedisConfig config.Redis RedisConfig config.Redis

@ -8,6 +8,7 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"strconv" "strconv"
"strings"
"time" "time"
) )
@ -66,11 +67,10 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
local change = (num1 ~= num2) or (num2 ~= num3) local change = (num1 ~= num2) or (num2 ~= num3)
if change then if change then
local members = redis.call("ZRANGE", key, 0, -1) local members = redis.call("ZRANGE", key, 0, -1)
table.insert(members, KEYS[2]) table.insert(members, "1")
redis.call("PUBLISH", KEYS[3], table.concat(members, ":")) return members
return 1
else else
return 0 return {"0"}
end end
` `
now := time.Now() now := time.Now()
@ -82,12 +82,24 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
for _, platformID := range online { for _, platformID := range online {
argv = append(argv, platformID) argv = append(argv, platformID)
} }
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName} keys := []string{s.getUserOnlineKey(userID), userID}
status, err := s.rdb.Eval(ctx, script, keys, argv).Result() platformIDs, err := s.rdb.Eval(ctx, script, keys, argv).StringSlice()
if err != nil { if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline) log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err return err
} }
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status) if len(platformIDs) == 0 {
return errs.ErrInternalServer.WrapMsg("SetUserOnline redis lua invalid return value")
}
if platformIDs[len(platformIDs)-1] != "0" {
log.ZDebug(ctx, "redis SetUserOnline push", "userID", userID, "online", online, "offline", offline, "platformIDs", platformIDs[:len(platformIDs)-1])
platformIDs[len(platformIDs)-1] = userID
msg := strings.Join(platformIDs, ":")
if err := s.rdb.Publish(ctx, s.channelName, msg).Err(); err != nil {
return errs.Wrap(err)
}
} else {
log.ZDebug(ctx, "redis SetUserOnline not push", "userID", userID, "online", online, "offline", offline)
}
return nil return nil
} }

@ -18,18 +18,20 @@ maxRetry: 10
*/ */
func TestName111111(t *testing.T) { func TestName111111(t *testing.T) {
conf := config.Redis{ conf := config.Redis{
Address: []string{ //Address: []string{
"172.16.8.48:7001", // "172.16.8.48:7001",
"172.16.8.48:7002", // "172.16.8.48:7002",
"172.16.8.48:7003", // "172.16.8.48:7003",
"172.16.8.48:7004", // "172.16.8.48:7004",
"172.16.8.48:7005", // "172.16.8.48:7005",
"172.16.8.48:7006", // "172.16.8.48:7006",
}, //},
ClusterMode: true, //ClusterMode: true,
Password: "passwd123", //Password: "passwd123",
Address: []string{"localhost:16379"},
Password: "openIM123",
} }
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
defer cancel() defer cancel()
rdb, err := redisutil.NewRedisClient(ctx, conf.Build()) rdb, err := redisutil.NewRedisClient(ctx, conf.Build())
if err != nil { if err != nil {
@ -39,5 +41,11 @@ func TestName111111(t *testing.T) {
userID := "123456" userID := "123456"
t.Log(online.GetOnline(ctx, userID)) t.Log(online.GetOnline(ctx, userID))
t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil))
t.Log(online.GetOnline(ctx, userID))
}
func TestName111(t *testing.T) {
} }

Loading…
Cancel
Save