package main import ( "context" "github.com/Shopify/sarama" "log" "os" "os/signal" "sync" ) // 分组消费 func main() { // 一,创建消费者组 addrs := []string{"localhost:9092"} groupID := "mashibingGroup" conf := sarama.NewConfig() // 消费信道返回 conf.Consumer.Return.Errors = true // 设置分配策略 // 优先顺序配置 // 找到第一个,组内全部的consumer都支持的策略作为分配策略 conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{ sarama.BalanceStrategySticky, // 新的更加优化的分配策略 sarama.BalanceStrategyRange, // default sarama.BalanceStrategyRoundRobin, // } group, err := sarama.NewConsumerGroup(addrs, groupID, conf) if err != nil { log.Fatalln(err) } defer func() { _ = group.Close() }() // 二,处理group的错误 go func() { for err := range group.Errors() { log.Println(err) } }() // 三,组消费 // 带有 cancel context topics := []string{"topic_more_partition_1"} ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} go func() { wg.Add(1) defer wg.Done() for { // 每当组内消费者成员改变时,重新执行,重新分配 // for 保证重新执行 handler := GroupConsumeHandler{} if err := group.Consume(ctx, topics, handler); err != nil { log.Println(err) } // 判定 context 是否cancel if ctx.Err() != nil { log.Println(ctx.Err()) return } } }() // 信号阻塞 signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) select { case <-signals: // 终止 cancel() } wg.Wait() } // 零,定义组消费处理器 group consume handler type GroupConsumeHandler struct{} // 重新消费时执行,增减组内消费者时,会执行 // ConsumerGroupSession,消费者组会话 func (GroupConsumeHandler) Setup(cgs sarama.ConsumerGroupSession) error { log.Println("setup") log.Println(cgs.Claims()) //cgs.ResetOffset("topic_more_partition_1", 0, 2048, "") return nil } // 当组内消费者退出时执行 func (GroupConsumeHandler) Cleanup(sarama.ConsumerGroupSession) error { log.Println("cleanup") return nil } // 组消费的核心方法 // ConsumerGroupSession,消费者组会话 // sarama.ConsumerGroupClaim 组资产数据,使用该参数,完成消息的消费 func (GroupConsumeHandler) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error { log.Println("consumeClain") // 不要 goroutine 中完成,因为group会自动goroutine完成 // 消费 for msg := range cgc.Messages() { log.Printf("Consumed message, partition: %d, offset: %d\n", msg.Partition, msg.Offset) // 标记该消息已经被消费 cgs.MarkMessage(msg, "") } return nil }