full commit

master
han-joker 2 years ago
commit a8e375ee13

42
.gitignore vendored

@ -0,0 +1,42 @@
# Reference https://github.com/github/gitignore/blob/master/Go.gitignore
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
vendor/
# Go workspace file
go.work
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# OS General
Thumbs.db
.DS_Store
# project
*.cert
*.key
*.log
bin/
# Develop tools
.vscode/
.idea/
*.swp
# volumes
data/

@ -0,0 +1,6 @@
# Kafka之go编程实战
注意,根目录下的代码都是单独测试文件,运行时需要单独运行指定文件,例如:
```shell
go run create-topic.go
```
章节对应的测试文件请参考《Kafka之Go操作实战-文档.md》中的说明

@ -0,0 +1,29 @@
package main
import (
"github.com/Shopify/sarama"
"log"
)
func main() {
// 一得到异步的producer
brokers := []string{"localhost:9092"}
conf := sarama.NewConfig()
// 1, default
conf.Producer.RequiredAcks = sarama.WaitForLocal
// -1
conf.Producer.RequiredAcks = sarama.WaitForAll
// 0
conf.Producer.RequiredAcks = sarama.NoResponse
producer, err := sarama.NewAsyncProducer(brokers, conf)
if err != nil {
log.Fatalln(err)
}
// 通常还要配合
// 未同步的follower是否可以选举为leader
//unclean.leader.election.enable = false
// 最小的正常同步状态的 follower 数量
//min.insync.replicas = $(N/2 + 1)
}

@ -0,0 +1,137 @@
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"sync"
"time"
)
func main() {
// select demo
//selectDemo()
//
goroutineDemo()
}
func goroutineDemo() {
// 一得到异步的producer
brokers := []string{"localhost:9292"}
conf := sarama.NewConfig()
// 开启 Success channel 来接收发送成功的信息
conf.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer(brokers, conf)
if err != nil {
log.Fatalln(err)
}
// 二,启用 goroutine
var wg sync.WaitGroup
sendCounter, errorCounter, successCounter := 0, 0, 0
// 2.1 处理 errors
go func() {
wg.Add(1)
defer wg.Done()
for err := range producer.Errors() {
log.Printf("Failed to send, err: %s\n", err)
errorCounter++
}
}()
// 2.2 处理 success
go func() {
wg.Add(1)
defer wg.Done()
for suc := range producer.Successes() {
log.Printf("Success to send, partition: %d, offset: %d\n", suc.Partition, suc.Offset)
successCounter++
}
}()
// 增加控制信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // ctrl + c
loop:
for {
time.Sleep(500 * time.Millisecond)
select {
case producer.Input() <- &sarama.ProducerMessage{
Topic: "cctopic",
Value: sarama.StringEncoder("MaShiBing Go"),
}:
sendCounter++
// for 终止信号
case <-signals:
// 异步终止
producer.AsyncClose()
break loop
}
}
// wg 等待
wg.Wait()
// 输出统计结果
log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter)
}
func selectDemo() {
// 一得到异步的producer
brokers := []string{"localhost:9092"}
conf := sarama.NewConfig()
// 开启 Success channel 来接收发送成功的信息
conf.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer(brokers, conf)
if err != nil {
log.Fatalln(err)
}
// close
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 增加控制信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // ctrl + c
// 二send message
sendCounter, errorCounter, successCounter := 0, 0, 0
loop:
for {
time.Sleep(100 * time.Millisecond)
select {
// send
case producer.Input() <- &sarama.ProducerMessage{
Topic: "async_topic",
Value: sarama.StringEncoder("MaShiBing Go"),
}:
sendCounter++
// error
case err := <-producer.Errors():
log.Printf("Failed to send, err: %s\n", err)
errorCounter++
// success
case suc := <-producer.Successes():
log.Printf("Success to send, partition: %d, offset: %d\n", suc.Partition, suc.Offset)
successCounter++
// for 终止信号
case <-signals:
break loop
}
}
// 输出统计结果
log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter)
}

