Merge remote-tracking branch 'origin/superGroup' into superGroup

# Conflicts:
#	pkg/proto/sdk_ws/ws.pb.go
pull/236/head
Gordon 2 years ago
commit ccc69ac2d0

@ -235,8 +235,6 @@ groupMessageHasReadReceiptEnable: false
#单聊已读开启
singleMessageHasReadReceiptEnable: false
#token config
tokenpolicy:
accessSecret: "open_im_server" #token生成相关默认即可
@ -487,30 +485,6 @@ notification:
defaultTips:
tips: "group member info set"
groupMemberSetToOrdinaryUser:
conversation:
reliabilityLevel: 3
unreadCount: false
offlinePush:
switch: false
title: "groupMemberSetToOrdinaryUser title"
desc: "groupMemberSetToOrdinaryUser desc"
ext: "groupMemberSetToOrdinaryUser ext"
defaultTips:
tips: "was set to ordinaryUser"
groupMemberSetToAdmin:
conversation:
reliabilityLevel: 3
unreadCount: false
offlinePush:
switch: false
title: "groupMemberSetToAdmin title"
desc: "groupMemberSetToAdmin desc"
ext: "groupMemberSetToAdmin ext"
defaultTips:
tips: "was set to admin"
#############################organization#################################
organizationChanged:
conversation:
@ -727,4 +701,4 @@ demo:
imAPIURL: http://127.0.0.1:10002
rtc:
signalTimeout: 300
signalTimeout: 35

@ -21,9 +21,7 @@ func GetRTCInvitationInfo(c *gin.Context) {
return
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req)
var ok bool
var errInfo string
ok, _, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
ok, userID, errInfo := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
if !ok {
errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
log.NewError(req.OperationID, errMsg)
@ -37,6 +35,10 @@ func GetRTCInvitationInfo(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()})
return
}
if err := db.DB.DelUserSignalList(userID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelUserSignalList result:", err.Error())
}
resp.Data.OpUserID = invitationInfo.OpUserID
resp.Data.Invitation.RoomID = invitationInfo.Invitation.RoomID
resp.Data.Invitation.SessionType = invitationInfo.Invitation.SessionType
@ -45,6 +47,9 @@ func GetRTCInvitationInfo(c *gin.Context) {
resp.Data.Invitation.InviteeUserIDList = invitationInfo.Invitation.InviteeUserIDList
resp.Data.Invitation.MediaType = invitationInfo.Invitation.MediaType
resp.Data.Invitation.Timeout = invitationInfo.Invitation.Timeout
resp.Data.Invitation.InitiateTime = invitationInfo.Invitation.InitiateTime
resp.Data.Invitation.PlatformID = invitationInfo.Invitation.PlatformID
resp.Data.Invitation.CustomData = invitationInfo.Invitation.CustomData
c.JSON(http.StatusOK, resp)
}
@ -83,6 +88,9 @@ func GetRTCInvitationInfoStartApp(c *gin.Context) {
resp.Data.Invitation.InviteeUserIDList = invitationInfo.Invitation.InviteeUserIDList
resp.Data.Invitation.MediaType = invitationInfo.Invitation.MediaType
resp.Data.Invitation.Timeout = invitationInfo.Invitation.Timeout
resp.Data.Invitation.InitiateTime = invitationInfo.Invitation.InitiateTime
resp.Data.Invitation.PlatformID = invitationInfo.Invitation.PlatformID
resp.Data.Invitation.CustomData = invitationInfo.Invitation.CustomData
c.JSON(http.StatusOK, resp)
}

