diff --git a/go.mod b/go.mod index 6df88e549..e34d69ab0 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,6 @@ require ( github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/olivere/elastic/v7 v7.0.23 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.11.1 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 @@ -51,3 +50,5 @@ require ( gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) + +replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.29.0 diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 56eed3587..95b6fd85c 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -256,7 +256,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { } func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) { // := make(map[string]interface{}) - log.Debug(m.OperationID, "SignalMsgResp is", pb.String()) + log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String()) b, _ := proto.Marshal(pb) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 6afbc7549..3dc98c3a7 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -160,29 +160,29 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) - } } func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} newTime := db.GetCurrentTimestampByMill() - log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID) + log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String()) flag, errCode, errMsg := isMessageHasReadEnabled(pb) + log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } flag, errCode, errMsg = userRelationshipVerification(pb) + log.Info(pb.OperationID, "userRelationshipVerification ", flag) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } rpc.encapsulateMsgData(pb.MsgData) - log.Info(pb.OperationID, "this is a test MsgData ", pb.MsgData) msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} - // callback callbackResp := callbackWordFilter(pb) + log.Info(pb.OperationID, "callbackWordFilter ", callbackResp) if callbackResp.ErrCode != 0 { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) + log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) } log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp) if callbackResp.ActionCode != constant.ActionAllow { @@ -212,7 +212,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index c4cd9717a..1ebaaab10 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -2,6 +2,7 @@ package kafka import ( log2 "Open_IM/pkg/common/log" + "Open_IM/pkg/utils" "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" @@ -45,15 +46,16 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) return -1, -1, err } if len(bMsg) == 0 { - return 0, 0, errors.New("msg content is nil") + log2.Error(operationID, "len(bMsg) == 0 ") + return 0, 0, errors.New("len(bMsg) == 0 ") } kMsg.Value = sarama.ByteEncoder(bMsg) - log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer) + log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length()) if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) return -1, -1, errors.New("key or value == 0") } a, b, c := p.producer.SendMessage(kMsg) - log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer) - return a, b, c + log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) + return a, b, utils.Wrap(c, "") }