@ -0,0 +1,65 @@
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
)
func main() {
// 一,获取消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 二获取topic下的分区列表
topic := "topic_more_partition_1" // 3
partitions, err := consumer.Partitions(topic)
if err != nil {
log.Fatalln(err)
}
log.Println(partitions)
// 三,遍历每个分区
var consumeCounter = 0
for _, partition := range partitions {
// 获取每个分区的消费者
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Fatalln(err)
}
// 四,利用 goroutine 完成消费
go func(pc sarama.PartitionConsumer) {
// 关闭
defer func() {
if err := pc.Close(); err != nil {
log.Fatalln(err)
}
}()
// 消费
for msg := range pc.Messages() {
log.Println(pc)
log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
consumeCounter++
}
}(partitionConsumer)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
}
log.Printf("Consumed: %d\n", consumeCounter)
}

@ -0,0 +1,113 @@
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
)
func main() {
//consumeTopic()
consumeTopicMulti()
}
func consumeTopicMulti() {
// 一,获取消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 二获取topic下的分区列表
topic := "async_topic"
partitions, err := consumer.Partitions(topic)
if err != nil {
log.Fatalln(err)
}
// 三,遍历每个分区
var consumeCounter = 0
for _, partition := range partitions {
// 获取每个分区的消费者
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Fatalln(err)
}
// 四,利用 goroutine 完成消费
go func(pc sarama.PartitionConsumer) {
// 关闭
defer func() {
if err := pc.Close(); err != nil {
log.Fatalln(err)
}
}()
// 消费
for msg := range pc.Messages() {
log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
consumeCounter++
}
}(partitionConsumer)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
}
log.Printf("Consumed: %d\n", consumeCounter)
}
func consumeTopic() {
// 一,获取消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 二获取topic具体某个partition的的消费者
// topic 1:N partition
partitionConsumer, err := consumer.ConsumePartition("async_topic", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 三,消费
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
consumeCounter := 0
loop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
consumeCounter++
case <-signals:
break loop
}
}
log.Printf("Consumed: %d\n", consumeCounter)
}

@ -0,0 +1,73 @@
package main
import (
"github.com/Shopify/sarama"
"log"
"time"
)
func main() {
// 一创建Broker对象
addr := "localhost:9092"
broker := sarama.NewBroker(addr)
// 打开Broker建立连接
conf := sarama.NewConfig()
if err := broker.Open(conf); err != nil {
log.Fatalln(err)
}
defer func() {
_ = broker.Close()
}()
// open 会连接,但是是非阻塞连接模式
// 不会等到连接成功再返回
// 通常会强制连接判定 broker.Connected()
// 1.2 判定连接状态
connected, err := broker.Connected()
if err != nil {
log.Fatalln(err)
}
log.Printf("Connected: %v\n", connected)
// 二,设置主题
topicKey := "topic_more_partition_1"
topicDetail := &sarama.TopicDetail{
// 分区数量
NumPartitions: 3,
// 复制因子
ReplicationFactor: 1,
}
// 三,发出创建主题的请求
// 请求对象
request := &sarama.CreateTopicsRequest{
TopicDetails: map[string]*sarama.TopicDetail{
topicKey: topicDetail,
// 同时创建多个是支持的
},
Timeout: 1 * time.Second,
}
// 发出请求
response, err := broker.CreateTopics(request)
if err != nil {
log.Fatalln(err)
}
log.Println(response)
// 四使用consumer查看有几个分区
consumer, err := sarama.NewConsumer([]string{addr}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 二获取topic下的分区列表
partitions, err := consumer.Partitions(topicKey)
if err != nil {
log.Fatalln(err)
}
log.Println(partitions) // []int32
}

@ -0,0 +1,20 @@
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper

@ -0,0 +1,20 @@
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper

@ -0,0 +1,70 @@
package main
import (
"encoding/json"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"time"
)
type Event struct {
Name string `json:"name"`
Type string `json:"type"`
Source string `json:"source"`
Target string `json:"target"`
Time time.Time `json:"time"`
}
func main() {
// 一,获取消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 二获取topic具体某个partition的的消费者
// topic 1:N partition
partitionConsumer, err := consumer.ConsumePartition("event_topic", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 三,消费
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
consumeCounter := 0
loop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s, type:%T\n", msg.Partition, msg.Offset, msg.Topic, msg.Value, msg.Value)
evt := Event{}
err := json.Unmarshal(msg.Value, &evt)
if err != nil {
log.Fatalln(err)
}
log.Println(evt)
log.Printf("%T\n", evt)
consumeCounter++
case <-signals:
break loop
}
}
log.Printf("Consumed: %d\n", consumeCounter)
}

