|
|
package main
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
"github.com/Shopify/sarama"
|
|
|
"hash"
|
|
|
"hash/crc32"
|
|
|
"log"
|
|
|
"math/rand"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
"sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
func main() {
|
|
|
// 一,得到异步的producer
|
|
|
brokers := []string{"localhost:9092"}
|
|
|
conf := sarama.NewConfig()
|
|
|
// 开启 Success channel 来接收发送成功的信息
|
|
|
conf.Producer.Return.Successes = true
|
|
|
// 配置producer选项分区的策略
|
|
|
// 随机,全部的消息不需要逻辑上的分割
|
|
|
//conf.Producer.Partitioner = sarama.NewRandomPartitioner
|
|
|
// 轮循,全部的消息不需要逻辑上的分割
|
|
|
//conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
|
|
|
|
|
|
// Hash,基于特定的Key,完成分区选择。具有逻辑含义
|
|
|
//conf.Producer.Partitioner = sarama.NewHashPartitioner
|
|
|
//自定义Hash算法,需要完成hash32的函数
|
|
|
conf.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {
|
|
|
//poly: x³²+ x³¹+ x²⁴+ x²²+ x¹⁶+ x¹⁴+ x⁸+ x⁷+ x⁵
|
|
|
//0b11010101100000101000001010000001
|
|
|
return crc32.New(crc32.MakeTable(0xD5828281))
|
|
|
})
|
|
|
// 指定,配合 partition使用
|
|
|
//conf.Producer.Partitioner = sarama.NewManualPartitioner
|
|
|
producer, err := sarama.NewAsyncProducer(brokers, conf)
|
|
|
if err != nil {
|
|
|
log.Fatalln(err)
|
|
|
}
|
|
|
|
|
|
// 二,启用 goroutine
|
|
|
var wg sync.WaitGroup
|
|
|
sendCounter, errorCounter, successCounter := 0, 0, 0
|
|
|
|
|
|
// 2.1 处理 errors
|
|
|
go func() {
|
|
|
wg.Add(1)
|
|
|
defer wg.Done()
|
|
|
for err := range producer.Errors() {
|
|
|
log.Printf("Failed to send, err: %s\n", err)
|
|
|
errorCounter++
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
// 2.2 处理 success
|
|
|
go func() {
|
|
|
wg.Add(1)
|
|
|
defer wg.Done()
|
|
|
for suc := range producer.Successes() {
|
|
|
log.Printf("Success to send, partition: %d, offset: %d, value: %s\n", suc.Partition, suc.Offset, suc.Value)
|
|
|
successCounter++
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
// 增加控制信号
|
|
|
signals := make(chan os.Signal, 1)
|
|
|
signal.Notify(signals, os.Interrupt) // ctrl + c
|
|
|
|
|
|
loop:
|
|
|
|
|
|
for {
|
|
|
// 生成特定的消息,随机生成
|
|
|
// 消息关联的数据id
|
|
|
id := rand.Intn(5)
|
|
|
|
|
|
time.Sleep(1000 * time.Millisecond)
|
|
|
select {
|
|
|
case producer.Input() <- &sarama.ProducerMessage{
|
|
|
Topic: "topic_more_partition_1", // 3 partitions
|
|
|
Value: sarama.StringEncoder("MaShiBing Go, id:" + fmt.Sprintf("%d", id)),
|
|
|
// Hash
|
|
|
//Key: sarama.StringEncoder(fmt.Sprintf("%d", id)),
|
|
|
// manual
|
|
|
Partition: int32(id % 3),
|
|
|
}:
|
|
|
sendCounter++
|
|
|
|
|
|
// for 终止信号
|
|
|
case <-signals:
|
|
|
// 异步终止
|
|
|
producer.AsyncClose()
|
|
|
break loop
|
|
|
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// wg 等待
|
|
|
wg.Wait()
|
|
|
|
|
|
// 输出统计结果
|
|
|
log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter)
|
|
|
|
|
|
}
|