You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

81 lines
1.5 KiB

package main
import (
"encoding/json"
"github.com/Shopify/sarama"
"log"
"time"
)
// 需要发送的消息类型
type Event struct {
Name string `json:"name"`
Type string `json:"type"`
Source string `json:"source"`
Target string `json:"target"`
Time time.Time `json:"time"`
}
type EventEncoder Event
func (e EventEncoder) Encode() ([]byte, error) {
// 将 Event 类型数据做JSON编码
j, err := json.Marshal(e)
if err != nil {
return nil, err
}
return j, nil
}
func (e EventEncoder) Length() int {
// 编码之后的数据长度
j, err := json.Marshal(e)
if err != nil {
return 0
}
return len(j)
}
func main() {
// 一,获取同步生产者
brokers := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
log.Fatalln(err)
}
// 保证关闭生产者
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 模拟数据
evt := Event{
Name: "user",
Type: "buy",
Source: "42",
Target: "10098",
Time: time.Now(),
}
// 二,设置消息内容
msg := &sarama.ProducerMessage{
Topic: "event_topic",
//Value: sarama.StringEncoder("Mashibing Go Kafka."),
//Value: sarama.ByteEncoder
Value: EventEncoder(evt),
}
// 三,发送消息
// 返回:分区索引,偏移量,错误
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Send failed, err: %s\n", err)
} else {
log.Printf("Send Success, partition: %d, offset: %d\n", partition, offset)
}
}