本质是自定义存储offset, 解决消息消费&offset数据一致性问题。
因为不管是自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
, 还是关闭自动提交改为false
(其中手动提交分为同步提交offsetconsumer.commitSync()
或者异步提交consumer.commitAsync(new OffsetCommitCallback()...)
),
不管先存offset
还是先存数据: 都会有问题:
- offset先存:broker挂:数据还没存: broker重启:offset已经存了但是那段数据没存=>
数据丢失
;
- 反过来,数据存了但是offset还没存broker挂了重启就会
重复消费
;
也就是不论是什么顺序,只要1.维持offset和2.存offset对应的数据,只要这是两步,那么中间突然中断=>都会导致数据一致性问题。如果offset放在内存中,那么出现这种问题几乎是必然的: 所以offset要持久化到DB => 多步要一起成功或失败=> 事务, i.e. . 这两步放在同一个transaction中存入mysql
但是offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
步骤:
- 订阅的时候传入参数
ConsumerRebalanceListener
进行监听;
- 自己根据业务需求实现其中的方法。
一般是要自己写onPartitionsRevoked
和onPartitionsAssigned
两个方法,一个是rebalance之前调用另一个真的发生了rebalance之后调用—有点相当于AOP的增强功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserialization"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserialization"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka01");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() { @Override // 在Rebalance之前调用 public void onPartitionsRevoked(Collection<TopicPartition> collection) { commitOffset(currentOffset); }
@Override // Rebalance了!! public void onPartitionsAssigned(Collection<TopicPartition> collection) { currentOffset.clear(); for (TopicPartition partition : collection) { consumer.seek(partition, getOffset(partition)); } } });
while (true) { ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000); for (ConsumerRecord<Object, Object> record : consumerRecords) { System.out.println(record.partition() + record.offset());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()); commitOffset(currentOffset); // 异步提交 } } }
// 获取某个partition最近的offset public static int getOffset(TopicPartition partition) { return 0; }
public static void commitOffset(Map<TopicPartition, Long> currentOffset) {
} }
|
注:
消息的丢失和重复问题, 分为Producer-Broker
和Consumer-Broker
两个地方的丢失和重复问题