// 2 years ago

package main
import (
// main builds a sarama async producer against localhost:9092, starts
// goroutines that log delivery errors and successes, and then sends one
// message per second to "topic_more_partition_1" until an os.Interrupt
// (Ctrl+C) arrives, after which it logs the send/error/success counters.
//
// NOTE(review): this excerpt appears to be missing lines (closing braces,
// the body of the err check, etc.) — confirm against the full file.
func main() {
// Step 1: obtain an asynchronous producer.
brokers := []string{"localhost:9092"}
conf := sarama.NewConfig()
// Enable the Successes channel so that delivery confirmations can be received.
conf.Producer.Return.Successes = true
// Configure the producer's partition-selection strategy.
// Random: for messages that need no logical partitioning.
//conf.Producer.Partitioner = sarama.NewRandomPartitioner
// Round-robin: for messages that need no logical partitioning.
//conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// Hash: picks the partition from the message Key, so the choice carries logical meaning.
//conf.Producer.Partitioner = sarama.NewHashPartitioner
conf.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {
// poly: x³²+ x³¹+ x²⁴+ x²²+ x¹⁶+ x¹⁴+ x⁸+ x⁷+ x⁵+ x³+ x¹+ x⁰
// (0xD5828281 is that polynomial in the reversed bit notation crc32.MakeTable expects.)
return crc32.New(crc32.MakeTable(0xD5828281))
// Manual: used together with the message's Partition field.
//conf.Producer.Partitioner = sarama.NewManualPartitioner
producer, err := sarama.NewAsyncProducer(brokers, conf)
if err != nil {
// Step 2: start the goroutines.
// NOTE(review): no wg.Add call is visible in this excerpt even though both
// goroutines below call wg.Done — confirm Add is called before each one starts.
var wg sync.WaitGroup
// NOTE(review): no increments of these counters are visible in this excerpt;
// if they are updated from the goroutines below, check for unsynchronized access.
sendCounter, errorCounter, successCounter := 0, 0, 0
// 2.1 Handle errors.
go func() {
defer wg.Done()
for err := range producer.Errors() {
log.Printf("Failed to send, err: %s\n", err)
// 2.2 Handle successes.
go func() {
defer wg.Done()
for suc := range producer.Successes() {
log.Printf("Success to send, partition: %d, offset: %d, value: %s\n", suc.Partition, suc.Offset, suc.Value)
// Add a control signal.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // ctrl + c
// NOTE(review): the `break loop` below needs a `loop:` label on this for
// statement; none is visible in this excerpt — confirm it exists.
for {
// Generate a message with random content.
// Data id associated with the message.
id := rand.Intn(5)
// Throttle to roughly one message per second.
time.Sleep(1000 * time.Millisecond)
select {
case producer.Input() <- &sarama.ProducerMessage{
Topic: "topic_more_partition_1", // 3 partitions
Value: sarama.StringEncoder("MaShiBing Go, id:" + fmt.Sprintf("%d", id)),
// Hash
//Key: sarama.StringEncoder(fmt.Sprintf("%d", id)),
// manual
// NOTE(review): Partition is presumably only honored when the manual
// partitioner is configured; the active partitioner above is the custom
// hash one and Key is commented out — confirm the intended combination.
Partition: int32(id % 3),
// Termination signal for the for loop.
case <-signals:
// Asynchronous shutdown.
break loop
// Wait on wg.
// Print the statistics.
log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter)