diff --git a/Kafka.md b/Kafka.md index 4f019a7..271bf17 100644 --- a/Kafka.md +++ b/Kafka.md @@ -35,3 +35,41 @@ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] va 3. Key-ordering List partitions = cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size(); + +### 无消息丢失配置 + +##### 案例 1:生产者程序丢失数据 + +1. producer.send(msg, callback) +2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。 +3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试 +##### 案例 2:生产者程序丢失数据 +1. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余 +2. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1 +3. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。 + +#### 案例 3:消费者程序丢失数据 +1. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。 +维持先消费消息(阅读),再更新位移(书签)的顺序 + +### Kafka 拦截器 +``` +public interface ProducerInterceptor extends Configurable { +ProducerRecord onSend(ProducerRecord var1); + //该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。 + void onAcknowledgement(RecordMetadata var1, Exception var2); + //该方法会在消息成功提交或发送失败之后被调用. onAcknowledgement 的调用要早于 callback 的调用 + void close(); +} +``` + +``` +public interface ConsumerInterceptor extends Configurable { + ConsumerRecords onConsume(ConsumerRecords var1); + //该方法在消息返回给 Consumer 程序之前调用 + void onCommit(Map var1); + //Consumer 在提交位移之后调用该方法 + void close(); +} +``` +