You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

106 lines
2.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"fmt"
"github.com/Shopify/sarama"
"hash"
"hash/crc32"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"time"
)
func main() {
// 一得到异步的producer
brokers := []string{"localhost:9092"}
conf := sarama.NewConfig()
// 开启 Success channel 来接收发送成功的信息
conf.Producer.Return.Successes = true
// 配置producer选项分区的策略
// 随机,全部的消息不需要逻辑上的分割
//conf.Producer.Partitioner = sarama.NewRandomPartitioner
// 轮循,全部的消息不需要逻辑上的分割
//conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// Hash基于特定的Key完成分区选择。具有逻辑含义
//conf.Producer.Partitioner = sarama.NewHashPartitioner
//自定义Hash算法需要完成hash32的函数
conf.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {
//poly: x³²+ x³¹+ x²⁴+ x²²+ x¹⁶+ x¹⁴+ x⁸+ x⁷+ x⁵
//0b11010101100000101000001010000001
return crc32.New(crc32.MakeTable(0xD5828281))
})
// 指定,配合 partition使用
//conf.Producer.Partitioner = sarama.NewManualPartitioner
producer, err := sarama.NewAsyncProducer(brokers, conf)
if err != nil {
log.Fatalln(err)
}
// 二,启用 goroutine
var wg sync.WaitGroup
sendCounter, errorCounter, successCounter := 0, 0, 0
// 2.1 处理 errors
go func() {
wg.Add(1)
defer wg.Done()
for err := range producer.Errors() {
log.Printf("Failed to send, err: %s\n", err)
errorCounter++
}
}()
// 2.2 处理 success
go func() {
wg.Add(1)
defer wg.Done()
for suc := range producer.Successes() {
log.Printf("Success to send, partition: %d, offset: %d, value: %s\n", suc.Partition, suc.Offset, suc.Value)
successCounter++
}
}()
// 增加控制信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // ctrl + c
loop:
for {
// 生成特定的消息,随机生成
// 消息关联的数据id
id := rand.Intn(5)
time.Sleep(1000 * time.Millisecond)
select {
case producer.Input() <- &sarama.ProducerMessage{
Topic: "topic_more_partition_1", // 3 partitions
Value: sarama.StringEncoder("MaShiBing Go, id:" + fmt.Sprintf("%d", id)),
// Hash
//Key: sarama.StringEncoder(fmt.Sprintf("%d", id)),
// manual
Partition: int32(id % 3),
}:
sendCounter++
// for 终止信号
case <-signals:
// 异步终止
producer.AsyncClose()
break loop
}
}
// wg 等待
wg.Wait()
// 输出统计结果
log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter)
}