From 86a325f309f0598abf4e5d0a60ee5d893e62c0bd Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Mon, 26 Aug 2024 18:15:05 +0800 Subject: [PATCH 1/2] Fix push (#2559) * fix:log * fix: add log * fix: del return * fix: push config * feat: add push err log and extend push wait time * feat: group config * feat: add log in write binary msg * feat: Modify Prometheus data scraping ports and reserve port space. * feat: change group rpc num * feat: change log * fix: remove quotation mark --- config/openim-api.yml | 2 +- config/openim-msggateway.yml | 2 +- config/openim-msgtransfer.yml | 2 +- config/openim-push.yml | 4 ++-- config/openim-rpc-auth.yml | 2 +- config/openim-rpc-conversation.yml | 2 +- config/openim-rpc-friend.yml | 2 +- config/openim-rpc-group.yml | 4 ++-- config/openim-rpc-msg.yml | 2 +- config/openim-rpc-third.yml | 2 +- config/openim-rpc-user.yml | 2 +- config/prometheus.yml | 26 +++++++++++++------------- internal/msggateway/client.go | 2 ++ internal/push/push_handler.go | 4 ++-- pkg/rpcclient/msg.go | 4 +++- start-config.yml | 2 +- 16 files changed, 34 insertions(+), 30 deletions(-) diff --git a/config/openim-api.yml b/config/openim-api.yml index 78a688fcd..9f53038d8 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -8,6 +8,6 @@ prometheus: # Whether to enable prometheus enable: true # Prometheus listening ports, must match the number of api.ports - ports: [ 20113 ] + ports: [ 20502 ] # This address can be accessed via a browser grafanaURL: http://127.0.0.1:13000/ diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index d8068c443..63332740f 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -8,7 +8,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20112 ] + ports: [ 20640 ] # IP address that the RPC/WebSocket service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 07a7dc1ab..e71a218ed 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -3,4 +3,4 @@ prometheus: enable: true # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # Because four instances have been launched, four ports need to be specified - ports: [ 20108, 20109, 20110, 20111 ] + ports: [ 20600, 20601, 20602, 20603 ] diff --git a/config/openim-push.yml b/config/openim-push.yml index 1e55cdee8..70aa5997f 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -4,13 +4,13 @@ rpc: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports - ports: [ 10170 ] + ports: [ 10170, 10171, 10172, 10173 ] prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20107 ] + ports: [ 20670, 20671, 20672, 20673 ] maxConcurrentWorkers: 3 #Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified. diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 979eca06f..c55c745b6 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -10,7 +10,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20106 ] + ports: [ 20660 ] tokenPolicy: # Token validity period, in days diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index d3f822501..00c9c5aab 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -10,4 +10,4 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20105 ] + ports: [ 20680 ] diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index 8d5869ce8..afac3c5db 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -10,4 +10,4 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20104 ] + ports: [ 20620 ] diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 5fff8b9fe..d0243b5fb 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -1,6 +1,6 @@ rpc: # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP - registerIP: + registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports @@ -10,7 +10,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20103 ] + ports: [ 20650 ] enableHistoryForNewMembers: true \ No newline at end of file diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 38cc46ecf..15840c7f3 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -10,7 +10,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20102 ] + ports: [ 20630 ] # Does sending messages require friend verification diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 408251f4d..512ca391f 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -10,7 +10,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup - ports: [ 20101 ] + ports: [ 20690 ] object: diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index ec9bb30dc..1958bbc6a 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -10,7 +10,7 @@ prometheus: # Whether to enable prometheus enable: true # Prometheus listening ports, must be consistent with the number of rpc.ports - ports: [ 20100 ] + ports: [ 20610 ] diff --git a/config/prometheus.yml b/config/prometheus.yml index c7ce4a489..627cf9411 100644 --- a/config/prometheus.yml +++ b/config/prometheus.yml @@ -19,65 +19,65 @@ rule_files: # A scrape configuration containing exactly one endpoint to scrape: # Here it's Prometheus itself. scrape_configs: - # The job name is added as a label job=job_name" to any timeseries scraped from this config. + # The job name is added as a label "job=job_name" to any timeseries scraped from this config. # Monitored information captured by prometheus # prometheus fetches application services - job_name: node_exporter static_configs: - - targets: [ internal_ip:20114 ] + - targets: [ internal_ip:20500 ] - job_name: openimserver-openim-api static_configs: - - targets: [ internal_ip:20113 ] + - targets: [ internal_ip:20502 ] labels: namespace: default - job_name: openimserver-openim-msggateway static_configs: - - targets: [ internal_ip:20112 ] + - targets: [ internal_ip:20640 ] labels: namespace: default - job_name: openimserver-openim-msgtransfer static_configs: - - targets: [ internal_ip:20111, internal_ip:20110, internal_ip:20109, internal_ip:20108 ] + - targets: [ internal_ip:20600, internal_ip:20601, internal_ip:20602, internal_ip:20603 ] labels: namespace: default - job_name: openimserver-openim-push static_configs: - - targets: [ internal_ip:20107 ] + - targets: [ internal_ip:20670, internal_ip:20671, internal_ip:20672, internal_ip:20673] labels: namespace: default - job_name: openimserver-openim-rpc-auth static_configs: - - targets: [ internal_ip:20106 ] + - targets: [ internal_ip:20600 ] labels: namespace: default - job_name: openimserver-openim-rpc-conversation static_configs: - - targets: [ internal_ip:20105 ] + - targets: [ internal_ip:20680 ] labels: namespace: default - job_name: openimserver-openim-rpc-friend static_configs: - - targets: [ internal_ip:20104 ] + - targets: [ internal_ip:20620 ] labels: namespace: default - job_name: openimserver-openim-rpc-group static_configs: - - targets: [ internal_ip:20103 ] + - targets: [ internal_ip:20650 ] labels: namespace: default - job_name: openimserver-openim-rpc-msg static_configs: - - targets: [ internal_ip:20102 ] + - targets: [ internal_ip:20630 ] labels: namespace: default - job_name: openimserver-openim-rpc-third static_configs: - - targets: [ internal_ip:20101 ] + - targets: [ internal_ip:20690 ] labels: namespace: default - job_name: openimserver-openim-rpc-user static_configs: - - targets: [ internal_ip:20100 ] + - targets: [ internal_ip:20610 ] labels: namespace: default \ No newline at end of file diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index a4902570a..dcb15b70d 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -271,11 +271,13 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re ErrMsg: errResp.ErrMsg, Data: resp, } + t := time.Now() log.ZDebug(ctx, "gateway reply message", "resp", mReply.String()) err = c.writeBinaryMsg(mReply) if err != nil { log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String()) } + log.ZDebug(ctx, "wireBinaryMsg end", "time cost", time.Since(t)) if binaryReq.ReqIdentifier == WsLogoutMsg { return errs.New("user logout", "operationID", binaryReq.OperationID).Wrap() diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 8ecb3dad1..79d3a9296 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -91,9 +91,9 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } sec := msgFromMQ.MsgData.SendTime / 1000 nowSec := timeutil.GetCurrentTimestampBySecond() - log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec) + if nowSec-sec > 10 { - return + log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec) } var err error switch msgFromMQ.MsgData.SessionType { diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 72d5aab95..958cb69a6 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -355,7 +355,9 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co } func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) { - s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sessionType, m, opts...) }) + if err := s.queue.Push(func() { s.send(ctx, sendID, recvID, contentType, sessionType, m, opts...) }); err != nil { + log.ZWarn(ctx, "Push to queue failed", err, "sendID", sendID, "recvID", recvID, "msg", jsonutil.StructToJsonString(m)) + } } func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) { diff --git a/start-config.yml b/start-config.yml index 21436d7a9..a6d3e47af 100644 --- a/start-config.yml +++ b/start-config.yml @@ -3,7 +3,7 @@ serviceBinaries: openim-crontask: 1 openim-rpc-user: 1 openim-msggateway: 1 - openim-push: 1 + openim-push: 4 openim-msgtransfer: 4 openim-rpc-conversation: 1 openim-rpc-auth: 1 From 85614da36ff0eb925d91191ee20f5f689375820e Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Mon, 26 Aug 2024 18:45:44 +0800 Subject: [PATCH 2/2] fix: read seq is written to mongo, online status redis cluster is supported (#2558) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo --------- Co-authored-by: withchao --- go.sum | 3 +- .../msgtransfer/online_history_msg_handler.go | 57 +++++++++++++++++++ .../online_msg_to_mongo_handler.go | 1 - internal/push/a_test.go | 29 ++++++++++ pkg/apistruct/msg_test.go | 1 + pkg/common/storage/cache/redis/online.go | 26 ++++++--- pkg/common/storage/cache/redis/online_test.go | 51 +++++++++++++++++ pkg/common/storage/cache/redis/seq_user.go | 24 ++++---- pkg/common/storage/cache/seq_user.go | 1 + pkg/common/storage/controller/msg.go | 5 ++ pkg/common/storage/database/mgo/seq_user.go | 7 +++ 11 files changed, 182 insertions(+), 23 deletions(-) create mode 100644 internal/push/a_test.go create mode 100644 pkg/apistruct/msg_test.go create mode 100644 pkg/common/storage/cache/redis/online_test.go diff --git a/go.sum b/go.sum index 242b44823..815b6badf 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,7 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.8 h1:MhxSsdxXx2ZaeSLQk4uFftsB5L2rPh1Qup+dURQNzXQ= -github.com/openimsdk/protocol v0.0.72-alpha.8/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A= github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 77161202c..8c1978d48 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -16,6 +16,7 @@ package msgtransfer import ( "context" + "encoding/json" "errors" "github.com/IBM/sarama" "github.com/go-redis/redis" @@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient och.historyConsumerGroup = historyConsumerGroup + return &och, err } func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) { @@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID ctx = withAggregationCtx(ctx, ctxMessages) log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), "key", val.Key()) + och.doSetReadSeq(ctx, ctxMessages) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList := och.categorizeMessageLists(ctxMessages) @@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList) } +func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { + type seqKey struct { + conversationID string + userID string + } + var readSeq map[seqKey]int64 + for _, msg := range msgs { + if msg.message.ContentType != constant.HasReadReceipt { + continue + } + var elem sdkws.NotificationElem + if err := json.Unmarshal(msg.message.Content, &elem); err != nil { + log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + continue + } + var tips sdkws.MarkAsReadTips + if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { + log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + continue + } + if len(tips.Seqs) > 0 { + for _, seq := range tips.Seqs { + if tips.HasReadSeq < seq { + tips.HasReadSeq = seq + } + } + clear(tips.Seqs) + tips.Seqs = nil + } + if tips.HasReadSeq < 0 { + continue + } + if readSeq == nil { + readSeq = make(map[seqKey]int64) + } + key := seqKey{ + conversationID: tips.ConversationID, + userID: tips.MarkAsReadUserID, + } + if readSeq[key] > tips.HasReadSeq { + continue + } + readSeq[key] = tips.HasReadSeq + } + if readSeq == nil { + return + } + for key, seq := range readSeq { + if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { + log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) + } + } +} + func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { var ctxMessages []*ContextMsg for i := 0; i < len(consumerMessages); i++ { diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a72fb4792..cea47fcd5 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -16,7 +16,6 @@ package msgtransfer import ( "context" - "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" diff --git a/internal/push/a_test.go b/internal/push/a_test.go new file mode 100644 index 000000000..8b2d86407 --- /dev/null +++ b/internal/push/a_test.go @@ -0,0 +1,29 @@ +package push + +import ( + "github.com/openimsdk/protocol/sdkws" + "testing" +) + +func TestName(t *testing.T) { + var c ConsumerHandler + c.readCh = make(chan *sdkws.MarkAsReadTips) + + go c.loopRead() + + go func() { + for i := 0; ; i++ { + seq := int64(i + 1) + if seq%3 == 0 { + seq = 1 + } + c.readCh <- &sdkws.MarkAsReadTips{ + ConversationID: "c100", + MarkAsReadUserID: "u100", + HasReadSeq: seq, + } + } + }() + + select {} +} diff --git a/pkg/apistruct/msg_test.go b/pkg/apistruct/msg_test.go new file mode 100644 index 000000000..28f878a9f --- /dev/null +++ b/pkg/apistruct/msg_test.go @@ -0,0 +1 @@ +package apistruct diff --git a/pkg/common/storage/cache/redis/online.go b/pkg/common/storage/cache/redis/online.go index a012e1cd2..ee1db7e23 100644 --- a/pkg/common/storage/cache/redis/online.go +++ b/pkg/common/storage/cache/redis/online.go @@ -8,6 +8,7 @@ import ( "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" "strconv" + "strings" "time" ) @@ -66,11 +67,10 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o local change = (num1 ~= num2) or (num2 ~= num3) if change then local members = redis.call("ZRANGE", key, 0, -1) - table.insert(members, KEYS[2]) - redis.call("PUBLISH", KEYS[3], table.concat(members, ":")) - return 1 + table.insert(members, "1") + return members else - return 0 + return {"0"} end ` now := time.Now() @@ -82,12 +82,24 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o for _, platformID := range online { argv = append(argv, platformID) } - keys := []string{s.getUserOnlineKey(userID), userID, s.channelName} - status, err := s.rdb.Eval(ctx, script, keys, argv).Result() + keys := []string{s.getUserOnlineKey(userID)} + platformIDs, err := s.rdb.Eval(ctx, script, keys, argv).StringSlice() if err != nil { log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline) return err } - log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status) + if len(platformIDs) == 0 { + return errs.ErrInternalServer.WrapMsg("SetUserOnline redis lua invalid return value") + } + if platformIDs[len(platformIDs)-1] != "0" { + log.ZDebug(ctx, "redis SetUserOnline push", "userID", userID, "online", online, "offline", offline, "platformIDs", platformIDs[:len(platformIDs)-1]) + platformIDs[len(platformIDs)-1] = userID + msg := strings.Join(platformIDs, ":") + if err := s.rdb.Publish(ctx, s.channelName, msg).Err(); err != nil { + return errs.Wrap(err) + } + } else { + log.ZDebug(ctx, "redis SetUserOnline not push", "userID", userID, "online", online, "offline", offline) + } return nil } diff --git a/pkg/common/storage/cache/redis/online_test.go b/pkg/common/storage/cache/redis/online_test.go new file mode 100644 index 000000000..0306f6f5d --- /dev/null +++ b/pkg/common/storage/cache/redis/online_test.go @@ -0,0 +1,51 @@ +package redis + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/redisutil" + "testing" + "time" +) + +/* +address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ] +username: +password: passwd123 +clusterMode: true +db: 0 +maxRetry: 10 +*/ +func TestName111111(t *testing.T) { + conf := config.Redis{ + Address: []string{ + "172.16.8.124:7001", + "172.16.8.124:7002", + "172.16.8.124:7003", + "172.16.8.124:7004", + "172.16.8.124:7005", + "172.16.8.124:7006", + }, + ClusterMode: true, + Password: "passwd123", + //Address: []string{"localhost:16379"}, + //Password: "openIM123", + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) + defer cancel() + rdb, err := redisutil.NewRedisClient(ctx, conf.Build()) + if err != nil { + panic(err) + } + online := NewUserOnline(rdb) + + userID := "a123456" + t.Log(online.GetOnline(ctx, userID)) + t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil)) + t.Log(online.GetOnline(ctx, userID)) + +} + +func TestName111(t *testing.T) { + +} diff --git a/pkg/common/storage/cache/redis/seq_user.go b/pkg/common/storage/cache/redis/seq_user.go index edbc66b21..0cedfeee1 100644 --- a/pkg/common/storage/cache/redis/seq_user.go +++ b/pkg/common/storage/cache/redis/seq_user.go @@ -74,17 +74,22 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s } func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { - if seq%s.readSeqWriteRatio == 0 { - if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { - return err - } + dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID) + if err != nil { + return err } - if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { - return errs.Wrap(err) + if dbSeq < seq { + if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { + return errs.Wrap(err) + } } return nil } +func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq) +} + func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { keys := make([]string, 0, len(seqs)) for conversationID, seq := range seqs { @@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string, if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil { return err } - for conversationID, seq := range seqs { - if seq%s.readSeqWriteRatio == 0 { - if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { - return err - } - } - } return nil } diff --git a/pkg/common/storage/cache/seq_user.go b/pkg/common/storage/cache/seq_user.go index 61dbc0ab4..cef414e16 100644 --- a/pkg/common/storage/cache/seq_user.go +++ b/pkg/common/storage/cache/seq_user.go @@ -9,6 +9,7 @@ type SeqUser interface { SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error + SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 8eb417f93..7f884165d 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -77,6 +77,7 @@ type CommonMsgDatabase interface { SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error @@ -808,6 +809,10 @@ func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, c return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq) } +func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { + return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) +} + func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs) } diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go index 9faad416a..244de3000 100644 --- a/pkg/common/storage/database/mgo/seq_user.go +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -115,5 +115,12 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve } func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID) + if err != nil { + return err + } + if dbSeq > seq { + return nil + } return s.setSeq(ctx, conversationID, userID, seq, "read_seq") }