fix: msg destruct not effect and notification self 2 self push twice ()

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* MsgDestructTime

* fix bug: msg destruct sql

* fix bug: msg destruct

* fix bug: msg destruct

* fix bug: msg destruct sql

* fix bug: msg destruct sql

* fix bug: msg destruct sql

* fix bug: msg destruct sql

* debug: print stack

* debug: print stack

* debug: print stack

* fix bug: msg destruct sql

Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com>

* fix bug: msg notification self 2 self push twice

* fix bug: heartbeat get self notification

---------

Signed-off-by: wangchuxiao <wangchuxiao97@outlook.com>
pull/602/head
WangchuXiao 2 years ago committed by GitHub
parent 5cb5de4970
commit 222d061dcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -65,7 +65,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData) var pushUserIDs []string
if pbData.MsgData.SendID != pbData.MsgData.RecvID {
pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}
} else {
pushUserIDs = []string{pbData.MsgData.SendID}
}
err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData)
} }
if err != nil { if err != nil {
if err == errNoOfflinePusher { if err == errNoOfflinePusher {

@ -42,7 +42,6 @@ type (
friend *rpcclient.FriendRpcClient friend *rpcclient.FriendRpcClient
GroupLocalCache *localcache.GroupLocalCache GroupLocalCache *localcache.GroupLocalCache
ConversationLocalCache *localcache.ConversationLocalCache ConversationLocalCache *localcache.ConversationLocalCache
MessageLocker MessageLocker
Handlers MessageInterceptorChain Handlers MessageInterceptorChain
notificationSender *rpcclient.NotificationSender notificationSender *rpcclient.NotificationSender
} }
@ -91,7 +90,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: &friendRpcClient, friend: &friendRpcClient,
MessageLocker: NewLockerMessage(cacheModel),
} }
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled) s.addInterceptorHandler(MessageHasReadEnabled)

@ -95,6 +95,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
} }
conversationIDs = append(conversationIDs, utils.GetSelfNotificationConversationID(req.UserID))
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs) log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
if err != nil { if err != nil {

@ -33,6 +33,7 @@ func (c *MsgTool) ConversationsDestructMsgs() {
} }
log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
for _, conversation := range conversations { for _, conversation := range conversations {
ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug( log.ZDebug(
ctx, ctx,
"UserMsgsDestruct", "UserMsgsDestruct",
@ -45,16 +46,17 @@ func (c *MsgTool) ConversationsDestructMsgs() {
"lastMsgDestructTime", "lastMsgDestructTime",
conversation.LatestMsgDestructTime, conversation.LatestMsgDestructTime,
) )
now := time.Now()
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil { if err != nil {
log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue continue
} }
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": time.Now()}); err != nil {
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if len(seqs) > 0 { if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
} }

@ -41,11 +41,11 @@ func StartCronTask() error {
panic(err) panic(err)
} }
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
// _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
// if err != nil { if err != nil {
// fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
// panic(err) panic(err)
// } }
c.Start() c.Start()
wg.Wait() wg.Wait()
return nil return nil

@ -63,7 +63,7 @@ type CommonMsgDatabase interface {
// 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) // 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// 用户标记删除过期消息返回标记删除的seq列表 // 用户标记删除过期消息返回标记删除的seq列表
UserMsgsDestruct(cte context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
// 用户根据seq删除消息 // 用户根据seq删除消息
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
@ -641,7 +641,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
if err != nil || msgDocModel.DocID == "" { if err != nil || msgDocModel.DocID == "" {
if err != nil { if err != nil {
if err == unrelation.ErrMsgListNotExist { if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index) log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else { } else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
} }
@ -652,25 +652,38 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
index++ index++
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() //&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
if len(msgDocModel.Msg) > 0 { if len(msgDocModel.Msg) > 0 {
i := 0
var over bool
for _, msg := range msgDocModel.Msg { for _, msg := range msgDocModel.Msg {
i++
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() {
if msg.Msg.SendTime > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) { if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) {
seqs = append(seqs, msg.Msg.Seq) seqs = append(seqs, msg.Msg.Seq)
} }
} else { } else {
log.ZDebug(ctx, "deleteMsgRecursion finished", "conversationID", conversationID, "userID", userID, "index", index) log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i)
over = true
break break
} }
} }
if over {
break
}
} }
} }
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 { if len(seqs) > 0 {
latestSeq := seqs[len(seqs)-1] userMinSeq := seqs[len(seqs)-1] + 1
if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, latestSeq); err != nil { currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return nil, err return nil, err
} }
if currentUserMinSeq < userMinSeq {
if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
} }
return seqs, nil return seqs, nil
} }

@ -207,7 +207,7 @@ func (c *ConversationGorm) GetConversationIDsNeedDestruct(
) (conversations []*relation.ConversationModel, err error) { ) (conversations []*relation.ConversationModel, err error) {
return conversations, utils.Wrap( return conversations, utils.Wrap(
c.db(ctx). c.db(ctx).
Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0"). Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)").
Find(&conversations). Find(&conversations).
Error, Error,
"", "",

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package msg package locker
import ( import (
"context" "context"

@ -71,8 +71,9 @@ func newContentTypeConf() map[int32]config.NotificationConf {
constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged, constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged,
constant.ConversationPrivateChatNotification: config.Config.Notification.ConversationSetPrivate, constant.ConversationPrivateChatNotification: config.Config.Notification.ConversationSetPrivate,
// msg // msg
constant.MsgRevokeNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg}, constant.MsgRevokeNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
constant.HasReadReceipt: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg}, constant.HasReadReceipt: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
constant.DeleteMsgsNotification: {IsSendMsg: false, ReliabilityLevel: constant.ReliableNotificationNoMsg},
} }
} }
@ -116,7 +117,7 @@ func newSessionTypeConf() map[int32]int32 {
constant.ConversationUnreadNotification: constant.SingleChatType, constant.ConversationUnreadNotification: constant.SingleChatType,
constant.ConversationPrivateChatNotification: constant.SingleChatType, constant.ConversationPrivateChatNotification: constant.SingleChatType,
// delete // delete
constant.MsgDeleteNotification: constant.SingleChatType, constant.DeleteMsgsNotification: constant.SingleChatType,
} }
} }

@ -36,7 +36,7 @@ func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context,
ConversationID: conversationID, ConversationID: conversationID,
Seqs: seqs, Seqs: seqs,
} }
return m.Notification(ctx, userID, userID, constant.MsgDeleteNotification, &tips) return m.Notification(ctx, userID, userID, constant.DeleteMsgsNotification, &tips)
} }
func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error { func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conversationID string, sesstionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error {

@ -367,6 +367,10 @@ func GetNotificationConversationIDByConversationID(conversationID string) string
return "" return ""
} }
func GetSelfNotificationConversationID(userID string) string {
return "n_" + userID + "_" + userID
}
func GetSeqsBeginEnd(seqs []int64) (int64, int64) { func GetSeqsBeginEnd(seqs []int64) (int64, int64) {
if len(seqs) == 0 { if len(seqs) == 0 {
return 0, 0 return 0, 0

Loading…
Cancel
Save