Merge branch 'superGroup' of github.com:OpenIMSDK/Open-IM-Server into superGroup

Ref: pull/236/head
wangchuxiao committed 3 years ago
Commit: a85f6b8287

@@ -64,28 +64,37 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
 	}
 }
 func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
 	log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data)
-	rpcReq := pbChat.GetMaxAndMinSeqReq{}
-	nReply := new(pbChat.GetMaxAndMinSeqResp)
-	rpcReq.UserID = m.SendID
-	rpcReq.OperationID = m.OperationID
-	grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
-	msgClient := pbChat.NewChatClient(grpcConn)
-	rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq)
-	if err != nil {
-		log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err, rpcReq.String())
-		nReply.ErrCode = 500
-		nReply.ErrMsg = err.Error()
-		ws.getSeqResp(conn, m, nReply)
+	nReply := new(sdk_ws.GetMaxAndMinSeqResp)
+	isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq)
+	if isPass {
+		rpcReq := sdk_ws.GetMaxAndMinSeqReq{}
+		rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList
+		rpcReq.UserID = m.SendID
+		rpcReq.OperationID = m.OperationID
+		log.Debug(m.OperationID, "Ws call success to getMaxAndMinSeq", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList)
+		grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
+		msgClient := pbChat.NewChatClient(grpcConn)
+		rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq)
+		if err != nil {
+			log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err.Error(), rpcReq.String())
+			nReply.ErrCode = 500
+			nReply.ErrMsg = err.Error()
+			ws.getSeqResp(conn, m, nReply)
+		} else {
+			log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
+			ws.getSeqResp(conn, m, rpcReply)
+		}
 	} else {
-		log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
-		ws.getSeqResp(conn, m, rpcReply)
+		nReply.ErrCode = errCode
+		nReply.ErrMsg = errMsg
+		ws.getSeqResp(conn, m, nReply)
 	}
 }
-func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) {
-	var mReplyData sdk_ws.GetMaxAndMinSeqResp
-	mReplyData.MaxSeq = pb.GetMaxSeq()
-	mReplyData.MinSeq = pb.GetMinSeq()
-	b, _ := proto.Marshal(&mReplyData)
+func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) {
+	log.Debug(m.OperationID, "getSeqResp come here ", pb.String())
+	b, _ := proto.Marshal(pb)
 	mReply := Resp{
 		ReqIdentifier: m.ReqIdentifier,
 		MsgIncr:       m.MsgIncr,
@@ -146,6 +155,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
 	sendMsgAllCountLock.Lock()
 	sendMsgAllCount++
 	sendMsgAllCountLock.Unlock()
+	//stat.GaugeVecApiMethod.WithLabelValues("ws_send_message_count").Inc()
 	log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
 	nReply := new(pbChat.SendMsgResp)
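
The newest-seq request can now carry a groupIDList, which argsValidate unmarshals from Req.Data (see the validation hunk further down). A minimal sketch of building that payload on the sending side; the IDs are made up and the snippet is not part of this diff:

package main

import (
	"fmt"

	open_im_sdk "Open_IM/pkg/proto/sdk_ws" // alias used by argsValidate
	"github.com/golang/protobuf/proto"
)

func main() {
	// Payload the new constant.WSGetNewestSeq branch expects to find in Req.Data.
	req := &open_im_sdk.GetMaxAndMinSeqReq{
		UserID:      "u100",        // illustrative IDs
		OperationID: "op-getseq-1",
		GroupIDList: []string{"sg-1", "sg-2"}, // super groups whose seq ranges are wanted
	}
	data, err := proto.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Println("Req.Data payload size:", len(data))
}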

@@ -59,6 +59,18 @@ type SeqListData struct {
 func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
 	switch r {
+	case constant.WSGetNewestSeq:
+		data := open_im_sdk.GetMaxAndMinSeqReq{}
+		if err := proto.Unmarshal(m.Data, &data); err != nil {
+			log.Error("", "Decode Data struct err", err.Error(), r)
+			return false, 203, err.Error(), nil
+		}
+		if err := validate.Struct(data); err != nil {
+			log.Error("", "data args validate err", err.Error(), r)
+			return false, 204, err.Error(), nil
+		}
+		return true, 0, "", data
 	case constant.WSSendMsg:
 		data := open_im_sdk.MsgData{}
 		if err := proto.Unmarshal(m.Data, &data); err != nil {

@@ -14,14 +14,13 @@ const OnlineTopicBusy = 1
 const OnlineTopicVacancy = 0
 const Msg = 2
 const ConsumerMsgs = 3
-const UserMessages = 4
+const AggregationMessages = 4
 const MongoMessages = 5
 const ChannelNum = 100
 var (
 	persistentCH PersistentConsumerHandler
 	historyCH    OnlineHistoryConsumerHandler
-	offlineHistoryCH OfflineHistoryConsumerHandler
 	producer     *kafka.Producer
 	cmdCh        chan Cmd2Value
 	onlineTopicStatus int

@@ -1,313 +0,0 @@
package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
kfk "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"time"
)
type OfflineHistoryConsumerHandler struct {
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
cmdCh chan Cmd2Value
msgCh chan Cmd2Value
chArrays [ChannelNum]chan Cmd2Value
msgDistributionCh chan Cmd2Value
}
func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
mc.msgHandle = make(map[string]fcb)
mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
go mc.MessagesDistributionHandle()
mc.cmdCh = cmdCh
mc.msgCh = make(chan Cmd2Value, 1000)
for i := 0; i < ChannelNum; i++ {
mc.chArrays[i] = make(chan Cmd2Value, 1000)
go mc.Run(i)
}
if config.Config.ReliableStorage {
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
} else {
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability
}
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic},
config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline)
}
func (och *OfflineHistoryConsumerHandler) Run(channelID int) {
for {
select {
case cmd := <-och.chArrays[channelID]:
switch cmd.Cmd {
case UserMessages:
msgChannelValue := cmd.Value.(MsgChannelValue)
msgList := msgChannelValue.msgList
triggerID := msgChannelValue.triggerID
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
for _, v := range msgList {
log.Debug(triggerID, "msg come to storage center", v.String())
isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory)
isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync)
if isHistory {
storageMsgList = append(storageMsgList, v)
}
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
pushMsgList = append(pushMsgList, v)
}
}
//switch msgChannelValue.msg.MsgData.SessionType {
//case constant.SingleChatType:
//case constant.GroupChatType:
//case constant.NotificationChatType:
//default:
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
// return
//}
err, _ := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
if err != nil {
singleMsgFailedCount += uint64(len(storageMsgList))
log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList)
} else {
singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount += uint64(len(storageMsgList))
singleMsgSuccessCountMutex.Unlock()
for _, v := range pushMsgList {
sendMessageToPush(v, msgChannelValue.userID)
}
}
}
}
}
}
func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() {
UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
for {
select {
case cmd := <-och.msgDistributionCh:
switch cmd.Cmd {
case ConsumerMsgs:
triggerChannelValue := cmd.Value.(TriggerChannelValue)
triggerID := triggerChannelValue.triggerID
consumerMessages := triggerChannelValue.cmsgList
//Aggregation map[userid]message list
log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages))
for i := 0; i < len(consumerMessages); i++ {
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil {
log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error())
return
}
log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String())
if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
} else {
m := make([]*pbMsg.MsgDataToMQ, 0, 100)
m = append(m, &msgFromMQ)
UserAggregationMsgs[string(consumerMessages[i].Key)] = m
}
}
log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
for userID, v := range UserAggregationMsgs {
if len(v) >= 0 {
channelID := getHashCode(userID) % ChannelNum
go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}}
}(channelID, userID, v)
}
}
}
}
}
}
func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
operationID := msgFromMQ.OperationID
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
switch msgFromMQ.MsgData.SessionType {
case constant.SingleChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
singleMsgFailedCount++
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
singleMsgSuccessCountMutex.Lock()
singleMsgSuccessCount++
singleMsgSuccessCountMutex.Unlock()
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now))
case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ)
if err != nil {
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
return
}
groupMsgCount++
}
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now))
case constant.NotificationChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return
}
sess.MarkMessage(cMsg, "")
log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
}
func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
return
}
operationID := msgFromMQ.OperationID
log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg))
//Control whether to store offline messages (mongo)
isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory)
isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync)
if isHistory {
seq, err := db.DB.IncrUserSeq(msgKey)
if err != nil {
log.NewError(operationID, "data insert to redis err", err.Error(), string(msg))
return
}
sess.MarkMessage(cMsg, "")
msgFromMQ.MsgData.Seq = uint32(seq)
log.Debug(operationID, "send ch msg is ", msgFromMQ.String())
//mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}}
//err := saveUserChat(msgKey, &msgFromMQ)
//if err != nil {
// singleMsgFailedCount++
// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
// return
//}
//singleMsgSuccessCountMutex.Lock()
//singleMsgSuccessCount++
//singleMsgSuccessCountMutex.Unlock()
//log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} else {
if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) {
go sendMessageToPush(&msgFromMQ, msgKey)
}
}
}
func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
//func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
// //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
// //for msg := range claim.Messages() {
// // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline")
// // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
// //}
// for msg := range claim.Messages() {
// if GetOnlineTopicStatus() == OnlineTopicVacancy {
// log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
// } else {
// select {
// case <-mc.cmdCh:
// log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// case <-time.After(time.Millisecond * time.Duration(100)):
// log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
// }
// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
// }
// }
//
// return nil
//}
func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
cMsg := make([]*sarama.ConsumerMessage, 0, 500)
t := time.NewTicker(time.Duration(500) * time.Millisecond)
var triggerID string
for msg := range claim.Messages() {
//och.TriggerCmd(OnlineTopicBusy)
cMsg = append(cMsg, msg)
select {
case <-t.C:
if len(cMsg) >= 0 {
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: cMsg}}
sess.MarkMessage(msg, "")
cMsg = cMsg[0:0]
log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))
}
default:
if len(cMsg) >= 500 {
triggerID = utils.OperationIDGenerator()
log.Debug(triggerID, "length trigger msg consumer start", len(cMsg))
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
triggerID: triggerID, cmsgList: cMsg}}
sess.MarkMessage(msg, "")
cMsg = cMsg[0:0]
log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))
}
}
log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset())
}
return nil
}
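
The deleted handler's ConsumeClaim batched Kafka messages and pushed them to the distribution channel either when a 500 ms ticker fired or once 500 messages had accumulated (note that the `len(cMsg) >= 0` check is always true). A self-contained sketch of that trigger logic, with plain integers standing in for *sarama.ConsumerMessage:

package main

import (
	"fmt"
	"time"
)

func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 1200; i++ {
			in <- i
		}
		close(in)
	}()

	batch := make([]int, 0, 500)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	flush := func(reason string) {
		if len(batch) == 0 { // the original checks len(cMsg) >= 0, which never skips
			return
		}
		fmt.Printf("%s flush of %d messages\n", reason, len(batch))
		batch = batch[:0]
	}

	for v := range in {
		batch = append(batch, v)
		select {
		case <-ticker.C: // time-based trigger
			flush("timer")
		default: // otherwise fall through and check the size threshold
			if len(batch) >= 500 {
				flush("length")
			}
		}
	}
	flush("final") // drain whatever is left when the claim ends
}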

@@ -22,10 +22,10 @@ import (
 )
 type MsgChannelValue struct {
-	userID        string
+	aggregationID string //maybe userID or super groupID
 	triggerID     string
 	msgList       []*pbMsg.MsgDataToMQ
 	lastSeq       uint64
 }
 type TriggerChannelValue struct {
 	triggerID string
@@ -98,13 +98,13 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 		select {
 		case cmd := <-och.chArrays[channelID]:
 			switch cmd.Cmd {
-			case UserMessages:
+			case AggregationMessages:
 				msgChannelValue := cmd.Value.(MsgChannelValue)
 				msgList := msgChannelValue.msgList
 				triggerID := msgChannelValue.triggerID
 				storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
-				notStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
-				log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
+				notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
+				log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList))
 				for _, v := range msgList {
 					log.Debug(triggerID, "msg come to storage center", v.String())
 					isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory)
@@ -113,9 +113,10 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 						storageMsgList = append(storageMsgList, v)
 						//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
 					} else {
-						if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
-							notStoragepushMsgList = append(notStoragepushMsgList, v)
+						if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) {
+							notStoragePushMsgList = append(notStoragePushMsgList, v)
 						}
 					}
 				}
@@ -128,8 +129,8 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 				//	log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
 				//	return
 				//}
-				log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList))
-				err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
+				log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
+				err, lastSeq := saveUserChatList(msgChannelValue.aggregationID, storageMsgList, triggerID)
 				if err != nil {
 					singleMsgFailedCount += uint64(len(storageMsgList))
 					log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
@@ -137,28 +138,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 					singleMsgSuccessCountMutex.Lock()
 					singleMsgSuccessCount += uint64(len(storageMsgList))
 					singleMsgSuccessCountMutex.Unlock()
-					och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq)
+					och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
 					go func(push, storage []*pbMsg.MsgDataToMQ) {
 						for _, v := range storage {
-							sendMessageToPush(v, msgChannelValue.userID)
+							sendMessageToPush(v, msgChannelValue.aggregationID)
 						}
 						for _, x := range push {
-							sendMessageToPush(x, msgChannelValue.userID)
+							sendMessageToPush(x, msgChannelValue.aggregationID)
 						}
-					}(notStoragepushMsgList, storageMsgList)
+					}(notStoragePushMsgList, storageMsgList)
 				}
 			}
 		}
 	}
 }
-func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
-	hashCode := getHashCode(userID)
+func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
+	hashCode := getHashCode(aggregationID)
 	channelID := hashCode % ChannelNum
-	log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
+	log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
 	//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
-	och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
+	och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
 }
 func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
 	for {
@@ -169,9 +170,9 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
 				msgChannelValue := cmd.Value.(MsgChannelValue)
 				msgList := msgChannelValue.msgList
 				triggerID := msgChannelValue.triggerID
-				userID := msgChannelValue.userID
+				aggregationID := msgChannelValue.aggregationID
 				lastSeq := msgChannelValue.lastSeq
-				err := db.DB.BatchInsertChat2DB(userID, msgList, triggerID, lastSeq)
+				err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
 				if err != nil {
 					log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
 				}
@@ -202,7 +203,7 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
 func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
 	for {
-		UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
+		aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
 		select {
 		case cmd := <-och.msgDistributionCh:
 			switch cmd.Cmd {
@@ -220,23 +221,23 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
 						return
 					}
 					log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
-					if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok {
+					if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
 						oldM = append(oldM, &msgFromMQ)
-						UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM
+						aggregationMsgs[string(consumerMessages[i].Key)] = oldM
 					} else {
 						m := make([]*pbMsg.MsgDataToMQ, 0, 100)
 						m = append(m, &msgFromMQ)
-						UserAggregationMsgs[string(consumerMessages[i].Key)] = m
+						aggregationMsgs[string(consumerMessages[i].Key)] = m
 					}
 				}
-				log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs))
-				for userID, v := range UserAggregationMsgs {
+				log.Debug(triggerID, "generate map list users len", len(aggregationMsgs))
+				for aggregationID, v := range aggregationMsgs {
 					if len(v) >= 0 {
-						hashCode := getHashCode(userID)
+						hashCode := getHashCode(aggregationID)
 						channelID := hashCode % ChannelNum
-						log.Debug(triggerID, "generate channelID", hashCode, channelID, userID)
+						log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
 						//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
-						och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}}
+						och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: v, triggerID: triggerID}}
 						//}(channelID, userID, v)
 					}
 				}
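
These hunks rename the per-user grouping to a generic aggregationID, which may be a user ID or a super group ID, and keep routing work with getHashCode(aggregationID) % ChannelNum. A standalone sketch of that routing, assuming getHashCode is a plain string hash (crc32 below is only a stand-in for the real helper defined elsewhere in this package):

package main

import (
	"fmt"
	"hash/crc32"
)

const ChannelNum = 100 // matches the constant in this package

// getHashCode is assumed here to be a plain string hash; crc32 is only a stand-in.
func getHashCode(s string) uint32 {
	return crc32.ChecksumIEEE([]byte(s))
}

func main() {
	for _, aggregationID := range []string{"u100", "sg-1", "sg-1"} {
		channelID := getHashCode(aggregationID) % ChannelNum
		fmt.Printf("%-6s -> worker channel %d\n", aggregationID, channelID)
	}
	// The same ID always maps to the same channel, which is what keeps
	// per-user and per-super-group message order intact.
}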

@@ -6,18 +6,25 @@ import (
 	commonDB "Open_IM/pkg/common/db"
 	"Open_IM/pkg/common/log"
-	pbMsg "Open_IM/pkg/proto/chat"
 	open_im_sdk "Open_IM/pkg/proto/sdk_ws"
 )
-func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) {
+func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *open_im_sdk.GetMaxAndMinSeqReq) (*open_im_sdk.GetMaxAndMinSeqResp, error) {
 	log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String())
+	resp := new(open_im_sdk.GetMaxAndMinSeqResp)
+	m := make(map[string]*open_im_sdk.MaxAndMinSeq)
 	//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
 	maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID)
 	minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID)
-	resp := new(pbMsg.GetMaxAndMinSeqResp)
 	if err1 == nil {
 		resp.MaxSeq = uint32(maxSeq)
+		for _, v := range in.GroupIDList {
+			x := new(open_im_sdk.MaxAndMinSeq)
+			maxSeq, _ := commonDB.DB.GetUserMaxSeq(v)
+			x.MaxSeq = uint32(maxSeq)
+			m[v] = x
+		}
+		resp.GroupMaxAndMinSeq = m
 	} else if err1 == redis.ErrNil {
 		resp.MaxSeq = 0
 	} else {
@@ -39,6 +46,7 @@ func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeq
 func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.PullMessageBySeqListReq) (*open_im_sdk.PullMessageBySeqListResp, error) {
 	log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String())
 	resp := new(open_im_sdk.PullMessageBySeqListResp)
+	m := make(map[string]*open_im_sdk.MsgDataList)
 	//msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID)
 	redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID)
 	if err != nil {
@@ -60,6 +68,32 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull
 	} else {
 		resp.List = redisMsgList
 	}
+	for k, v := range in.GroupSeqList {
+		x := new(open_im_sdk.MsgDataList)
+		redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID)
+		if err != nil {
+			if err != redis.ErrNil {
+				log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList)
+			} else {
+				log.Debug(in.OperationID, "get message from redis is nil", failedSeqList)
+			}
+			msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(k, failedSeqList, in.OperationID)
+			if err1 != nil {
+				log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error())
+				resp.ErrCode = 201
+				resp.ErrMsg = err.Error()
+				return resp, nil
+			} else {
+				redisMsgList = append(redisMsgList, msgList...)
+				x.MsgDataList = redisMsgList
+				m[k] = x
+			}
+		} else {
+			x.MsgDataList = redisMsgList
+			m[k] = x
+		}
+	}
+	resp.GroupMsgDataList = m
 	//respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID)
 	//respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat)
 	return resp, nil
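
GetMaxAndMinSeq now fills a per-super-group map by reusing GetUserMaxSeq with each group ID as the key. A hedged sketch of reading the new field on the caller side (the helper and the sample values are illustrative, not part of this commit):

package main

import (
	"fmt"

	open_im_sdk "Open_IM/pkg/proto/sdk_ws"
)

// printSeqRanges is an illustrative helper, not part of this commit.
func printSeqRanges(resp *open_im_sdk.GetMaxAndMinSeqResp) {
	fmt.Println("user maxSeq:", resp.MaxSeq, "minSeq:", resp.MinSeq)
	for groupID, seq := range resp.GroupMaxAndMinSeq {
		// One MaxAndMinSeq entry per requested super group ID.
		fmt.Printf("super group %s: maxSeq=%d minSeq=%d\n", groupID, seq.MaxSeq, seq.MinSeq)
	}
}

func main() {
	// Fake response shaped like what the RPC above returns.
	printSeqRanges(&open_im_sdk.GetMaxAndMinSeqResp{
		MaxSeq: 42,
		GroupMaxAndMinSeq: map[string]*open_im_sdk.MaxAndMinSeq{
			"sg-1": {MaxSeq: 7},
		},
	})
}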

@@ -50,46 +50,49 @@ type MsgCallBackResp struct {
 }
 func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) {
-	if data.MsgData.SessionType == constant.GroupChatType {
-		return true, 0, ""
-	}
-	log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify)
-	reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
-	etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
-	cacheClient := cacheRpc.NewCacheClient(etcdConn)
-	cacheResp, err := cacheClient.GetBlackIDListFromCache(context.Background(), reqGetBlackIDListFromCache)
-	if err != nil {
-		log.NewError(data.OperationID, "GetBlackIDListFromCache rpc call failed ", err.Error())
-	} else {
-		if cacheResp.CommonResp.ErrCode != 0 {
-			log.NewError(data.OperationID, "GetBlackIDListFromCache rpc logic call failed ", cacheResp.String())
-		} else {
-			if utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) {
-				return false, 600, "in black list"
-			}
-		}
-	}
-	log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify)
-	if config.Config.MessageVerify.FriendVerify {
-		reqGetFriendIDListFromCache := &cacheRpc.GetFriendIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
-		etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
-		cacheClient := cacheRpc.NewCacheClient(etcdConn)
-		cacheResp, err := cacheClient.GetFriendIDListFromCache(context.Background(), reqGetFriendIDListFromCache)
-		if err != nil {
-			log.NewError(data.OperationID, "GetFriendIDListFromCache rpc call failed ", err.Error())
-		} else {
-			if cacheResp.CommonResp.ErrCode != 0 {
-				log.NewError(data.OperationID, "GetFriendIDListFromCache rpc logic call failed ", cacheResp.String())
-			} else {
-				if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) {
-					return false, 601, "not friend"
-				}
-			}
-		}
-		return true, 0, ""
+	if data.MsgData.SessionType == constant.SingleChatType {
+		log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify)
+		reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
+		etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
+		cacheClient := cacheRpc.NewCacheClient(etcdConn)
+		cacheResp, err := cacheClient.GetBlackIDListFromCache(context.Background(), reqGetBlackIDListFromCache)
+		if err != nil {
+			log.NewError(data.OperationID, "GetBlackIDListFromCache rpc call failed ", err.Error())
+		} else {
+			if cacheResp.CommonResp.ErrCode != 0 {
+				log.NewError(data.OperationID, "GetBlackIDListFromCache rpc logic call failed ", cacheResp.String())
+			} else {
+				if utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) {
+					return false, 600, "in black list"
+				}
+			}
+		}
+		log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify)
+		if config.Config.MessageVerify.FriendVerify {
+			reqGetFriendIDListFromCache := &cacheRpc.GetFriendIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID}
+			etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
+			cacheClient := cacheRpc.NewCacheClient(etcdConn)
+			cacheResp, err := cacheClient.GetFriendIDListFromCache(context.Background(), reqGetFriendIDListFromCache)
+			if err != nil {
+				log.NewError(data.OperationID, "GetFriendIDListFromCache rpc call failed ", err.Error())
+			} else {
+				if cacheResp.CommonResp.ErrCode != 0 {
+					log.NewError(data.OperationID, "GetFriendIDListFromCache rpc logic call failed ", cacheResp.String())
+				} else {
+					if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) {
+						return false, 601, "not friend"
+					}
+				}
+			}
+			return true, 0, ""
+		} else {
+			return true, 0, ""
+		}
 	} else {
 		return true, 0, ""
 	}
 }
 func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
 	msg.ServerMsgID = GetMsgID(msg.SendID)
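
The rewritten userRelationshipVerification only checks relationships for single chats: black list first (error 600), then an optional friendship check (error 601) when MessageVerify.FriendVerify is enabled; group, super group and notification sessions pass straight through. A condensed, self-contained sketch of that decision order, with the cache RPC results replaced by plain ID slices (illustrative only):

package main

import "fmt"

// Session type values as defined in constant (SuperGroupChatType is added by this commit).
const (
	SingleChatType       = 1
	GroupChatType        = 2
	SuperGroupChatType   = 3
	NotificationChatType = 4
)

func contains(target string, list []string) bool {
	for _, v := range list {
		if v == target {
			return true
		}
	}
	return false
}

// allowSend mirrors the decision order above; blackIDs/friendIDs stand in for the
// cache RPC results and friendVerify for config.Config.MessageVerify.FriendVerify.
func allowSend(sessionType int32, sendID string, blackIDs, friendIDs []string, friendVerify bool) (bool, int32, string) {
	if sessionType != SingleChatType {
		return true, 0, "" // group / super group / notification traffic is not checked here
	}
	if contains(sendID, blackIDs) {
		return false, 600, "in black list"
	}
	if friendVerify && !contains(sendID, friendIDs) {
		return false, 601, "not friend"
	}
	return true, 0, ""
}

func main() {
	fmt.Println(allowSend(SingleChatType, "u1", []string{"u1"}, nil, false)) // false 600 in black list
	fmt.Println(allowSend(SingleChatType, "u2", nil, nil, true))             // false 601 not friend
	fmt.Println(allowSend(SuperGroupChatType, "u2", nil, nil, true))         // true 0
}
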
@@ -368,6 +371,34 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
 		log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
 		return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
+	case constant.SuperGroupChatType:
+		// callback
+		callbackResp := callbackBeforeSendSingleMsg(pb)
+		if callbackResp.ErrCode != 0 {
+			log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg resp: ", callbackResp)
+		}
+		if callbackResp.ActionCode != constant.ActionAllow {
+			if callbackResp.ErrCode == 0 {
+				callbackResp.ErrCode = 201
+			}
+			log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp)
+			return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
+		}
+		msgToMQSingle.MsgData = pb.MsgData
+		log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
+		err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
+		if err1 != nil {
+			log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
+			return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
+		}
+		// callback
+		callbackResp = callbackAfterSendSingleMsg(pb)
+		if callbackResp.ErrCode != 0 {
+			log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp)
+		}
+		return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
 	default:
 		return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0)
 	}

@@ -105,9 +105,9 @@ const (
 	SysMsgType = 200
 	//SessionType
 	SingleChatType       = 1
 	GroupChatType        = 2
+	SuperGroupChatType   = 3
 	NotificationChatType = 4
 	//token
 	NormalToken = 0

@@ -1032,6 +1032,10 @@ func getSeqUid(uid string, seq uint32) string {
 	seqSuffix := seq / singleGocMsgNum
 	return indexGen(uid, seqSuffix)
 }
+func getSeqSuperGroupID(groupID string, seq uint32) string {
+	seqSuffix := seq / singleGocMsgNum
+	return superGroupIndexGen(groupID, seqSuffix)
+}
 func GetSeqUid(uid string, seq uint32) string {
 	return getSeqUid(uid, seq)
@@ -1069,3 +1073,6 @@ func isNotContainInt32(target uint32, List []uint32) bool {
 func indexGen(uid string, seqSuffix uint32) string {
 	return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10)
 }
+func superGroupIndexGen(groupID string, seqSuffix uint32) string {
+	return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
+}
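
Super group history is sharded into fixed-size seq blocks the same way single-chat history is, only with a super_group_ key prefix. A small sketch of the resulting document key; singleGocMsgNum is defined elsewhere in this file and 5000 is only an assumed value for the illustration:

package main

import (
	"fmt"
	"strconv"
)

const singleGocMsgNum = 5000 // assumed value for the example only

func superGroupIndexGen(groupID string, seqSuffix uint32) string {
	return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}

func getSeqSuperGroupID(groupID string, seq uint32) string {
	return superGroupIndexGen(groupID, seq/singleGocMsgNum)
}

func main() {
	// seq 12345 falls into the third block of 5000 messages, so the suffix is 2.
	fmt.Println(getSeqSuperGroupID("sg-1", 12345)) // super_group_sg-1:2
}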

@@ -4,10 +4,14 @@ source ./proto_dir.cfg
 for ((i = 0; i < ${#all_proto[*]}; i++)); do
 	proto=${all_proto[$i]}
 	protoc -I ../../../ -I ./ --go_out=plugins=grpc:. $proto
 	s=`echo $proto | sed 's/ //g'`
 	v=${s//proto/pb.go}
 	protoc-go-inject-tag -input=./$v
 	echo "protoc --go_out=plugins=grpc:." $proto
 done
 echo "proto file generate success..."
+find ./ -type f -path "*.pb.go"|xargs sed -i 's/\".\/sdk_ws\"/\"Open_IM\/pkg\/proto\/sdk_ws\"/g'

@@ -37,7 +37,7 @@ func (m *MsgDataToMQ) Reset() { *m = MsgDataToMQ{} }
 func (m *MsgDataToMQ) String() string { return proto.CompactTextString(m) }
 func (*MsgDataToMQ) ProtoMessage() {}
 func (*MsgDataToMQ) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{0}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{0}
 }
 func (m *MsgDataToMQ) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_MsgDataToMQ.Unmarshal(m, b)
@@ -90,7 +90,7 @@ func (m *MsgDataToDB) Reset() { *m = MsgDataToDB{} }
 func (m *MsgDataToDB) String() string { return proto.CompactTextString(m) }
 func (*MsgDataToDB) ProtoMessage() {}
 func (*MsgDataToDB) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{1}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{1}
 }
 func (m *MsgDataToDB) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_MsgDataToDB.Unmarshal(m, b)
@@ -137,7 +137,7 @@ func (m *PushMsgDataToMQ) Reset() { *m = PushMsgDataToMQ{} }
 func (m *PushMsgDataToMQ) String() string { return proto.CompactTextString(m) }
 func (*PushMsgDataToMQ) ProtoMessage() {}
 func (*PushMsgDataToMQ) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{2}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{2}
 }
 func (m *PushMsgDataToMQ) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_PushMsgDataToMQ.Unmarshal(m, b)
@@ -210,7 +210,7 @@ func (m *GetMaxAndMinSeqReq) Reset() { *m = GetMaxAndMinSeqReq{} }
 func (m *GetMaxAndMinSeqReq) String() string { return proto.CompactTextString(m) }
 func (*GetMaxAndMinSeqReq) ProtoMessage() {}
 func (*GetMaxAndMinSeqReq) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{3}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{3}
 }
 func (m *GetMaxAndMinSeqReq) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_GetMaxAndMinSeqReq.Unmarshal(m, b)
@@ -258,7 +258,7 @@ func (m *GetMaxAndMinSeqResp) Reset() { *m = GetMaxAndMinSeqResp{} }
 func (m *GetMaxAndMinSeqResp) String() string { return proto.CompactTextString(m) }
 func (*GetMaxAndMinSeqResp) ProtoMessage() {}
 func (*GetMaxAndMinSeqResp) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{4}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{4}
 }
 func (m *GetMaxAndMinSeqResp) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_GetMaxAndMinSeqResp.Unmarshal(m, b)
@@ -319,7 +319,7 @@ func (m *SendMsgReq) Reset() { *m = SendMsgReq{} }
 func (m *SendMsgReq) String() string { return proto.CompactTextString(m) }
 func (*SendMsgReq) ProtoMessage() {}
 func (*SendMsgReq) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{5}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{5}
 }
 func (m *SendMsgReq) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_SendMsgReq.Unmarshal(m, b)
@@ -375,7 +375,7 @@ func (m *SendMsgResp) Reset() { *m = SendMsgResp{} }
 func (m *SendMsgResp) String() string { return proto.CompactTextString(m) }
 func (*SendMsgResp) ProtoMessage() {}
 func (*SendMsgResp) Descriptor() ([]byte, []int) {
-	return fileDescriptor_chat_83f286704599d5b1, []int{6}
+	return fileDescriptor_chat_732204f30d7bcb33, []int{6}
 }
 func (m *SendMsgResp) XXX_Unmarshal(b []byte) error {
 	return xxx_messageInfo_SendMsgResp.Unmarshal(m, b)
@@ -451,7 +451,7 @@ const _ = grpc.SupportPackageIsVersion4
 // Client API for Chat service
 type ChatClient interface {
-	GetMaxAndMinSeq(ctx context.Context, in *GetMaxAndMinSeqReq, opts ...grpc.CallOption) (*GetMaxAndMinSeqResp, error)
+	GetMaxAndMinSeq(ctx context.Context, in *sdk_ws.GetMaxAndMinSeqReq, opts ...grpc.CallOption) (*sdk_ws.GetMaxAndMinSeqResp, error)
 	PullMessageBySeqList(ctx context.Context, in *sdk_ws.PullMessageBySeqListReq, opts ...grpc.CallOption) (*sdk_ws.PullMessageBySeqListResp, error)
 	SendMsg(ctx context.Context, in *SendMsgReq, opts ...grpc.CallOption) (*SendMsgResp, error)
 	DelMsgList(ctx context.Context, in *sdk_ws.DelMsgListReq, opts ...grpc.CallOption) (*sdk_ws.DelMsgListResp, error)
@@ -465,8 +465,8 @@ func NewChatClient(cc *grpc.ClientConn) ChatClient {
 	return &chatClient{cc}
 }
-func (c *chatClient) GetMaxAndMinSeq(ctx context.Context, in *GetMaxAndMinSeqReq, opts ...grpc.CallOption) (*GetMaxAndMinSeqResp, error) {
-	out := new(GetMaxAndMinSeqResp)
+func (c *chatClient) GetMaxAndMinSeq(ctx context.Context, in *sdk_ws.GetMaxAndMinSeqReq, opts ...grpc.CallOption) (*sdk_ws.GetMaxAndMinSeqResp, error) {
+	out := new(sdk_ws.GetMaxAndMinSeqResp)
 	err := grpc.Invoke(ctx, "/pbChat.Chat/GetMaxAndMinSeq", in, out, c.cc, opts...)
 	if err != nil {
 		return nil, err
@@ -504,7 +504,7 @@ func (c *chatClient) DelMsgList(ctx context.Context, in *sdk_ws.DelMsgListReq, o
 // Server API for Chat service
 type ChatServer interface {
-	GetMaxAndMinSeq(context.Context, *GetMaxAndMinSeqReq) (*GetMaxAndMinSeqResp, error)
+	GetMaxAndMinSeq(context.Context, *sdk_ws.GetMaxAndMinSeqReq) (*sdk_ws.GetMaxAndMinSeqResp, error)
 	PullMessageBySeqList(context.Context, *sdk_ws.PullMessageBySeqListReq) (*sdk_ws.PullMessageBySeqListResp, error)
 	SendMsg(context.Context, *SendMsgReq) (*SendMsgResp, error)
 	DelMsgList(context.Context, *sdk_ws.DelMsgListReq) (*sdk_ws.DelMsgListResp, error)
@@ -515,7 +515,7 @@ func RegisterChatServer(s *grpc.Server, srv ChatServer) {
 }
 func _Chat_GetMaxAndMinSeq_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(GetMaxAndMinSeqReq)
+	in := new(sdk_ws.GetMaxAndMinSeqReq)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -527,7 +527,7 @@ func _Chat_GetMaxAndMinSeq_Handler(srv interface{}, ctx context.Context, dec fun
 		FullMethod: "/pbChat.Chat/GetMaxAndMinSeq",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(ChatServer).GetMaxAndMinSeq(ctx, req.(*GetMaxAndMinSeqReq))
+		return srv.(ChatServer).GetMaxAndMinSeq(ctx, req.(*sdk_ws.GetMaxAndMinSeqReq))
 	}
 	return interceptor(ctx, in, info, handler)
 }
@@ -611,40 +611,40 @@ var _Chat_serviceDesc = grpc.ServiceDesc{
 	Metadata: "chat/chat.proto",
 }
-func init() { proto.RegisterFile("chat/chat.proto", fileDescriptor_chat_83f286704599d5b1) }
-var fileDescriptor_chat_83f286704599d5b1 = []byte{
-	// 507 bytes of a gzipped FileDescriptorProto
+func init() { proto.RegisterFile("chat/chat.proto", fileDescriptor_chat_732204f30d7bcb33) }
+var fileDescriptor_chat_732204f30d7bcb33 = []byte{
+	// 508 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x54, 0xcd, 0x6e, 0xda, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x54, 0xdd, 0x6e, 0xda, 0x30,
0x10, 0x96, 0x49, 0x80, 0x32, 0x34, 0x42, 0xda, 0x44, 0x95, 0xe5, 0x5e, 0x1c, 0x9f, 0x50, 0x2b, 0x14, 0x56, 0x68, 0x0b, 0xe3, 0xb0, 0x0a, 0xc9, 0xad, 0xa6, 0x88, 0xab, 0x34, 0xd2, 0x26, 0xb4,
0x19, 0x89, 0xf6, 0xd6, 0x53, 0x89, 0xa3, 0x8a, 0xaa, 0xdb, 0x24, 0x86, 0x5e, 0x7a, 0x41, 0x9b, 0x49, 0x89, 0xc4, 0x76, 0xb7, 0xab, 0xd1, 0x54, 0x13, 0xd2, 0xbc, 0xb6, 0x81, 0xdd, 0xec, 0x86,
0x30, 0x32, 0x16, 0x60, 0x2f, 0x3b, 0xa6, 0xa4, 0xed, 0x33, 0xf4, 0x19, 0xfa, 0x3e, 0x7d, 0xaa, 0xb9, 0xcd, 0x51, 0x88, 0x80, 0xc4, 0xf8, 0x84, 0xd1, 0x6d, 0xcf, 0xb0, 0x67, 0xd8, 0xab, 0xed,
0xca, 0xbb, 0x26, 0x98, 0x40, 0x15, 0x4e, 0xbd, 0x58, 0x9a, 0x6f, 0x3e, 0x7f, 0x3f, 0xeb, 0x1f, 0x51, 0xa6, 0xd8, 0x69, 0x09, 0x05, 0xa9, 0x5c, 0xed, 0x06, 0xe9, 0x7c, 0xfe, 0xfc, 0xfd, 0x18,
0x68, 0xdd, 0x4d, 0x44, 0xd6, 0xc9, 0x2f, 0xbe, 0x54, 0x69, 0x96, 0xb2, 0x9a, 0xbc, 0xbd, 0x98, 0xc7, 0xd0, 0xbe, 0x9d, 0x88, 0xdc, 0x2f, 0x7e, 0x3c, 0xa9, 0xb2, 0x3c, 0x63, 0x75, 0x79, 0x73,
0x88, 0xcc, 0x39, 0xbf, 0x92, 0x98, 0x8c, 0xfa, 0xbc, 0x23, 0xa7, 0x51, 0x47, 0xaf, 0x3a, 0x34, 0x3e, 0x11, 0x79, 0xe7, 0xec, 0x52, 0x62, 0x3a, 0x1e, 0x70, 0x5f, 0x4e, 0x63, 0x5f, 0x2f, 0xf9,
0x9e, 0x8e, 0x56, 0xd4, 0x59, 0x91, 0xa1, 0x7a, 0x3f, 0xa1, 0xc9, 0x29, 0x0a, 0x44, 0x26, 0x86, 0x14, 0x4d, 0xc7, 0x2b, 0xf2, 0x57, 0x64, 0xa8, 0xee, 0x2f, 0x68, 0x71, 0x8a, 0x03, 0x91, 0x8b,
0x29, 0xbf, 0x61, 0x67, 0x50, 0xcd, 0xd2, 0x29, 0x26, 0xb6, 0xe5, 0x5a, 0xed, 0x46, 0x68, 0x06, 0x51, 0xc6, 0xaf, 0xd9, 0x29, 0x1c, 0xe5, 0xd9, 0x14, 0x53, 0xdb, 0x72, 0xac, 0x6e, 0x33, 0x34,
0xe6, 0x42, 0x33, 0x95, 0xa8, 0x44, 0x16, 0xa7, 0x49, 0x3f, 0xb0, 0x2b, 0x7a, 0x57, 0x86, 0xd8, 0x03, 0x73, 0xa0, 0x95, 0x49, 0x54, 0x22, 0x4f, 0xb2, 0x74, 0x10, 0xd8, 0x35, 0xbd, 0x56, 0x85,
0x5b, 0xa8, 0xcf, 0x8d, 0x8c, 0x7d, 0xe4, 0x5a, 0xed, 0x66, 0xd7, 0xf1, 0x09, 0xd5, 0x37, 0x54, 0xd8, 0x3b, 0x68, 0xcc, 0x8d, 0x8c, 0x7d, 0xe0, 0x58, 0xdd, 0x56, 0xaf, 0xe3, 0x11, 0xaa, 0xef,
0x23, 0x21, 0xe3, 0x91, 0x14, 0x4a, 0xcc, 0xc9, 0x2f, 0x8c, 0xc2, 0x35, 0xd5, 0xc3, 0x92, 0x79, 0xa8, 0xc6, 0x42, 0x26, 0x63, 0x29, 0x94, 0x98, 0x93, 0x57, 0x1a, 0x85, 0xf7, 0x54, 0x17, 0x2b,
0xd0, 0x2b, 0x8b, 0x58, 0x07, 0x8b, 0x3c, 0x1d, 0xce, 0xfb, 0x65, 0x41, 0xeb, 0x7a, 0x49, 0x93, 0xe6, 0x41, 0xbf, 0x2a, 0x62, 0xed, 0x2d, 0xf2, 0x74, 0x38, 0xf7, 0xb7, 0x05, 0xed, 0xab, 0x25,
0x72, 0x51, 0x17, 0x9a, 0x57, 0xa5, 0xbb, 0x4c, 0xdd, 0x32, 0x54, 0x4e, 0x53, 0x39, 0x3c, 0x8d, 0x4d, 0xaa, 0x45, 0x1d, 0x68, 0x5d, 0x56, 0x76, 0x99, 0xba, 0x55, 0xa8, 0x9a, 0xa6, 0xb6, 0x7f,
0x07, 0xcf, 0xe5, 0x92, 0x26, 0xc3, 0xf4, 0x0b, 0xa1, 0xea, 0x07, 0xfa, 0x34, 0x1a, 0xe1, 0x16, 0x1a, 0x17, 0x9e, 0xcb, 0x25, 0x4d, 0x46, 0xd9, 0x17, 0x42, 0x35, 0x08, 0xf4, 0x69, 0x34, 0xc3,
0xe6, 0x7d, 0x06, 0xf6, 0x01, 0x33, 0x2e, 0xee, 0xdf, 0x27, 0x63, 0x1e, 0x27, 0x03, 0x5c, 0x84, 0x0d, 0xcc, 0xfd, 0x0c, 0xec, 0x23, 0xe6, 0x5c, 0xdc, 0x7d, 0x48, 0x23, 0x9e, 0xa4, 0x43, 0x5c,
0xb8, 0x60, 0x2f, 0xa0, 0x56, 0xdc, 0x63, 0xc2, 0x14, 0xd3, 0xe3, 0xa4, 0x95, 0x9d, 0xa4, 0xde, 0x84, 0xb8, 0x60, 0x2f, 0xa0, 0x5e, 0xee, 0x31, 0x61, 0xca, 0xe9, 0x71, 0xd2, 0xda, 0x56, 0x52,
0x0a, 0x4e, 0x77, 0xf4, 0x48, 0x32, 0x1b, 0xea, 0x97, 0x4a, 0x5d, 0xa4, 0x63, 0xd4, 0x8a, 0xd5, 0x77, 0x05, 0x27, 0x5b, 0x7a, 0x24, 0x99, 0x0d, 0x8d, 0x0b, 0xa5, 0xce, 0xb3, 0x08, 0xb5, 0xe2,
0x70, 0x3d, 0xe6, 0x56, 0x97, 0x4a, 0x71, 0x8a, 0x0a, 0xb5, 0x62, 0xca, 0x71, 0x2e, 0xee, 0x07, 0x51, 0x78, 0x3f, 0x16, 0x56, 0x17, 0x4a, 0x71, 0x8a, 0x4b, 0xb5, 0x72, 0x2a, 0x70, 0x2e, 0xee,
0xb8, 0xd0, 0xb1, 0x4f, 0xc2, 0x62, 0xd2, 0xb8, 0xd6, 0xb5, 0x8f, 0x0b, 0x5c, 0x4f, 0xde, 0x0f, 0x86, 0xb8, 0xd0, 0xb1, 0x8f, 0xc3, 0x72, 0xd2, 0xb8, 0xd6, 0xb5, 0x0f, 0x4b, 0x5c, 0x4f, 0xee,
0x80, 0x01, 0x26, 0x63, 0x4e, 0x51, 0x5e, 0xe0, 0xff, 0xbe, 0x3b, 0xbf, 0x2d, 0x68, 0x3e, 0x98, 0x4f, 0x80, 0x21, 0xa6, 0x11, 0xa7, 0xb8, 0x28, 0xf0, 0x7f, 0xef, 0xce, 0x1f, 0x0b, 0x5a, 0x0f,
0x9b, 0xb6, 0xb8, 0xdd, 0x16, 0x37, 0x6d, 0x71, 0xab, 0xad, 0x99, 0xf2, 0x64, 0xc6, 0x87, 0x53, 0xe6, 0xa6, 0x2d, 0x6e, 0xb6, 0xc5, 0x75, 0x5b, 0xdc, 0x68, 0x6b, 0xa6, 0x22, 0x99, 0xf1, 0xe1,
0xd4, 0x0f, 0x74, 0xb5, 0x46, 0x58, 0x86, 0x72, 0xc6, 0xdd, 0x2c, 0xc6, 0x24, 0x33, 0x8c, 0xaa, 0x14, 0x0f, 0x02, 0x5d, 0xad, 0x19, 0x56, 0xa1, 0x82, 0x71, 0x3b, 0x4b, 0x30, 0xcd, 0x0d, 0xe3,
0x61, 0x94, 0x20, 0xe6, 0xc0, 0x33, 0xc2, 0x64, 0x3c, 0x8c, 0xe7, 0x68, 0xd7, 0x5c, 0xab, 0x7d, 0xc8, 0x30, 0x2a, 0x10, 0xeb, 0xc0, 0x33, 0xc2, 0x34, 0x1a, 0x25, 0x73, 0xb4, 0xeb, 0x8e, 0xd5,
0x14, 0x3e, 0xcc, 0xdd, 0x3f, 0x15, 0x38, 0xce, 0x3f, 0x43, 0xf6, 0x11, 0x5a, 0x8f, 0x9e, 0x0f, 0x3d, 0x08, 0x1f, 0xe6, 0xde, 0xdf, 0x1a, 0x1c, 0x16, 0x9f, 0x21, 0xfb, 0x06, 0xed, 0x47, 0xff,
0x73, 0x7c, 0xf3, 0x89, 0xfa, 0xbb, 0x2f, 0x82, 0xf3, 0xf2, 0x9f, 0x3b, 0x92, 0x2c, 0x85, 0xb3, 0x0f, 0x7b, 0xb9, 0xa3, 0xe2, 0xf6, 0x9d, 0xe8, 0xbc, 0xda, 0x87, 0x46, 0x92, 0x65, 0x70, 0x7a,
0xeb, 0xe5, 0x6c, 0xc6, 0x91, 0x48, 0x44, 0xd8, 0xfb, 0x3e, 0xc0, 0xc5, 0xa7, 0x98, 0x32, 0xf6, 0xb5, 0x9c, 0xcd, 0x38, 0x12, 0x89, 0x18, 0xfb, 0x3f, 0x86, 0xb8, 0xf8, 0x94, 0x50, 0xce, 0x5e,
0x6a, 0xcf, 0x99, 0xed, 0x23, 0xe6, 0x06, 0xaf, 0x0f, 0xe6, 0x92, 0x64, 0x5d, 0xa8, 0x17, 0xc7, 0xef, 0xd8, 0xbf, 0x8b, 0x58, 0x78, 0xbd, 0xd9, 0x9b, 0x4b, 0x92, 0xf5, 0xa0, 0x51, 0x1e, 0x3e,
0xcc, 0xd8, 0x3a, 0xd8, 0xe6, 0xa1, 0x3b, 0xa7, 0x3b, 0x18, 0x49, 0x76, 0x03, 0x10, 0xe0, 0x8c, 0x63, 0x9e, 0x79, 0x6d, 0xbc, 0xf5, 0x55, 0xe8, 0x9c, 0x6c, 0x61, 0x24, 0xd9, 0x35, 0x40, 0x80,
0x53, 0xa4, 0xa3, 0xb9, 0x7b, 0xec, 0x36, 0xeb, 0x5c, 0xe4, 0xfc, 0x09, 0x06, 0xc9, 0x5e, 0xeb, 0x33, 0x4e, 0xb1, 0x8e, 0xe6, 0xec, 0xb0, 0x5b, 0x2f, 0x17, 0x22, 0x67, 0x4f, 0x30, 0x48, 0xf6,
0xeb, 0x89, 0xaf, 0x7f, 0x71, 0xef, 0x8c, 0xdf, 0x6d, 0x4d, 0xff, 0xbf, 0xde, 0xfc, 0x0d, 0x00, 0xdb, 0x5f, 0x8f, 0x3d, 0xfd, 0xf0, 0xbd, 0x37, 0x7e, 0x37, 0x75, 0xfd, 0xaa, 0xbd, 0xfd, 0x17,
0x00, 0xff, 0xff, 0x6f, 0x9d, 0x6f, 0xa0, 0xfd, 0x04, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9f, 0xc6, 0xef, 0x8e, 0x13, 0x05, 0x00, 0x00,
} }

@@ -75,7 +75,7 @@ message SendMsgResp {
 service Chat {
-	rpc GetMaxAndMinSeq(GetMaxAndMinSeqReq) returns(GetMaxAndMinSeqResp);
+	rpc GetMaxAndMinSeq(server_api_params.GetMaxAndMinSeqReq) returns(server_api_params.GetMaxAndMinSeqResp);
 	rpc PullMessageBySeqList(server_api_params.PullMessageBySeqListReq) returns(server_api_params.PullMessageBySeqListResp);
 	rpc SendMsg(SendMsgReq) returns(SendMsgResp);
 	rpc DelMsgList(server_api_params.DelMsgListReq) returns(server_api_params.DelMsgListResp);
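
With the service signature moved to the server_api_params (sdk_ws) types, a regenerated client passes the shared request message directly. A hedged dial-and-call sketch; the address, IDs and insecure credentials are placeholders (the real servers are resolved through etcd via getcdv3.GetConn):

package main

import (
	"context"
	"fmt"

	pbChat "Open_IM/pkg/proto/chat"
	sdk_ws "Open_IM/pkg/proto/sdk_ws"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder address; not how the services are normally discovered.
	conn, err := grpc.Dial("127.0.0.1:10300", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := pbChat.NewChatClient(conn)
	resp, err := client.GetMaxAndMinSeq(context.Background(), &sdk_ws.GetMaxAndMinSeqReq{
		UserID:      "u100",
		OperationID: "op-1",
		GroupIDList: []string{"sg-1"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.MaxSeq, resp.GroupMaxAndMinSeq)
}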

File diff suppressed because it is too large.

@@ -170,30 +170,54 @@ message UserInDepartment {
 ///////////////////////////////////base end/////////////////////////////////////
+message PullMessageBySeqListReq{
+  string userID = 1;
+  string operationID = 2;
+  repeated uint32 seqList = 3;
+  map <string, seqList>groupSeqList = 4;
+}
+message seqList {
+  repeated uint32 seqList = 1;
+}
+message MsgDataList {
+  repeated MsgData msgDataList = 1;
+}
 message PullMessageBySeqListResp {
   int32 errCode = 1;
   string errMsg = 2;
   repeated MsgData list = 3;
+  map<string, MsgDataList> groupMsgDataList = 4;
 }
-message PullMessageBySeqListReq{
-  string userID = 1;
-  string operationID = 2;
-  repeated uint32 seqList = 3;
-}
 message GetMaxAndMinSeqReq {
+  repeated string groupIDList = 1;
+  string userID = 2;
+  string operationID =3;
 }
+message MaxAndMinSeq{
+  uint32 maxSeq = 1;
+  uint32 minSeq = 2;
+}
 message GetMaxAndMinSeqResp {
   uint32 maxSeq = 1;
   uint32 minSeq = 2;
+  int32 errCode = 3;
+  string errMsg = 4;
+  map<string, MaxAndMinSeq> groupMaxAndMinSeq = 5;
 }
 message UserSendMsgResp {
   string serverMsgID = 1;
   string clientMsgID = 2;
   int64 sendTime = 3;
 }
 message MsgData {
   string sendID = 1;
   string recvID = 2;
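
PullMessageBySeqListReq now carries a per-super-group seq map beside the user's own seqList. A sketch of filling the new shape from a hypothetical caller (IDs and seq numbers are made up; Go names follow protoc-gen-go's casing of the seqList message as SeqList):

package main

import (
	"fmt"

	open_im_sdk "Open_IM/pkg/proto/sdk_ws"
)

func main() {
	req := &open_im_sdk.PullMessageBySeqListReq{
		UserID:      "u100",
		OperationID: "op-pull-1",
		SeqList:     []uint32{1, 2, 3}, // the user's own missing seqs
		GroupSeqList: map[string]*open_im_sdk.SeqList{
			"sg-1": {SeqList: []uint32{10, 11}}, // missing seqs per super group
		},
	}
	fmt.Println(req.String())
}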
