commit a8e375ee13ea9434757deb85d490029f18ba4cf5 Author: han-joker Date: Fri Nov 25 23:02:12 2022 +0800 full commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9cd587c --- /dev/null +++ b/.gitignore @@ -0,0 +1,42 @@ +# Reference https://github.com/github/gitignore/blob/master/Go.gitignore +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +vendor/ + +# Go workspace file +go.work + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# OS General +Thumbs.db +.DS_Store + +# project +*.cert +*.key +*.log +bin/ + +# Develop tools +.vscode/ +.idea/ +*.swp + +# volumes +data/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..c37d064 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# Kafka之go编程实战 +注意,根目录下的代码都是单独测试文件,运行时需要单独运行指定文件,例如: +```shell +go run create-topic.go +``` +章节对应的测试文件,请参考《Kafka之Go操作实战-文档.md》中的说明 \ No newline at end of file diff --git a/ack.go b/ack.go new file mode 100644 index 0000000..a5fda37 --- /dev/null +++ b/ack.go @@ -0,0 +1,29 @@ +package main + +import ( + "github.com/Shopify/sarama" + "log" +) + +func main() { + // 一,得到异步的producer + brokers := []string{"localhost:9092"} + conf := sarama.NewConfig() + // 1, default + conf.Producer.RequiredAcks = sarama.WaitForLocal + // -1 + conf.Producer.RequiredAcks = sarama.WaitForAll + // 0 + conf.Producer.RequiredAcks = sarama.NoResponse + producer, err := sarama.NewAsyncProducer(brokers, conf) + if err != nil { + log.Fatalln(err) + } + + // 通常还要配合 + // 未同步的follower是否可以选举为leader + //unclean.leader.election.enable = false + // 最小的正常同步状态的 follower 数量 + //min.insync.replicas = $(N/2 + 1) + +} diff --git a/async-send.go b/async-send.go new file mode 100644 index 0000000..442b09c --- /dev/null +++ b/async-send.go @@ -0,0 +1,137 @@ +package main + +import ( + "github.com/Shopify/sarama" + "log" + "os" + "os/signal" + "sync" + "time" +) + +func main() { + // select demo + //selectDemo() + + // + goroutineDemo() +} + +func goroutineDemo() { + // 一,得到异步的producer + brokers := []string{"localhost:9292"} + conf := sarama.NewConfig() + // 开启 Success channel 来接收发送成功的信息 + conf.Producer.Return.Successes = true + producer, err := sarama.NewAsyncProducer(brokers, conf) + if err != nil { + log.Fatalln(err) + } + + // 二,启用 goroutine + var wg sync.WaitGroup + sendCounter, errorCounter, successCounter := 0, 0, 0 + + // 2.1 处理 errors + go func() { + wg.Add(1) + defer wg.Done() + for err := range producer.Errors() { + log.Printf("Failed to send, err: %s\n", err) + errorCounter++ + } + }() + + // 2.2 处理 success + go func() { + wg.Add(1) + defer wg.Done() + for suc := range producer.Successes() { + log.Printf("Success to send, partition: %d, offset: %d\n", suc.Partition, suc.Offset) + successCounter++ + } + }() + + // 增加控制信号 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // ctrl + c + +loop: + for { + time.Sleep(500 * time.Millisecond) + select { + case producer.Input() <- &sarama.ProducerMessage{ + Topic: "cctopic", + Value: sarama.StringEncoder("MaShiBing Go"), + }: + sendCounter++ + + // for 终止信号 + case <-signals: + // 异步终止 + producer.AsyncClose() + break loop + + } + } + + // wg 等待 + wg.Wait() + + // 输出统计结果 + log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter) +} + +func selectDemo() { + // 一,得到异步的producer + brokers := []string{"localhost:9092"} + conf := sarama.NewConfig() + // 开启 Success channel 来接收发送成功的信息 + conf.Producer.Return.Successes = true + producer, err := sarama.NewAsyncProducer(brokers, conf) + if err != nil { + log.Fatalln(err) + } + // close + defer func() { + if err := producer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 增加控制信号 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // ctrl + c + + // 二,send message + sendCounter, errorCounter, successCounter := 0, 0, 0 +loop: + for { + time.Sleep(100 * time.Millisecond) + select { + // send + case producer.Input() <- &sarama.ProducerMessage{ + Topic: "async_topic", + Value: sarama.StringEncoder("MaShiBing Go"), + }: + sendCounter++ + + // error + case err := <-producer.Errors(): + log.Printf("Failed to send, err: %s\n", err) + errorCounter++ + + // success + case suc := <-producer.Successes(): + log.Printf("Success to send, partition: %d, offset: %d\n", suc.Partition, suc.Offset) + successCounter++ + + // for 终止信号 + case <-signals: + break loop + } + } + + // 输出统计结果 + log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter) +} diff --git a/consume-partition.go b/consume-partition.go new file mode 100644 index 0000000..a910125 --- /dev/null +++ b/consume-partition.go @@ -0,0 +1,65 @@ +package main + +import ( + "github.com/Shopify/sarama" + "log" + "os" + "os/signal" +) + +func main() { + // 一,获取消费者 + consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := consumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 二,获取topic下的分区列表 + topic := "topic_more_partition_1" // 3 + partitions, err := consumer.Partitions(topic) + if err != nil { + log.Fatalln(err) + } + log.Println(partitions) + + // 三,遍历每个分区 + var consumeCounter = 0 + for _, partition := range partitions { + // 获取每个分区的消费者 + partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + log.Fatalln(err) + } + + // 四,利用 goroutine 完成消费 + go func(pc sarama.PartitionConsumer) { + // 关闭 + defer func() { + if err := pc.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 消费 + for msg := range pc.Messages() { + log.Println(pc) + log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value) + consumeCounter++ + } + + }(partitionConsumer) + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + select { + case <-signals: + } + + log.Printf("Consumed: %d\n", consumeCounter) +} diff --git a/consume.go b/consume.go new file mode 100644 index 0000000..ac6b6e3 --- /dev/null +++ b/consume.go @@ -0,0 +1,113 @@ +package main + +import ( + "github.com/Shopify/sarama" + "log" + "os" + "os/signal" +) + +func main() { + //consumeTopic() + consumeTopicMulti() +} + +func consumeTopicMulti() { + // 一,获取消费者 + consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := consumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 二,获取topic下的分区列表 + topic := "async_topic" + partitions, err := consumer.Partitions(topic) + if err != nil { + log.Fatalln(err) + } + + // 三,遍历每个分区 + var consumeCounter = 0 + for _, partition := range partitions { + // 获取每个分区的消费者 + partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + log.Fatalln(err) + } + + // 四,利用 goroutine 完成消费 + go func(pc sarama.PartitionConsumer) { + // 关闭 + defer func() { + if err := pc.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 消费 + for msg := range pc.Messages() { + log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value) + consumeCounter++ + } + + }(partitionConsumer) + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + select { + case <-signals: + } + + log.Printf("Consumed: %d\n", consumeCounter) +} + +func consumeTopic() { + // 一,获取消费者 + consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := consumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 二,获取topic具体某个partition的的消费者 + // topic 1:N partition + partitionConsumer, err := consumer.ConsumePartition("async_topic", 0, sarama.OffsetNewest) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := partitionConsumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 三,消费 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + consumeCounter := 0 +loop: + for { + select { + case msg := <-partitionConsumer.Messages(): + log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value) + consumeCounter++ + + case <-signals: + break loop + } + } + + log.Printf("Consumed: %d\n", consumeCounter) + +} diff --git a/create-topic.go b/create-topic.go new file mode 100644 index 0000000..f7d59bc --- /dev/null +++ b/create-topic.go @@ -0,0 +1,73 @@ +package main + +import ( + "github.com/Shopify/sarama" + "log" + "time" +) + +func main() { + // 一,创建Broker对象 + addr := "localhost:9092" + broker := sarama.NewBroker(addr) + // 打开Broker,建立连接 + conf := sarama.NewConfig() + if err := broker.Open(conf); err != nil { + log.Fatalln(err) + } + defer func() { + _ = broker.Close() + }() + // open 会连接,但是是非阻塞连接模式 + // 不会等到连接成功再返回 + // 通常会强制连接判定 broker.Connected() + // 1.2 判定连接状态 + connected, err := broker.Connected() + if err != nil { + log.Fatalln(err) + } + log.Printf("Connected: %v\n", connected) + + // 二,设置主题 + topicKey := "topic_more_partition_1" + topicDetail := &sarama.TopicDetail{ + // 分区数量 + NumPartitions: 3, + // 复制因子 + ReplicationFactor: 1, + } + + // 三,发出创建主题的请求 + // 请求对象 + request := &sarama.CreateTopicsRequest{ + TopicDetails: map[string]*sarama.TopicDetail{ + topicKey: topicDetail, + // 同时创建多个是支持的 + }, + Timeout: 1 * time.Second, + } + // 发出请求 + response, err := broker.CreateTopics(request) + if err != nil { + log.Fatalln(err) + } + log.Println(response) + + // 四,使用consumer查看有几个分区 + consumer, err := sarama.NewConsumer([]string{addr}, nil) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := consumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 二,获取topic下的分区列表 + partitions, err := consumer.Partitions(topicKey) + if err != nil { + log.Fatalln(err) + } + log.Println(partitions) // []int32 +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..346fe32 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +version: "3" +services: + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: 'bitnami/kafka:latest' + ports: + - '9092:9092' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - zookeeper \ No newline at end of file diff --git a/docker-compose_single.yml b/docker-compose_single.yml new file mode 100644 index 0000000..346fe32 --- /dev/null +++ b/docker-compose_single.yml @@ -0,0 +1,20 @@ +version: "3" +services: + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: 'bitnami/kafka:latest' + ports: + - '9092:9092' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - zookeeper \ No newline at end of file diff --git a/encode-consume.go b/encode-consume.go new file mode 100644 index 0000000..e5e4c1e --- /dev/null +++ b/encode-consume.go @@ -0,0 +1,70 @@ +package main + +import ( + "encoding/json" + "github.com/Shopify/sarama" + "log" + "os" + "os/signal" + "time" +) + +type Event struct { + Name string `json:"name"` + Type string `json:"type"` + Source string `json:"source"` + Target string `json:"target"` + Time time.Time `json:"time"` +} + +func main() { + // 一,获取消费者 + consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := consumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 二,获取topic具体某个partition的的消费者 + // topic 1:N partition + partitionConsumer, err := consumer.ConsumePartition("event_topic", 0, sarama.OffsetNewest) + if err != nil { + log.Fatalln(err) + } + defer func() { + if err := partitionConsumer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 三,消费 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + consumeCounter := 0 +loop: + for { + select { + case msg := <-partitionConsumer.Messages(): + log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s, type:%T\n", msg.Partition, msg.Offset, msg.Topic, msg.Value, msg.Value) + evt := Event{} + err := json.Unmarshal(msg.Value, &evt) + if err != nil { + log.Fatalln(err) + } + log.Println(evt) + log.Printf("%T\n", evt) + + consumeCounter++ + + case <-signals: + break loop + } + } + + log.Printf("Consumed: %d\n", consumeCounter) +} diff --git a/encoder.go b/encoder.go new file mode 100644 index 0000000..83347c6 --- /dev/null +++ b/encoder.go @@ -0,0 +1,80 @@ +package main + +import ( + "encoding/json" + "github.com/Shopify/sarama" + "log" + "time" +) + +// 需要发送的消息类型 +type Event struct { + Name string `json:"name"` + Type string `json:"type"` + Source string `json:"source"` + Target string `json:"target"` + Time time.Time `json:"time"` +} + +type EventEncoder Event + +func (e EventEncoder) Encode() ([]byte, error) { + // 将 Event 类型数据做JSON编码 + j, err := json.Marshal(e) + if err != nil { + return nil, err + } + return j, nil +} + +func (e EventEncoder) Length() int { + // 编码之后的数据长度 + j, err := json.Marshal(e) + if err != nil { + return 0 + } + + return len(j) +} + +func main() { + // 一,获取同步生产者 + brokers := []string{"localhost:9092"} + producer, err := sarama.NewSyncProducer(brokers, nil) + if err != nil { + log.Fatalln(err) + } + // 保证关闭生产者 + defer func() { + if err := producer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 模拟数据 + evt := Event{ + Name: "user", + Type: "buy", + Source: "42", + Target: "10098", + Time: time.Now(), + } + + // 二,设置消息内容 + msg := &sarama.ProducerMessage{ + Topic: "event_topic", + //Value: sarama.StringEncoder("Mashibing Go Kafka."), + //Value: sarama.ByteEncoder + Value: EventEncoder(evt), + } + + // 三,发送消息 + // 返回:分区索引,偏移量,错误 + partition, offset, err := producer.SendMessage(msg) + if err != nil { + log.Printf("Send failed, err: %s\n", err) + } else { + log.Printf("Send Success, partition: %d, offset: %d\n", partition, offset) + } + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..caae1b7 --- /dev/null +++ b/go.mod @@ -0,0 +1,26 @@ +module goKafka + +go 1.19 + +require github.com/Shopify/sarama v1.37.2 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.3.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.15.11 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/net v0.0.0-20220927171203-f486391704dc // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b846ebe --- /dev/null +++ b/go.sum @@ -0,0 +1,75 @@ +github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4= +github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= +github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= +golang.org/x/net v0.0.0-20220927171203-f486391704dc h1:FxpXZdoBqT8RjqTy6i1E8nXHhW21wK7ptQ/EPIGxzPQ= +golang.org/x/net v0.0.0-20220927171203-f486391704dc/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/group-consume.go b/group-consume.go new file mode 100644 index 0000000..9e5a74d --- /dev/null +++ b/group-consume.go @@ -0,0 +1,113 @@ +package main + +import ( + "context" + "github.com/Shopify/sarama" + "log" + "os" + "os/signal" + "sync" +) + +// 分组消费 + +func main() { + + // 一,创建消费者组 + addrs := []string{"localhost:9092"} + groupID := "mashibingGroup" + conf := sarama.NewConfig() + // 消费信道返回 + conf.Consumer.Return.Errors = true + // 设置分配策略 + // 优先顺序配置 + // 找到第一个,组内全部的consumer都支持的策略作为分配策略 + conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{ + sarama.BalanceStrategySticky, // 新的更加优化的分配策略 + sarama.BalanceStrategyRange, // default + sarama.BalanceStrategyRoundRobin, // + } + group, err := sarama.NewConsumerGroup(addrs, groupID, conf) + if err != nil { + log.Fatalln(err) + } + defer func() { + _ = group.Close() + }() + + // 二,处理group的错误 + go func() { + for err := range group.Errors() { + log.Println(err) + } + }() + + // 三,组消费 + // 带有 cancel context + topics := []string{"topic_more_partition_1"} + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + go func() { + wg.Add(1) + defer wg.Done() + for { + // 每当组内消费者成员改变时,重新执行,重新分配 + // for 保证重新执行 + handler := GroupConsumeHandler{} + if err := group.Consume(ctx, topics, handler); err != nil { + log.Println(err) + } + + // 判定 context 是否cancel + if ctx.Err() != nil { + log.Println(ctx.Err()) + return + } + } + }() + + // 信号阻塞 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + select { + case <-signals: + // 终止 + cancel() + } + + wg.Wait() +} + +// 零,定义组消费处理器 group consume handler +type GroupConsumeHandler struct{} + +// 重新消费时执行,增减组内消费者时,会执行 +// ConsumerGroupSession,消费者组会话 +func (GroupConsumeHandler) Setup(cgs sarama.ConsumerGroupSession) error { + log.Println("setup") + log.Println(cgs.Claims()) + //cgs.ResetOffset("topic_more_partition_1", 0, 2048, "") + return nil +} + +// 当组内消费者退出时执行 +func (GroupConsumeHandler) Cleanup(sarama.ConsumerGroupSession) error { + log.Println("cleanup") + return nil +} + +// 组消费的核心方法 +// ConsumerGroupSession,消费者组会话 +// sarama.ConsumerGroupClaim 组资产数据,使用该参数,完成消息的消费 +func (GroupConsumeHandler) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error { + log.Println("consumeClain") + // 不要 goroutine 中完成,因为group会自动goroutine完成 + // 消费 + for msg := range cgc.Messages() { + log.Printf("Consumed message, partition: %d, offset: %d\n", msg.Partition, msg.Offset) + // 标记该消息已经被消费 + cgs.MarkMessage(msg, "") + } + return nil +} diff --git a/send-partition.go b/send-partition.go new file mode 100644 index 0000000..192c9f6 --- /dev/null +++ b/send-partition.go @@ -0,0 +1,105 @@ +package main + +import ( + "fmt" + "github.com/Shopify/sarama" + "hash" + "hash/crc32" + "log" + "math/rand" + "os" + "os/signal" + "sync" + "time" +) + +func main() { + // 一,得到异步的producer + brokers := []string{"localhost:9092"} + conf := sarama.NewConfig() + // 开启 Success channel 来接收发送成功的信息 + conf.Producer.Return.Successes = true + // 配置producer选项分区的策略 + // 随机,全部的消息不需要逻辑上的分割 + //conf.Producer.Partitioner = sarama.NewRandomPartitioner + // 轮循,全部的消息不需要逻辑上的分割 + //conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner + + // Hash,基于特定的Key,完成分区选择。具有逻辑含义 + //conf.Producer.Partitioner = sarama.NewHashPartitioner + //自定义Hash算法,需要完成hash32的函数 + conf.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 { + //poly: x³²+ x³¹+ x²⁴+ x²²+ x¹⁶+ x¹⁴+ x⁸+ x⁷+ x⁵ + //0b11010101100000101000001010000001 + return crc32.New(crc32.MakeTable(0xD5828281)) + }) + // 指定,配合 partition使用 + //conf.Producer.Partitioner = sarama.NewManualPartitioner + producer, err := sarama.NewAsyncProducer(brokers, conf) + if err != nil { + log.Fatalln(err) + } + + // 二,启用 goroutine + var wg sync.WaitGroup + sendCounter, errorCounter, successCounter := 0, 0, 0 + + // 2.1 处理 errors + go func() { + wg.Add(1) + defer wg.Done() + for err := range producer.Errors() { + log.Printf("Failed to send, err: %s\n", err) + errorCounter++ + } + }() + + // 2.2 处理 success + go func() { + wg.Add(1) + defer wg.Done() + for suc := range producer.Successes() { + log.Printf("Success to send, partition: %d, offset: %d, value: %s\n", suc.Partition, suc.Offset, suc.Value) + successCounter++ + } + }() + + // 增加控制信号 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // ctrl + c + +loop: + + for { + // 生成特定的消息,随机生成 + // 消息关联的数据id + id := rand.Intn(5) + + time.Sleep(1000 * time.Millisecond) + select { + case producer.Input() <- &sarama.ProducerMessage{ + Topic: "topic_more_partition_1", // 3 partitions + Value: sarama.StringEncoder("MaShiBing Go, id:" + fmt.Sprintf("%d", id)), + // Hash + //Key: sarama.StringEncoder(fmt.Sprintf("%d", id)), + // manual + Partition: int32(id % 3), + }: + sendCounter++ + + // for 终止信号 + case <-signals: + // 异步终止 + producer.AsyncClose() + break loop + + } + } + + // wg 等待 + wg.Wait() + + // 输出统计结果 + log.Printf("SendCounter: %d, ErrorCounter: %d, SuccessCounter: %d\n", sendCounter, errorCounter, successCounter) + +} diff --git a/sync-send.go b/sync-send.go new file mode 100644 index 0000000..9f28134 --- /dev/null +++ b/sync-send.go @@ -0,0 +1,38 @@ +package main + +import ( + "github.com/Shopify/sarama" + "log" +) + +func main() { + // 一,获取同步生产者 + brokers := []string{"localhost:9092"} + producer, err := sarama.NewSyncProducer(brokers, nil) + if err != nil { + log.Fatalln(err) + } + // 保证关闭生产者 + defer func() { + if err := producer.Close(); err != nil { + log.Fatalln(err) + } + }() + + // 二,设置消息内容 + msg := &sarama.ProducerMessage{ + Topic: "sync_topic", + Value: sarama.StringEncoder("Mashibing Go Kafka."), + //Value: sarama.ByteEncoder + } + + // 三,发送消息 + // 返回:分区索引,偏移量,错误 + partition, offset, err := producer.SendMessage(msg) + if err != nil { + log.Printf("Send failed, err: %s\n", err) + } else { + log.Printf("Send Success, partition: %d, offset: %d\n", partition, offset) + } + +}