use sarama v1.29.0

pull/236/head
skiffer-git 3 years ago
parent 06915f5dc3
commit 0d2c0e91c8

@ -32,7 +32,6 @@ require (
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/olivere/elastic/v7 v7.0.23 github.com/olivere/elastic/v7 v7.0.23
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
@ -51,3 +50,5 @@ require (
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
) )
replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.29.0

@ -256,7 +256,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
} }
func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) { func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) {
// := make(map[string]interface{}) // := make(map[string]interface{})
log.Debug(m.OperationID, "SignalMsgResp is", pb.String()) log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String())
b, _ := proto.Marshal(pb) b, _ := proto.Marshal(pb)
mReply := Resp{ mReply := Resp{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,

@ -160,29 +160,29 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
} }
} }
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
replay := pbChat.SendMsgResp{} replay := pbChat.SendMsgResp{}
newTime := db.GetCurrentTimestampByMill() newTime := db.GetCurrentTimestampByMill()
log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID) log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
flag, errCode, errMsg := isMessageHasReadEnabled(pb) flag, errCode, errMsg := isMessageHasReadEnabled(pb)
log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag)
if !flag { if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0) return returnMsg(&replay, pb, errCode, errMsg, "", 0)
} }
flag, errCode, errMsg = userRelationshipVerification(pb) flag, errCode, errMsg = userRelationshipVerification(pb)
log.Info(pb.OperationID, "userRelationshipVerification ", flag)
if !flag { if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0) return returnMsg(&replay, pb, errCode, errMsg, "", 0)
} }
rpc.encapsulateMsgData(pb.MsgData) rpc.encapsulateMsgData(pb.MsgData)
log.Info(pb.OperationID, "this is a test MsgData ", pb.MsgData)
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
// callback // callback
callbackResp := callbackWordFilter(pb) callbackResp := callbackWordFilter(pb)
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
} }
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp) log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp)
if callbackResp.ActionCode != constant.ActionAllow { if callbackResp.ActionCode != constant.ActionAllow {
@ -212,7 +212,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
if err1 != nil { if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
} }
} }

@ -2,6 +2,7 @@ package kafka
import ( import (
log2 "Open_IM/pkg/common/log" log2 "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"errors" "errors"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -45,15 +46,16 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
return -1, -1, err return -1, -1, err
} }
if len(bMsg) == 0 { if len(bMsg) == 0 {
return 0, 0, errors.New("msg content is nil") log2.Error(operationID, "len(bMsg) == 0 ")
return 0, 0, errors.New("len(bMsg) == 0 ")
} }
kMsg.Value = sarama.ByteEncoder(bMsg) kMsg.Value = sarama.ByteEncoder(bMsg)
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer) log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length())
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
return -1, -1, errors.New("key or value == 0") return -1, -1, errors.New("key or value == 0")
} }
a, b, c := p.producer.SendMessage(kMsg) a, b, c := p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer) log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
return a, b, c return a, b, utils.Wrap(c, "")
} }

Loading…
Cancel
Save