|
|
@ -2,6 +2,7 @@ package kafka
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
log2 "Open_IM/pkg/common/log"
|
|
|
|
log2 "Open_IM/pkg/common/log"
|
|
|
|
|
|
|
|
"errors"
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
)
|
|
|
|
)
|
|
|
@ -43,6 +44,9 @@ func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, er
|
|
|
|
log2.Error("", "", "proto marshal err = %s", err.Error())
|
|
|
|
log2.Error("", "", "proto marshal err = %s", err.Error())
|
|
|
|
return -1, -1, err
|
|
|
|
return -1, -1, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(bMsg) == 0 {
|
|
|
|
|
|
|
|
return 0, 0, errors.New("msg content is nil")
|
|
|
|
|
|
|
|
}
|
|
|
|
kMsg.Value = sarama.ByteEncoder(bMsg)
|
|
|
|
kMsg.Value = sarama.ByteEncoder(bMsg)
|
|
|
|
|
|
|
|
|
|
|
|
return p.producer.SendMessage(kMsg)
|
|
|
|
return p.producer.SendMessage(kMsg)
|
|
|
|