@ -91,9 +91,9 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
ws.getSeqResp(conn, m, nReply)
}
}
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) {
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) {
log.Debug(m.OperationID, "getSeqResp come here ", pb.String())
b, _ := proto.Marshal(pb)
mReply := Resp{
@ -108,7 +108,7 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeq
}
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr)
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
nReply := new(sdk_ws.PullMessageBySeqListResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
if isPass {
@ -117,7 +117,7 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID
rpcReq.GroupSeqList = data.(sdk_ws.PullMessageBySeqListReq).GroupSeqList
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.PullMessageBySeqListReq).SeqList, data.(sdk_ws.PullMessageBySeqListReq).GroupSeqList)
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.PullMessageBySeqListReq).SeqList)
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq)
@ -157,7 +157,6 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
sendMsgAllCountLock.Lock()
sendMsgAllCount++
sendMsgAllCountLock.Unlock()
//stat.GaugeVecApiMethod.WithLabelValues("ws_send_message_count").Inc()
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
nReply := new(pbChat.SendMsgResp)
@ -248,14 +247,8 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
nReply.ErrMsg = err.Error()
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
} else {
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String())
// save invitation info for offline push
if err := db.DB.NewCacheSignalInfo(pbData.MsgData); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), m, &signalResp)
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
} else {
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
}
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String(), signalResp.String(), m)
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
}
} else {
log.NewError(m.OperationID, utils.GetSelfFuncName(), respPb.IsPass, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg)

@ -89,11 +89,25 @@ func (ws *WServer) readMsg(conn *UserConn) {
}
}
func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
conn.w.Lock()
defer conn.w.Unlock()
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
}
func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error {
conn.w.Lock()
defer conn.w.Unlock()
conn.SetWriteDeadline(time.Now().Add(time.Duration(60) * time.Second))
return conn.WriteMessage(a, msg)
}
func (ws *WServer) SetWriteTimeoutWriteMsg(conn *UserConn, a int, msg []byte, timeout int) error {
conn.w.Lock()
defer conn.w.Unlock()
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
return conn.WriteMessage(a, msg)
}
func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn *UserConn, token string, operationID string) {
switch config.Config.MultiLoginPolicy {
@ -104,7 +118,7 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
ws.sendKickMsg(oldConn, newConn)
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
if err != nil && err != redis.ErrNil {
log.NewError(operationID, "get token from redis err", err.Error())
log.NewError(operationID, "get token from redis err", err.Error(), uid)
return
}
if m == nil {
@ -160,7 +174,7 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) {
}
err = ws.writeMsg(oldConn, websocket.BinaryMessage, b.Bytes())
if err != nil {
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
}
}
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) {

@ -460,7 +460,9 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
//sess.MarkMessage(msg, "")
rwLock.Lock()
cMsg = append(cMsg, msg)
if len(msg.Value) != 0 {
cMsg = append(cMsg, msg)
}
rwLock.Unlock()
sess.MarkMessage(msg, "")
//och.TriggerCmd(OnlineTopicBusy)
@ -539,7 +541,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
if grpcConn == nil {
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
pid, offset, err := producer.SendMessage(&mqPushMsg)
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil {
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
}
@ -549,7 +551,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
if err != nil {
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
pid, offset, err := producer.SendMessage(&mqPushMsg)
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil {
log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error())
}

@ -34,7 +34,7 @@ func (pc *PersistentConsumerHandler) Init() {
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg))
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
var tag bool
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
@ -42,6 +42,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
return
}
log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
//Only process receiver data
@ -73,8 +74,12 @@ func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
} else {
log.Error("", "msg get from kafka but is nil", msg.Key)
}
sess.MarkMessage(msg, "")
}
return nil

@ -10,6 +10,7 @@ import (
"Open_IM/internal/push"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbCache "Open_IM/pkg/proto/cache"
@ -62,11 +63,15 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
// save invitation info for offline push
for _, v := range wsResult {
if v.OnlinePush {
return
}
}
if err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData); err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData)
}
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, pushMsg.PushToUserID)
@ -246,9 +251,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
}
}
func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) {
@ -267,3 +270,45 @@ func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err err
}
return opts, nil
}
//func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {
// m.MsgID = rpcChat.GetMsgID(m.SendID)
// m.ClientMsgID = m.MsgID
// switch m.SessionType {
// case constant.SingleChatType:
// sendMsgToKafka(m, m.SendID, "msgKey--sendID")
// sendMsgToKafka(m, m.RecvID, "msgKey--recvID")
// case constant.GroupChatType:
// etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
// client := pbGroup.NewGroupClient(etcdConn)
// req := &pbGroup.Req{
// GroupID: m.RecvID,
// Token: config.Config.Secret,
// OperationID: m.OperationID,
// }
// reply, err := client.(context.Background(), req)
// if err != nil {
// log.Error(m.Token, m.OperationID, "rpc getGroupInfo failed, err = %s", err.Error())
// return
// }
// if reply.ErrorCode != 0 {
// log.Error(m.Token, m.OperationID, "rpc getGroupInfo failed, err = %s", reply.ErrorMsg)
// return
// }
// groupID := m.RecvID
// for i, v := range reply.MemberList {
// m.RecvID = v.UserId + " " + groupID
// sendMsgToKafka(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID")
// }
// default:
//
// }
//}
//
//func sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string, flag string) {
// pid, offset, err := producer.SendMessage(m, key)
// if err != nil {
// log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), flag, key)
// }
//
//}

@ -113,7 +113,6 @@ func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string)
} else {
return true, 0, ""
}
}
func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
msg.ServerMsgID = GetMsgID(msg.SendID)

