package main import ( "github.com/Shopify/sarama" "log" "time" ) func main() { // 一,创建Broker对象 addr := "localhost:9092" broker := sarama.NewBroker(addr) // 打开Broker,建立连接 conf := sarama.NewConfig() if err := broker.Open(conf); err != nil { log.Fatalln(err) } defer func() { _ = broker.Close() }() // open 会连接,但是是非阻塞连接模式 // 不会等到连接成功再返回 // 通常会强制连接判定 broker.Connected() // 1.2 判定连接状态 connected, err := broker.Connected() if err != nil { log.Fatalln(err) } log.Printf("Connected: %v\n", connected) // 二,设置主题 topicKey := "topic_more_partition_1" topicDetail := &sarama.TopicDetail{ // 分区数量 NumPartitions: 3, // 复制因子 ReplicationFactor: 1, } // 三,发出创建主题的请求 // 请求对象 request := &sarama.CreateTopicsRequest{ TopicDetails: map[string]*sarama.TopicDetail{ topicKey: topicDetail, // 同时创建多个是支持的 }, Timeout: 1 * time.Second, } // 发出请求 response, err := broker.CreateTopics(request) if err != nil { log.Fatalln(err) } log.Println(response) // 四,使用consumer查看有几个分区 consumer, err := sarama.NewConsumer([]string{addr}, nil) if err != nil { log.Fatalln(err) } defer func() { if err := consumer.Close(); err != nil { log.Fatalln(err) } }() // 二,获取topic下的分区列表 partitions, err := consumer.Partitions(topicKey) if err != nil { log.Fatalln(err) } log.Println(partitions) // []int32 }