@ -0,0 +1,80 @@
package main
import (
"encoding/json"
"github.com/Shopify/sarama"
"log"
"time"
)
// 需要发送的消息类型
type Event struct {
Name string `json:"name"`
Type string `json:"type"`
Source string `json:"source"`
Target string `json:"target"`
Time time.Time `json:"time"`
}
type EventEncoder Event
func (e EventEncoder) Encode() ([]byte, error) {
// 将 Event 类型数据做JSON编码
j, err := json.Marshal(e)
if err != nil {
return nil, err
}
return j, nil
}
func (e EventEncoder) Length() int {
// 编码之后的数据长度
j, err := json.Marshal(e)
if err != nil {
return 0
}
return len(j)
}
func main() {
// 一,获取同步生产者
brokers := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
log.Fatalln(err)
}
// 保证关闭生产者
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 模拟数据
evt := Event{
Name: "user",
Type: "buy",
Source: "42",
Target: "10098",
Time: time.Now(),
}
// 二,设置消息内容
msg := &sarama.ProducerMessage{
Topic: "event_topic",
//Value: sarama.StringEncoder("Mashibing Go Kafka."),
//Value: sarama.ByteEncoder
Value: EventEncoder(evt),
}
// 三,发送消息
// 返回:分区索引,偏移量,错误
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Send failed, err: %s\n", err)
} else {
log.Printf("Send Success, partition: %d, offset: %d\n", partition, offset)
}
}

@ -0,0 +1,26 @@
module goKafka
go 1.19
require github.com/Shopify/sarama v1.37.2
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220927171203-f486391704dc // indirect
)

@ -0,0 +1,75 @@
github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4=
github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8=
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.0.0-20220927171203-f486391704dc h1:FxpXZdoBqT8RjqTy6i1E8nXHhW21wK7ptQ/EPIGxzPQ=
golang.org/x/net v0.0.0-20220927171203-f486391704dc/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -0,0 +1,113 @@
package main
import (
"context"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"sync"
)
// 分组消费
func main() {
// 一,创建消费者组
addrs := []string{"localhost:9092"}
groupID := "mashibingGroup"
conf := sarama.NewConfig()
// 消费信道返回
conf.Consumer.Return.Errors = true
// 设置分配策略
// 优先顺序配置
// 找到第一个组内全部的consumer都支持的策略作为分配策略
conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.BalanceStrategySticky, // 新的更加优化的分配策略
sarama.BalanceStrategyRange, // default
sarama.BalanceStrategyRoundRobin, //
}
group, err := sarama.NewConsumerGroup(addrs, groupID, conf)
if err != nil {
log.Fatalln(err)
}
defer func() {
_ = group.Close()
}()
// 二处理group的错误
go func() {
for err := range group.Errors() {
log.Println(err)
}
}()
// 三,组消费
// 带有 cancel context
topics := []string{"topic_more_partition_1"}
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
defer wg.Done()
for {
// 每当组内消费者成员改变时,重新执行,重新分配
// for 保证重新执行
handler := GroupConsumeHandler{}
if err := group.Consume(ctx, topics, handler); err != nil {
log.Println(err)
}
// 判定 context 是否cancel
if ctx.Err() != nil {
log.Println(ctx.Err())
return
}
}
}()
// 信号阻塞
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
// 终止
cancel()
}
wg.Wait()
}
// 零,定义组消费处理器 group consume handler
type GroupConsumeHandler struct{}
// 重新消费时执行,增减组内消费者时,会执行
// ConsumerGroupSession消费者组会话
func (GroupConsumeHandler) Setup(cgs sarama.ConsumerGroupSession) error {
log.Println("setup")
log.Println(cgs.Claims())
//cgs.ResetOffset("topic_more_partition_1", 0, 2048, "")
return nil
}
// 当组内消费者退出时执行
func (GroupConsumeHandler) Cleanup(sarama.ConsumerGroupSession) error {
log.Println("cleanup")
return nil
}
// 组消费的核心方法
// ConsumerGroupSession消费者组会话
// sarama.ConsumerGroupClaim 组资产数据,使用该参数,完成消息的消费
func (GroupConsumeHandler) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error {
log.Println("consumeClain")
// 不要 goroutine 中完成因为group会自动goroutine完成
// 消费
for msg := range cgc.Messages() {
log.Printf("Consumed message, partition: %d, offset: %d\n", msg.Partition, msg.Offset)
// 标记该消息已经被消费
cgs.MarkMessage(msg, "")
}
return nil
}

