You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

74 lines
1.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"github.com/Shopify/sarama"
"log"
"time"
)
// main runs the topic-creation demo and exits with a non-zero status if any
// step fails.
//
// The work lives in createTopicAndListPartitions so that its deferred Close
// calls run before the process exits: log.Fatalln terminates the process via
// os.Exit, which does not run deferred functions.
func main() {
	if err := createTopicAndListPartitions(); err != nil {
		log.Fatalln(err)
	}
}

// createTopicAndListPartitions connects to the broker at localhost:9092, asks
// it to create the topic "topic_more_partition_1" with 3 partitions and a
// replication factor of 1, then opens a consumer against the same address and
// logs the partition IDs reported for that topic.
//
// It returns the first error encountered. If every step succeeds but closing
// the consumer fails, that close error is returned instead of nil.
func createTopicAndListPartitions() (err error) {
	// 1. Create the Broker object.
	addr := "localhost:9092"
	broker := sarama.NewBroker(addr)

	// 1.1 Open the broker connection.
	conf := sarama.NewConfig()
	if err = broker.Open(conf); err != nil {
		return err
	}
	defer func() {
		// Best-effort cleanup: a close failure is deliberately ignored here,
		// there is nothing useful left to do with the connection.
		_ = broker.Close()
	}()

	// Open connects in non-blocking mode and does not wait for the connection
	// to be established, so the connection state is checked explicitly.
	// 1.2 Check the connection state.
	connected, err := broker.Connected()
	if err != nil {
		return err
	}
	log.Printf("Connected: %v\n", connected)

	// 2. Describe the topic.
	topicKey := "topic_more_partition_1"
	topicDetail := &sarama.TopicDetail{
		// Number of partitions.
		NumPartitions: 3,
		// Replication factor.
		ReplicationFactor: 1,
	}

	// 3. Send the create-topics request.
	request := &sarama.CreateTopicsRequest{
		TopicDetails: map[string]*sarama.TopicDetail{
			topicKey: topicDetail,
			// Several topics may be created in a single request.
		},
		Timeout: 1 * time.Second,
	}
	response, err := broker.CreateTopics(request)
	if err != nil {
		return err
	}

	// A nil err only means the request round-trip succeeded; the outcome for
	// each topic is reported separately in TopicErrors and must be inspected.
	for topic, topicErr := range response.TopicErrors {
		if topicErr == nil {
			continue
		}
		switch topicErr.Err {
		case sarama.ErrNoError:
			log.Printf("topic %q created", topic)
		case sarama.ErrTopicAlreadyExists:
			// Keep the demo re-runnable: an existing topic can still be
			// inspected below.
			log.Printf("topic %q already exists, continuing", topic)
		default:
			// Logged here because the returned KError carries no topic name.
			log.Printf("creating topic %q: %v", topic, topicErr)
			return topicErr.Err
		}
	}

	// 4. Use a consumer to see how many partitions the topic has.
	consumer, err := sarama.NewConsumer([]string{addr}, nil)
	if err != nil {
		return err
	}
	defer func() {
		// Report a close failure through the named result, but never mask an
		// earlier error.
		if closeErr := consumer.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	// 4.1 Fetch the list of partitions for the topic.
	partitions, err := consumer.Partitions(topicKey)
	if err != nil {
		return err
	}
	log.Println(partitions) // []int32
	return nil
}