From 1f7dfa33d7c30f0a8ffd9a5d787aef0d4fcec267 Mon Sep 17 00:00:00 2001 From: Xinwei Xiong <3293172751@qq.com> Date: Sun, 26 Nov 2023 11:19:12 +0800 Subject: [PATCH 1/5] Update README.md (#1477) --- deployments/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deployments/README.md b/deployments/README.md index 0f73a553e..b24babb31 100644 --- a/deployments/README.md +++ b/deployments/README.md @@ -84,8 +84,8 @@ $ sudo sealos run labring/kubernetes:v1.25.0 labring/helm:v3.8.2 labring/calico: If you are local, you can also use Kind and Minikube to test, for example, using Kind: ```bash -$ sGO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1 -$ skind create cluster +$ GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1 +$ kind create cluster ``` ### Installing helm @@ -174,4 +174,4 @@ cp ../config/notification.yaml ./charts/generated-configs/notification.yaml ```bash helmfile apply -``` \ No newline at end of file +``` From 403cfb60559a5332b0c5aa2e5b65fa8d9a542019 Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Sun, 26 Nov 2023 20:16:02 +0800 Subject: [PATCH 2/5] perf: redis block with keys command (#1423) Signed-off-by: rfyiamcool --- pkg/common/db/cache/msg.go | 36 ++++++++++++++++++------- pkg/common/db/cache/msg_test.go | 47 +++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 6d0ee8c67..282d1d1c1 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -645,19 +645,35 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string } func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { - vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() - if errors.Is(err, redis.Nil) { - return nil - } - if err != nil { - return errs.Wrap(err) - } - for _, v := range vals { - if err := c.rdb.Del(ctx, v).Err(); err != nil { + var ( + cursor uint64 + keys []string + err error + + key = c.allMessageCacheKey(conversationID) + ) + + for { + // scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server. + // if the count is too small, needs to be run scan on redis frequently. + var limit int64 = 10000 + keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result() + if err != nil { return errs.Wrap(err) } + + for _, key := range keys { + err := c.rdb.Del(ctx, key).Err() + if err != nil { + return errs.Wrap(err) + } + } + + // scan end + if cursor == 0 { + return nil + } } - return nil } func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { diff --git a/pkg/common/db/cache/msg_test.go b/pkg/common/db/cache/msg_test.go index 3fddf5965..a5be018ed 100644 --- a/pkg/common/db/cache/msg_test.go +++ b/pkg/common/db/cache/msg_test.go @@ -385,3 +385,50 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input assert.EqualValues(t, 1, val) // exists } } + +func TestCleanUpOneConversationAllMsg(t *testing.T) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + count := 1000 + prefix := fmt.Sprintf("%v", rand.Int63()) + + ids := []string{} + for i := 0; i < count; i++ { + id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63()) + ids = append(ids, id) + + key := cacher.allMessageCacheKey(id) + rdb.Set(context.Background(), key, "openim", 0) + } + + // delete 100 keys with scan. + for i := 0; i < 100; i++ { + pickedKey := ids[i] + err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey) + assert.Nil(t, err) + + ls, err := rdb.Keys(context.Background(), pickedKey).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(ls)) + + rcode, err := rdb.Exists(context.Background(), pickedKey).Result() + assert.Nil(t, err) + assert.EqualValues(t, 0, rcode) // non-exists + } + + sid := fmt.Sprintf("%v-cid-*", prefix) + ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result() + assert.Nil(t, err) + assert.Equal(t, count-100, len(ls)) + + // delete fuzzy matching keys. + err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid) + assert.Nil(t, err) + + // don't contains keys matched `{prefix}-cid-{random}` on redis + ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(ls)) +} From e4046994cf07c27cf35581a624b360d53450a7a0 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Sun, 26 Nov 2023 20:22:41 +0800 Subject: [PATCH 3/5] fix: update user's info will modify user create time when modify user's nickname or avatar. (#1446) --- pkg/common/convert/user.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/common/convert/user.go b/pkg/common/convert/user.go index abb3a2144..4ca1899be 100644 --- a/pkg/common/convert/user.go +++ b/pkg/common/convert/user.go @@ -15,8 +15,6 @@ package convert import ( - "time" - "github.com/OpenIMSDK/protocol/sdkws" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" @@ -43,7 +41,6 @@ func UserPb2DB(user *sdkws.UserInfo) *relationtb.UserModel { userDB.Nickname = user.Nickname userDB.FaceURL = user.FaceURL userDB.Ex = user.Ex - userDB.CreateTime = time.UnixMilli(user.CreateTime) userDB.AppMangerLevel = user.AppMangerLevel userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt return &userDB From 3cecbbc69ac97f3ddea92bc9090b7af4ecb1d511 Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Sun, 26 Nov 2023 20:25:37 +0800 Subject: [PATCH 4/5] fix: grace shutdown for api server (#1439) * fix: add grace shutdown for api server Signed-off-by: rfyiamcool * fix: add grace shutdown for api server Signed-off-by: rfyiamcool --------- Signed-off-by: rfyiamcool --- cmd/openim-api/main.go | 45 +++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 1375655ba..cb9b09802 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -18,11 +18,13 @@ import ( "context" "fmt" "net" + "net/http" _ "net/http/pprof" + "os" + "os/signal" "strconv" - - ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "syscall" + "time" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/discoveryregistry" @@ -33,6 +35,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" ) func main() { @@ -51,13 +55,12 @@ func run(port int, proPort int) error { if port == 0 || proPort == 0 { err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) log.ZError(context.Background(), err, nil) - return fmt.Errorf(err) } + rdb, err := cache.NewRedis() if err != nil { log.ZError(context.Background(), "Failed to initialize Redis", err) - return err } log.ZInfo(context.Background(), "api start init discov client") @@ -68,30 +71,29 @@ func run(port int, proPort int) error { client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) if err != nil { log.ZError(context.Background(), "Failed to initialize discovery register", err) - return err } + if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { log.ZError(context.Background(), "Failed to create RPC root nodes", err) - return err } + log.ZInfo(context.Background(), "api register public config to discov") if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil { log.ZError(context.Background(), "Failed to register public config to discov", err) - return err } + log.ZInfo(context.Background(), "api register public config to discov success") router := api.NewGinRouter(client, rdb) - ////////////////////////////// if config.Config.Prometheus.Enable { p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", proPort)) p.Use(router) } - ///////////////////////////////// log.ZInfo(context.Background(), "api init router success") + var address string if config.Config.Api.ListenIP != "" { address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port)) @@ -100,10 +102,25 @@ func run(port int, proPort int) error { } log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version) - err = router.Run(address) - if err != nil { - log.ZError(context.Background(), "api run failed", err, "address", address) - + server := http.Server{Addr: address, Handler: router} + go func() { + err = server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + log.ZError(context.Background(), "api run failed", err, "address", address) + os.Exit(1) + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-sigs + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // graceful shutdown operation. + if err := server.Shutdown(ctx); err != nil { + log.ZError(context.Background(), "failed to api-server shutdown", err) return err } From f935d36715ce90ffe1d8373384f2472cab360881 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Sun, 26 Nov 2023 20:49:31 +0800 Subject: [PATCH 5/5] fix: wrong single message read state. (#1443) * fix: wrong single message read state. * Update as_read.go * Update as_read.go * Update as_read.go --------- Co-authored-by: Xinwei Xiong <3293172751@qq.com> --- internal/rpc/msg/as_read.go | 79 +++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 6e3bbe987..3311fa5b7 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -17,9 +17,12 @@ package msg import ( "context" + utils2 "github.com/OpenIMSDK/tools/utils" + "github.com/redis/go-redis/v9" "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" @@ -88,10 +91,7 @@ func (m *msgServer) SetConversationHasReadSeq( return &msg.SetConversationHasReadSeqResp{}, nil } -func (m *msgServer) MarkMsgsAsRead( - ctx context.Context, - req *msg.MarkMsgsAsReadReq, -) (resp *msg.MarkMsgsAsReadResp, err error) { +func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) { if len(req.Seqs) < 1 { return nil, errs.ErrArgs.Wrap("seqs must not be empty") } @@ -127,10 +127,7 @@ func (m *msgServer) MarkMsgsAsRead( return &msg.MarkMsgsAsReadResp{}, nil } -func (m *msgServer) MarkConversationAsRead( - ctx context.Context, - req *msg.MarkConversationAsReadReq, -) (resp *msg.MarkConversationAsReadResp, err error) { +func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (resp *msg.MarkConversationAsReadResp, err error) { conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return nil, err @@ -139,49 +136,47 @@ func (m *msgServer) MarkConversationAsRead( if err != nil && errs.Unwrap(err) != redis.Nil { return nil, err } - var seqs []int64 + seqs := generateSeqs(hasReadSeq, req) - log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq, - "req.HasReadSeq", req.HasReadSeq) - if conversation.ConversationType == constant.SingleChatType { - for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { - seqs = append(seqs, i) + if len(seqs) > 0 || req.HasReadSeq > hasReadSeq { + err = m.updateReadStatus(ctx, req, conversation, seqs, hasReadSeq) + if err != nil { + return nil, err } + } + return &msg.MarkConversationAsReadResp{}, nil +} - if len(seqs) > 0 { - log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) - if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { - return nil, err - } - } - if req.HasReadSeq > hasReadSeq { - err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) - if err != nil { - return nil, err - } - hasReadSeq = req.HasReadSeq - } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, - m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { - return nil, err +func generateSeqs(hasReadSeq int64, req *msg.MarkConversationAsReadReq) []int64 { + var seqs []int64 + for _, val := range req.Seqs { + if val > hasReadSeq && !utils2.Contain(val, seqs...) { + seqs = append(seqs, val) } - } else if conversation.ConversationType == constant.SuperGroupChatType || - conversation.ConversationType == constant.NotificationChatType { - if req.HasReadSeq > hasReadSeq { - err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) - if err != nil { - return nil, err - } - hasReadSeq = req.HasReadSeq + } + return seqs +} + +func (m *msgServer) updateReadStatus(ctx context.Context, req *msg.MarkConversationAsReadReq, conversation *conversation.Conversation, seqs []int64, hasReadSeq int64) error { + if conversation.ConversationType == constant.SingleChatType && len(seqs) > 0 { + log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) + if err := m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { + return err } - if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, - req.UserID, seqs, hasReadSeq); err != nil { - return nil, err + } + + if req.HasReadSeq > hasReadSeq { + if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { + return err } + } + recvID := m.conversationAndGetRecvID(conversation, req.UserID) + if conversation.ConversationType == constant.SuperGroupChatType || conversation.ConversationType == constant.NotificationChatType { + recvID = req.UserID } - return &msg.MarkConversationAsReadResp{}, nil + return m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, recvID, seqs, req.HasReadSeq) } func (m *msgServer) sendMarkAsReadNotification(