package main import ( "encoding/json" "github.com/Shopify/sarama" "log" "time" ) // 需要发送的消息类型 type Event struct { Name string `json:"name"` Type string `json:"type"` Source string `json:"source"` Target string `json:"target"` Time time.Time `json:"time"` } type EventEncoder Event func (e EventEncoder) Encode() ([]byte, error) { // 将 Event 类型数据做JSON编码 j, err := json.Marshal(e) if err != nil { return nil, err } return j, nil } func (e EventEncoder) Length() int { // 编码之后的数据长度 j, err := json.Marshal(e) if err != nil { return 0 } return len(j) } func main() { // 一,获取同步生产者 brokers := []string{"localhost:9092"} producer, err := sarama.NewSyncProducer(brokers, nil) if err != nil { log.Fatalln(err) } // 保证关闭生产者 defer func() { if err := producer.Close(); err != nil { log.Fatalln(err) } }() // 模拟数据 evt := Event{ Name: "user", Type: "buy", Source: "42", Target: "10098", Time: time.Now(), } // 二,设置消息内容 msg := &sarama.ProducerMessage{ Topic: "event_topic", //Value: sarama.StringEncoder("Mashibing Go Kafka."), //Value: sarama.ByteEncoder Value: EventEncoder(evt), } // 三,发送消息 // 返回:分区索引,偏移量,错误 partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("Send failed, err: %s\n", err) } else { log.Printf("Send Success, partition: %d, offset: %d\n", partition, offset) } }