You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
2.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"context"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"sync"
)
// 分组消费
func main() {
// 一,创建消费者组
addrs := []string{"localhost:9092"}
groupID := "mashibingGroup"
conf := sarama.NewConfig()
// 消费信道返回
conf.Consumer.Return.Errors = true
// 设置分配策略
// 优先顺序配置
// 找到第一个组内全部的consumer都支持的策略作为分配策略
conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.BalanceStrategySticky, // 新的更加优化的分配策略
sarama.BalanceStrategyRange, // default
sarama.BalanceStrategyRoundRobin, //
}
group, err := sarama.NewConsumerGroup(addrs, groupID, conf)
if err != nil {
log.Fatalln(err)
}
defer func() {
_ = group.Close()
}()
// 二处理group的错误
go func() {
for err := range group.Errors() {
log.Println(err)
}
}()
// 三,组消费
// 带有 cancel context
topics := []string{"topic_more_partition_1"}
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
defer wg.Done()
for {
// 每当组内消费者成员改变时,重新执行,重新分配
// for 保证重新执行
handler := GroupConsumeHandler{}
if err := group.Consume(ctx, topics, handler); err != nil {
log.Println(err)
}
// 判定 context 是否cancel
if ctx.Err() != nil {
log.Println(ctx.Err())
return
}
}
}()
// 信号阻塞
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
// 终止
cancel()
}
wg.Wait()
}
// 零,定义组消费处理器 group consume handler
type GroupConsumeHandler struct{}
// 重新消费时执行,增减组内消费者时,会执行
// ConsumerGroupSession消费者组会话
func (GroupConsumeHandler) Setup(cgs sarama.ConsumerGroupSession) error {
log.Println("setup")
log.Println(cgs.Claims())
//cgs.ResetOffset("topic_more_partition_1", 0, 2048, "")
return nil
}
// 当组内消费者退出时执行
func (GroupConsumeHandler) Cleanup(sarama.ConsumerGroupSession) error {
log.Println("cleanup")
return nil
}
// 组消费的核心方法
// ConsumerGroupSession消费者组会话
// sarama.ConsumerGroupClaim 组资产数据,使用该参数,完成消息的消费
func (GroupConsumeHandler) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error {
log.Println("consumeClain")
// 不要 goroutine 中完成因为group会自动goroutine完成
// 消费
for msg := range cgc.Messages() {
log.Printf("Consumed message, partition: %d, offset: %d\n", msg.Partition, msg.Offset)
// 标记该消息已经被消费
cgs.MarkMessage(msg, "")
}
return nil
}