本质是自定义存储offset, 解决消息消费&offset数据一致性问题。

因为不管是自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);, 还是关闭自动提交改为false(其中手动提交分为同步提交offsetconsumer.commitSync()或者异步提交consumer.commitAsync(new OffsetCommitCallback()...)),

不管先存offset还是先存数据: 都会有问题:

  1. offset先存:broker挂:数据还没存: broker重启:offset已经存了但是那段数据没存=>数据丢失;
  2. 反过来,数据存了但是offset还没存broker挂了重启就会重复消费;

也就是不论是什么顺序,只要1.维持offset和2.存offset对应的数据,只要这是两步,那么中间突然中断=>都会导致数据一致性问题。如果offset放在内存中,那么出现这种问题几乎是必然的: 所以offset要持久化到DB => 多步要一起成功或失败=> 事务, i.e. . 这两步放在同一个transaction中存入mysql

但是offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。

步骤:

  1. 订阅的时候传入参数ConsumerRebalanceListener进行监听;
  2. 自己根据业务需求实现其中的方法。

一般是要自己写onPartitionsRevokedonPartitionsAssigned两个方法,一个是rebalance之前调用另一个真的发生了rebalance之后调用—有点相当于AOP的增强功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class CustomConsumer {

private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

public static void main(String[] args) {

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserialization");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserialization");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka01");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
@Override // 在Rebalance之前调用
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
commitOffset(currentOffset);
}

@Override // Rebalance了!!
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
currentOffset.clear();
for (TopicPartition partition : collection) {
consumer.seek(partition, getOffset(partition));
}
}
});

while (true) {
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
for (ConsumerRecord<Object, Object> record : consumerRecords) {
System.out.println(record.partition() + record.offset());

currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
commitOffset(currentOffset); // 异步提交
}
}
}

// 获取某个partition最近的offset
public static int getOffset(TopicPartition partition) {
return 0;
}

public static void commitOffset(Map<TopicPartition, Long> currentOffset) {

}
}

注:
消息的丢失和重复问题, 分为Producer-BrokerConsumer-Broker两个地方的丢失和重复问题