本质是自定义存储offset, 解决消息消费&offset数据一致性问题。
因为不管是自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);, 还是关闭自动提交改为false(其中手动提交分为同步提交offsetconsumer.commitSync()或者异步提交consumer.commitAsync(new OffsetCommitCallback()...)), 
不管先存offset还是先存数据: 都会有问题: 
- offset先存:broker挂:数据还没存: broker重启:offset已经存了但是那段数据没存=>数据丢失;
- 反过来,数据存了但是offset还没存broker挂了重启就会重复消费;
也就是不论是什么顺序,只要1.维持offset和2.存offset对应的数据,只要这是两步,那么中间突然中断=>都会导致数据一致性问题。如果offset放在内存中,那么出现这种问题几乎是必然的: 所以offset要持久化到DB => 多步要一起成功或失败=> 事务, i.e. . 这两步放在同一个transaction中存入mysql
但是offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
步骤: 
- 订阅的时候传入参数ConsumerRebalanceListener进行监听;
- 自己根据业务需求实现其中的方法。
一般是要自己写onPartitionsRevoked和onPartitionsAssigned两个方法,一个是rebalance之前调用另一个真的发生了rebalance之后调用—有点相当于AOP的增强功能。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 
 | public class CustomConsumer {
 private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
 
 public static void main(String[] args) {
 
 Properties properties = new Properties();
 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserialization");
 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserialization");
 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka01");
 
 KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
 
 consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
 @Override // 在Rebalance之前调用
 public void onPartitionsRevoked(Collection<TopicPartition> collection) {
 commitOffset(currentOffset);
 }
 
 @Override // Rebalance了!!
 public void onPartitionsAssigned(Collection<TopicPartition> collection) {
 currentOffset.clear();
 for (TopicPartition partition : collection) {
 consumer.seek(partition, getOffset(partition));
 }
 }
 });
 
 while (true) {
 ConsumerRecords<Object, Object> consumerRecords =  consumer.poll(1000);
 for (ConsumerRecord<Object, Object> record : consumerRecords) {
 System.out.println(record.partition() + record.offset());
 
 currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
 commitOffset(currentOffset); // 异步提交
 }
 }
 }
 
 // 获取某个partition最近的offset
 public static int getOffset(TopicPartition partition) {
 return 0;
 }
 
 public static void commitOffset(Map<TopicPartition, Long> currentOffset) {
 
 }
 }
 
 | 
注:
消息的丢失和重复问题, 分为Producer-Broker和Consumer-Broker两个地方的丢失和重复问题