Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

# Conflicts:
#	internal/rpc/msg/rpcChat.go
pull/232/head
wangchuxiao 3 years ago
commit e961a44511

@ -16,7 +16,7 @@ const Msg = 2
const ConsumerMsgs = 3 const ConsumerMsgs = 3
const UserMessages = 4 const UserMessages = 4
const MongoMessages = 5 const MongoMessages = 5
const ChannelNum = 100 const ChannelNum = 10
var ( var (
persistentCH PersistentConsumerHandler persistentCH PersistentConsumerHandler

@ -9,6 +9,7 @@ import (
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbMsg "Open_IM/pkg/proto/chat" pbMsg "Open_IM/pkg/proto/chat"
pbPush "Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"errors" "errors"
@ -103,7 +104,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
msgList := msgChannelValue.msgList msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID triggerID := msgChannelValue.triggerID
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
noStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) notStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
for _, v := range msgList { for _, v := range msgList {
log.Debug(triggerID, "msg come to storage center", v.String()) log.Debug(triggerID, "msg come to storage center", v.String())
@ -114,7 +115,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) //log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
} else { } else {
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
noStoragepushMsgList = append(noStoragepushMsgList, v) notStoragepushMsgList = append(notStoragepushMsgList, v)
} }
} }
@ -128,7 +129,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return // return
//} //}
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList)) log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList))
err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
if err != nil { if err != nil {
singleMsgFailedCount += uint64(len(storageMsgList)) singleMsgFailedCount += uint64(len(storageMsgList))
@ -146,7 +147,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
sendMessageToPush(x, msgChannelValue.userID) sendMessageToPush(x, msgChannelValue.userID)
} }
}(noStoragepushMsgList, storageMsgList) }(notStoragepushMsgList, storageMsgList)
} }
} }
@ -175,6 +176,26 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
if err != nil { if err != nil {
log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
} }
for _, v := range msgList {
if v.MsgData.ContentType == constant.DeleteMessageNotification {
tips := server_api_params.TipsComm{}
DeleteMessageTips := server_api_params.DeleteMessageTips{}
err := proto.Unmarshal(v.MsgData.Content, &tips)
if err != nil {
log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
continue
}
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
if err != nil {
log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
continue
}
if err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error())
}
}
}
} }
} }
} }

@ -7,6 +7,9 @@
package logic package logic
import ( import (
pusher "Open_IM/internal/push"
"Open_IM/internal/push/getui"
jpush "Open_IM/internal/push/jpush"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka" "Open_IM/pkg/common/kafka"
@ -15,11 +18,12 @@ import (
) )
var ( var (
rpcServer RPCServer rpcServer RPCServer
pushCh PushConsumerHandler pushCh PushConsumerHandler
pushTerminal []int32 pushTerminal []int32
producer *kafka.Producer producer *kafka.Producer
successCount uint64 offlinePusher pusher.OfflinePusher
successCount uint64
) )
func Init(rpcPort int) { func Init(rpcPort int) {
@ -31,6 +35,12 @@ func Init(rpcPort int) {
func init() { func init() {
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
if config.Config.Push.Getui.Enable {
offlinePusher = getui.GetuiClient
}
if config.Config.Push.Jpns.Enable {
offlinePusher = jpush.JPushClient
}
} }
func Run() { func Run() {

@ -7,8 +7,6 @@
package logic package logic
import ( import (
pusher "Open_IM/internal/push"
"Open_IM/internal/push/getui"
jpush "Open_IM/internal/push/jpush" jpush "Open_IM/internal/push/jpush"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
@ -102,14 +100,6 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
content = constant.ContentType2PushContent[constant.Common] content = constant.ContentType2PushContent[constant.Common]
} }
} }
var offlinePusher pusher.OfflinePusher
if config.Config.Push.Getui.Enable {
log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), config.Config.Push.Getui)
offlinePusher = getui.GetuiClient
}
if config.Config.Push.Jpns.Enable {
offlinePusher = jpush.JPushClient
}
if offlinePusher == nil { if offlinePusher == nil {
offlinePusher = jpush.JPushClient offlinePusher = jpush.JPushClient
} }

@ -46,8 +46,8 @@ const (
GroupMsg = 201 GroupMsg = 201
//SysRelated //SysRelated
NotificationBegin = 1000 NotificationBegin = 1000
DeleteMessageNotification = 1100
FriendApplicationApprovedNotification = 1201 //add_friend_response FriendApplicationApprovedNotification = 1201 //add_friend_response
FriendApplicationRejectedNotification = 1202 //add_friend_response FriendApplicationRejectedNotification = 1202 //add_friend_response
FriendApplicationNotification = 1203 //add_friend FriendApplicationNotification = 1203 //add_friend

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save