From 222d061dcd7d93d646052c27228748a607c3f529 Mon Sep 17 00:00:00 2001
From: WangchuXiao <wangchuxiao97@outlook.com>
Date: Wed, 19 Jul 2023 10:47:28 +0800
Subject: [PATCH] fix: msg destruct not effect and notification self 2 self
 push twice (#596)

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* MsgDestructTime

* fix bug: msg destruct sql

* fix bug: msg destruct

* fix bug: msg destruct

* fix bug: msg destruct sql

* fix bug: msg destruct sql

* fix bug: msg destruct sql

* fix bug: msg destruct sql

* debug: print stack

* debug: print stack

* debug: print stack

* fix bug: msg destruct sql

Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com>

* fix bug: msg notification self 2 self push twice

* fix bug: heartbeat get self notification

---------

Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com>
---
 internal/push/push_handler.go                 |  8 +++++-
 internal/rpc/msg/server.go                    |  2 --
 internal/rpc/msg/sync_msg.go                  |  1 +
 internal/tools/conversation.go                | 10 +++++---
 internal/tools/cron_task.go                   | 10 ++++----
 pkg/common/db/controller/msg.go               | 25 ++++++++++++++-----
 pkg/common/db/relation/conversation_model.go  |  2 +-
 .../common/locker/message_locker.go           |  2 +-
 pkg/rpcclient/msg.go                          |  7 +++---
 pkg/rpcclient/notification/msg.go             |  2 +-
 pkg/utils/utils.go                            |  4 +++
 11 files changed, 49 insertions(+), 24 deletions(-)
 rename internal/rpc/msg/lock.go => pkg/common/locker/message_locker.go (99%)

diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go
index d8c04ccb0..0822c3505 100644
--- a/internal/push/push_handler.go
+++ b/internal/push/push_handler.go
@@ -65,7 +65,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
 	case constant.SuperGroupChatType:
 		err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
 	default:
-		err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData)
+		var pushUserIDs []string
+		if pbData.MsgData.SendID != pbData.MsgData.RecvID {
+			pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}
+		} else {
+			pushUserIDs = []string{pbData.MsgData.SendID}
+		}
+		err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData)
 	}
 	if err != nil {
 		if err == errNoOfflinePusher {
diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go
index 5e2cf925f..e88d55bd1 100644
--- a/internal/rpc/msg/server.go
+++ b/internal/rpc/msg/server.go
@@ -42,7 +42,6 @@ type (
 		friend                 *rpcclient.FriendRpcClient
 		GroupLocalCache        *localcache.GroupLocalCache
 		ConversationLocalCache *localcache.ConversationLocalCache
-		MessageLocker          MessageLocker
 		Handlers               MessageInterceptorChain
 		notificationSender     *rpcclient.NotificationSender
 	}
@@ -91,7 +90,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
 		GroupLocalCache:        localcache.NewGroupLocalCache(&groupRpcClient),
 		ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
 		friend:                 &friendRpcClient,
-		MessageLocker:          NewLockerMessage(cacheModel),
 	}
 	s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
 	s.addInterceptorHandler(MessageHasReadEnabled)
diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go
index 70967a1ba..e05bd5644 100644
--- a/internal/rpc/msg/sync_msg.go
+++ b/internal/rpc/msg/sync_msg.go
@@ -95,6 +95,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
 	for _, conversationID := range conversationIDs {
 		conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
 	}
+	conversationIDs = append(conversationIDs, utils.GetSelfNotificationConversationID(req.UserID))
 	log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
 	maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
 	if err != nil {
diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go
index 837f7f90a..2651cf10e 100644
--- a/internal/tools/conversation.go
+++ b/internal/tools/conversation.go
@@ -33,6 +33,7 @@ func (c *MsgTool) ConversationsDestructMsgs() {
 	}
 	log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
 	for _, conversation := range conversations {
+		ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
 		log.ZDebug(
 			ctx,
 			"UserMsgsDestruct",
@@ -45,16 +46,17 @@ func (c *MsgTool) ConversationsDestructMsgs() {
 			"lastMsgDestructTime",
 			conversation.LatestMsgDestructTime,
 		)
+		now := time.Now()
 		seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
 		if err != nil {
 			log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
 			continue
 		}
-		if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": time.Now()}); err != nil {
-			log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
-			continue
-		}
 		if len(seqs) > 0 {
+			if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil {
+				log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
+				continue
+			}
 			if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
 				log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
 			}
diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go
index 82343157b..5e4183615 100644
--- a/internal/tools/cron_task.go
+++ b/internal/tools/cron_task.go
@@ -41,11 +41,11 @@ func StartCronTask() error {
 		panic(err)
 	}
 	log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
-	// _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
-	// if err != nil {
-	// 	fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
-	// 	panic(err)
-	// }
+	_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
+	if err != nil {
+		fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
+		panic(err)
+	}
 	c.Start()
 	wg.Wait()
 	return nil
diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go
index 37a71bafa..34e895765 100644
--- a/pkg/common/db/controller/msg.go
+++ b/pkg/common/db/controller/msg.go
@@ -63,7 +63,7 @@ type CommonMsgDatabase interface {
 	// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
 	DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
 	// 用户标记删除过期消息返回标记删除的seq列表
-	UserMsgsDestruct(cte context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
+	UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
 
 	// 用户根据seq删除消息
 	DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
@@ -641,7 +641,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
 		if err != nil || msgDocModel.DocID == "" {
 			if err != nil {
 				if err == unrelation.ErrMsgListNotExist {
-					log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index)
+					log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
 				} else {
 					log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
 				}
@@ -652,25 +652,38 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
 		index++
 		//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
 		if len(msgDocModel.Msg) > 0 {
+			i := 0
+			var over bool
 			for _, msg := range msgDocModel.Msg {
+				i++
 				if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() {
-					if msg.Msg.SendTime > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) {
+					if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) {
 						seqs = append(seqs, msg.Msg.Seq)
 					}
 				} else {
-					log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index)
+					log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i)
+					over = true
 					break
 				}
 			}
+			if over {
+				break
+			}
 		}
 	}
 
 	log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
 	if len(seqs) > 0 {
-		latestSeq := seqs[len(seqs)-1]
-		if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, latestSeq); err != nil {
+		userMinSeq := seqs[len(seqs)-1] + 1
+		currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
+		if err != nil && errs.Unwrap(err) != redis.Nil {
 			return nil, err
 		}
+		if currentUserMinSeq < userMinSeq {
+			if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
+				return nil, err
+			}
+		}
 	}
 	return seqs, nil
 }
diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go
index 10b82ef7c..fb88b8305 100644
--- a/pkg/common/db/relation/conversation_model.go
+++ b/pkg/common/db/relation/conversation_model.go
@@ -207,7 +207,7 @@ func (c *ConversationGorm) GetConversationIDsNeedDestruct(
 ) (conversations []*relation.ConversationModel, err error) {
 	return conversations, utils.Wrap(
 		c.db(ctx).
-			Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0").
+			Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)").
 			Find(&conversations).
 			Error,
 		"",
diff --git a/internal/rpc/msg/lock.go b/pkg/common/locker/message_locker.go
similarity index 99%
rename from internal/rpc/msg/lock.go
rename to pkg/common/locker/message_locker.go
index 154a5b181..2631f19b1 100644
--- a/internal/rpc/msg/lock.go
+++ b/pkg/common/locker/message_locker.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package msg
+package locker
 
 import (
 	"context"
diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go
index 1fd6846d1..a012a5a5f 100644
--- a/pkg/rpcclient/msg.go
+++ b/pkg/rpcclient/msg.go
@@ -71,8 +71,9 @@ func newContentTypeConf() map[int32]config.NotificationConf {
 		constant.ConversationUnreadNotification:      config.Config.Notification.ConversationChanged,
 		constant.ConversationPrivateChatNotification: config.Config.Notification.ConversationSetPrivate,
 		// msg
-		constant.MsgRevokeNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
-		constant.HasReadReceipt:        {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
+		constant.MsgRevokeNotification:  {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
+		constant.HasReadReceipt:         {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
+		constant.DeleteMsgsNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
 	}
 }
 
@@ -116,7 +117,7 @@ func newSessionTypeConf() map[int32]int32 {
 		constant.ConversationUnreadNotification:      constant.SingleChatType,
 		constant.ConversationPrivateChatNotification: constant.SingleChatType,
 		// delete
-		constant.MsgDeleteNotification: constant.SingleChatType,
+		constant.DeleteMsgsNotification: constant.SingleChatType,
 	}
 }
 
diff --git a/pkg/rpcclient/notification/msg.go b/pkg/rpcclient/notification/msg.go
index c475291d2..bcc6865b1 100644
--- a/pkg/rpcclient/notification/msg.go
+++ b/pkg/rpcclient/notification/msg.go
@@ -36,7 +36,7 @@ func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context,
 		ConversationID: conversationID,
 		Seqs:           seqs,
 	}
-	return m.Notification(ctx, userID, userID, constant.MsgDeleteNotification, &tips)
+	return m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips)
 }
 
 func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error {
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 418e65d34..2662e0a61 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -367,6 +367,10 @@ func GetNotificationConversationIDByConversationID(conversationID string) string
 	return ""
 }
 
+func GetSelfNotificationConversationID(userID string) string {
+	return "n_" + userID + "_" + userID
+}
+
 func GetSeqsBeginEnd(seqs []int64) (int64, int64) {
 	if len(seqs) == 0 {
 		return 0, 0