@ -0,0 +1,105 @@
package main
import (
"fmt"
"github.com/Shopify/sarama"
"hash"
"hash/crc32"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"time"
)
func main() {
// 一得到异步的producer
brokers := []string{"localhost:9092"}
conf := sarama.NewConfig()
// 开启 Success channel 来接收发送成功的信息
conf.Producer.Return.Successes = true
// 配置producer选项分区的策略
// 随机,全部的消息不需要逻辑上的分割
//conf.Producer.Partitioner = sarama.NewRandomPartitioner
// 轮循,全部的消息不需要逻辑上的分割
//conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// Hash基于特定的Key完成分区选择。具有逻辑含义
//conf.Producer.Partitioner = sarama.NewHashPartitioner
//自定义Hash算法需要完成hash32的函数
conf.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {
//poly: x³²+ x³¹+ x²⁴+ x²²+ x¹⁶+ x¹⁴+ x⁸+ x⁷+ x⁵
//0b11010101100000101000001010000001
return crc32.New(crc32.MakeTable(0xD5828281))
})
// 指定,配合 partition使用
//conf.Producer.Partitioner = sarama.NewManualPartitioner
producer, err := sarama.NewAsyncProducer(brokers, conf)
if err != nil {
log.Fatalln(err)
}
// 二,启用 goroutine
var wg sync.WaitGroup
sendCounter, errorCounter, successCounter := 0, 0, 0
// 2.1 处理 errors
go func() {
wg.Add(1)
defer wg.Done()
for err := range producer.Errors() {
log.Printf("Failed to send, err: %s\n", err)
errorCounter++
}
}()
// 2.2 处理 success
go func() {
wg.Add(1)
defer wg.Done()
for suc := range producer.Successes() {
log.Printf("Success to send, partition: %d, offset: %d, value: %s\n", suc.Partition, suc.Offset, suc.Value)
successCounter++
}
}()
// 增加控制信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // ctrl + c
loop:
for {
// 生成特定的消息,随机生成
// 消息关联的数据id
id := rand.Intn(5)
time.Sleep(1000 * time.Millisecond)
select {
case producer.Input() <- &sarama.ProducerMessage{
Topic: "topic_more_partition_1", // 3 partitions
Value: sarama.StringEncoder("MaShiBing Go, id:" + fmt.Sprintf("%d", id)),
// Hash
//Key: sarama.StringEncoder(fmt.Sprintf("%d", id)),
// manual
Partition: int32(id % 3),
}:
sendCounter++
// for 终止信号
case <-signals:
// 异步终止
producer.AsyncClose()
break loop
}
}
// wg 等待
wg.Wait()
// 输出统计结果
log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter)
}

@ -0,0 +1,38 @@
package main
import (
"github.com/Shopify/sarama"
"log"
)
func main() {
// 一,获取同步生产者
brokers := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
log.Fatalln(err)
}
// 保证关闭生产者
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 二,设置消息内容
msg := &sarama.ProducerMessage{
Topic: "sync_topic",
Value: sarama.StringEncoder("Mashibing Go Kafka."),
//Value: sarama.ByteEncoder
}
// 三,发送消息
// 返回:分区索引,偏移量,错误
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Send failed, err: %s\n", err)
} else {
log.Printf("Send Success, partition: %d, offset: %d\n", partition, offset)
}
}
Loading…
Cancel
Save