@ -75,6 +75,9 @@ type GetRTCInvitationInfoResp struct {
Timeout int32 `json:"timeout"`
MediaType string `json:"mediaType"`
SessionType int32 `json:"sessionType"`
InitiateTime int32 `json:"initiateTime"`
PlatformID int32 `json:"platformID"`
CustomData string `json:"customData"`
} `json:"invitation"`
OfflinePushInfo struct{} `json:"offlinePushInfo"`
} `json:"data"`

@ -230,7 +230,7 @@ var ContentType2PushContent = map[int64]string{
AtText: "[有人@你]",
GroupMsg: "你收到一条群聊消息",
Common: "你收到一条新消息",
SignalMsg: "音視頻通話邀請",
SignalMsg: "音视频通话邀请",
}
const (

@ -95,42 +95,50 @@ func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID str
return nil
}
func (d *DataBases) NewCacheSignalInfo(msg *pbCommon.MsgData) error {
func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData) error {
req := &pbRtc.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return err
}
//log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String())
var inviteeUserIDList []string
switch invitationInfo := req.Payload.(type) {
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
inviteeUserIDList = invitationInfo.Invite.Invitation.InviteeUserIDList
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_InviteInGroup:
inviteeUserIDList = invitationInfo.InviteInGroup.Invitation.InviteeUserIDList
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return errors.New("signalInfo do not need offlinePush")
default:
log2.NewDebug("", utils.GetSelfFuncName(), "req type not invite", string(msg.Content))
log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content))
return nil
}
for _, userID := range inviteeUserIDList {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return err
}
keyList := SignalListCache + userID
err = d.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
if err != nil {
return err
}
err = d.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return err
}
key := SignalCache + msg.ClientMsgID
err = d.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return err
if isInviteSignal {
log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID list:", inviteeUserIDList)
for _, userID := range inviteeUserIDList {
log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID:", userID)
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return err
}
keyList := SignalListCache + userID
err = d.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
if err != nil {
return err
}
err = d.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
if err != nil {
return err
}
key := SignalCache + msg.ClientMsgID
err = d.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return err
}
}
return err
}
return nil
}
@ -149,15 +157,17 @@ func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (inv
switch req2 := req.Payload.(type) {
case *pbRtc.SignalReq_Invite:
invitationInfo.Invitation = req2.Invite.Invitation
invitationInfo.OpUserID = req2.Invite.OpUserID
case *pbRtc.SignalReq_InviteInGroup:
invitationInfo.Invitation = req2.InviteInGroup.Invitation
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
}
return invitationInfo, err
}
func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
keyList := SignalListCache + userID
result := d.rdb.RPop(context.Background(), keyList)
result := d.rdb.LPop(context.Background(), keyList)
if err = result.Err(); err != nil {
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
}
@ -170,14 +180,14 @@ func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationI
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
err = d.delUserSingalList(userID)
err = d.DelUserSignalList(userID)
if err != nil {
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
}
return invitationInfo, nil
}
func (d *DataBases) delUserSingalList(userID string) error {
func (d *DataBases) DelUserSignalList(userID string) error {
keyList := SignalListCache + userID
err := d.rdb.Del(context.Background(), keyList).Err()
return err

@ -2,6 +2,7 @@ package kafka
import (
log2 "Open_IM/pkg/common/log"
"errors"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
@ -32,18 +33,22 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p
}
func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) {
log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic
if len(key) == 1 {
kMsg.Key = sarama.StringEncoder(key[0])
}
kMsg.Key = sarama.StringEncoder(key)
bMsg, err := proto.Marshal(m)
if err != nil {
log2.Error("", "", "proto marshal err = %s", err.Error())
log2.Error(operationID, "", "proto marshal err = %s", err.Error())
return -1, -1, err
}
if len(bMsg) == 0 {
return 0, 0, errors.New("msg content is nil")
}
kMsg.Value = sarama.ByteEncoder(bMsg)
return p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
a, b, c := p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
return a, b, c
}

File diff suppressed because it is too large Load Diff

@ -122,8 +122,9 @@ message InvitationInfo {
string roomID = 5;
int32 timeout = 6;
string mediaType = 7;
int32 platformID = 8;
int32 sessionType = 9;
int32 platformID = 8;
int32 sessionType = 9;
int32 initiateTime = 10;
}

File diff suppressed because it is too large Load Diff

@ -520,8 +520,9 @@ message InvitationInfo {
string roomID = 5;
int32 timeout = 6;
string mediaType = 7;
int32 platformID = 8;
int32 sessionType = 9;
int32 platformID = 8;
int32 sessionType = 9;
int32 initiateTime = 10;
}
message ParticipantMetaData{

Loading…
Cancel
Save