Kafka消费者
消费者
概述
Kafka消费者是Kafka系统中用于读取并处理Kafka主题(topic)中的消息的组件。Kafka的消费者在消费消息时会从特定的主题和分区中读取数据,然后进行处理。
消费模式:
由于推模式很难考虑到每个客户端不同的消费速率,导致消费者无法消费消息而宕机,因此kafka采用的是poll的模式,该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。
为了避免过多不必要的空轮询,kafka做了改进,如果没消息服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。
工作流程:
Kafka消费者订阅主题后,从指定分区拉取消息,处理并提交消费进度(偏移量)以记录已消费的位置,并在消费者组内通过再均衡机制实现负载均衡和故障恢复,确保每个分区的消息仅被一个组内消费者消费。
消费者组(Consumer Group):消费者组是Kafka中用于管理和协调多个消费者共同消费一个或多个主题的机制,通过添加多个消费者的消费组,并行处理数据提高消息消费速度
特点:
- 唯一的组ID:每个消费者组都有一个唯一的组ID,用于标识这个组。
- 水平扩展:同一消费者组内的多个消费者可以并行消费同一主题的不同分区,实现负载均衡。例如,如果一个主题有4个分区,一个消费者组有4个消费者,那么每个消费者将消费一个分区的数据。
- 单一消费保证:Kafka确保每个分区中的消息在一个消费者组中只会被一个消费者消费,避免消息重复消费。
- 再均衡机制:当消费者组中的成员发生变化(如新增或移除消费者)时,Kafka会触发再均衡,重新分配分区给消费者组中的消费者。
一般消费者组中消费者比分区都多了,就会有闲置的消费者
offset:偏移量是Kafka中用于标识消息在分区中的位置的唯一标识,它是一个不断增加的整数值(老版本存在Zookeeper中,由于频繁通信问题,将其移到每个broker中的__consumer_offsets主题中)
- _consumer_offsets主题里面采用key和 value的方式存储数据。key是
groupid+topic+分区号
,value就是当前offset的值。每隔一段时间,kafka 内部会对这个topic进行compact,也就是每个groupid+topic+分区号
就保留最新数据(这个主题默认50个分区,3个副本)。- Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的
可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨
胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log
Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
- Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的
- 如果需要想查看这个主题,则需要在
config/consumer.properties
中添加配置exclude.internaltopics=false
- _consumer_offsets主题里面采用key和 value的方式存储数据。key是
组协调器(Group Coordinator)
定义:
组协调器是Kafka中的一个Broker节点,负责管理消费者组的成员关系和偏移量提交。
消费组的协调器是由groupid决定的,coordinator节点选择=groupid的hashcode值%50(__consumer_offsets的分区数量)
主要职责:
- 管理消费者组成员:
- 处理消费者的加入和离开请求。
- 维护消费者组内的成员列表。
- 触发再均衡过程,在消费者组成员发生变化时重新分配分区。
- 协调再均衡:
- 当消费者组内的成员(消费者)发生变化(如新增、移除消费者,或订阅的主题发生变化)时,组协调器触发再均衡过程。
- 再均衡过程中,组协调器重新分配分区给组内的消费者,确保每个分区仅被一个消费者消费。
- 管理偏移量:
- 接收并存储消费者提交的偏移量。
- 偏移量通常存储在Kafka的内部主题
__consumer_offsets
中。 - 在消费者重新启动或故障恢复时,提供最新的偏移量信息,确保消费者可以从正确的位置继续消费。
工作流程:
消费案例
消费详细流程分析:
代码编写:
创建单个消费者,消费整个topic中的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class CustomConsumer {
public static void main(String[] args) {
//0.配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费组的id一定要带
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
//1.创建消费者,并订阅主题
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
ArrayList<String> topic = new ArrayList<>();
topic.add("test");
kafkaConsumer.subscribe(topic);
//2.消费信息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.println(record);
});
}
//3.关闭
}
}创建单个消费者,消费整个topic中某一分区的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class CustomConsumerPartiton {
public static void main(String[] args) {
//0.配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费组的id一定要带
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
//1.创建消费者,并订阅主题分区
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
ArrayList<TopicPartition> topicPartition = new ArrayList<>();
topicPartition.add(new TopicPartition("test",0));
kafkaConsumer.assign(topicPartition);
//2.消费信息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.println(record);
});
}
//3.关闭
}
}创建消费者组,对topic进行消费:这里只需要创建三个单一的消费者,要组成一个消费组,只需要使用统一的groupid即可
消费分区分配
Kafka消费分区分配是指Kafka消费者群组中的每个消费者如何分配到不同的分区上,以实现并行消费和负载均衡。Kafka使用了多种分区分配策略来确保数据的高效处理。
重平衡时机:
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
我们需要尽量去避免非必要的重平衡:
- 例如Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group,如果是这个原因导致的 Rebalance,则需要去避免
- 未能及时发送心跳:
- 设置 session.timeout.ms = 6s
- 设置 heartbeat.interval.ms = 2s
- Consumer 消费时间过长导致的:
max.poll.interval.ms
可以调大,或者优化消费逻辑- Consumer 端GC太频繁并且久:查看Consumer 端的 GC 表现
- 相关参数:
session.timeout.ms
:心跳超时时间heartbeat.interval.ms
:心跳检测间隔时间max.poll.interval.ms
:限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔,超过时间还没有poll则会被踢出
参数控制:partition.assignment.strategy
(默认策略是Range+CooperativeSticky
)
Kafka提供了几种分区分配策略,主要包括:
RangeAssignor:
特点:
- 按范围分配。
- 该策略会对所有消费者按字典序排序,并对分区也按字典序排序。然后将分区按范围分配给消费者。具体而言,每个消费者会尽量均匀地分配连续的一段分区。
- 如果除不尽,多的分区给最前面的分区进行消费,如果余数多了会导致第一个分区消费压力大,严重的数据倾斜
- 并且如果消费者挂了之后,45秒之前发送的消息会交给其他的一个消费者进行消费,消费完之后会进行再分配,会将其消费的所有分区按范围重新分配
适用场景:适用于分区数量相对较少且消费者变动不频繁的场景。
RoundRobinAssignor:
特点:
- 轮询分配。
- 该策略会将所有分区和所有消费者按hashcode排序,然后逐个将分区分配给消费者,类似于轮询调度。每个消费者会得到尽量相等数量的分区,但分区的连续性不会被保证。
- 如果消费者挂了,45秒之前的消息会轮询的给其他消费者,45秒之后会重新轮询分配
适用场景:适用于需要均匀分配分区但不要求分区连续性的场景。
StickyAssignor:
特点:
- 粘性分配。
- 该策略在分区分配时尽量保持上次分配结果的不变。如果必须重新分配分区,也会尽量保证最小的变动。这有助于减少消费者在分区重新分配时的状态恢复工作。
适用场景:适用于对再平衡过程中的状态恢复成本敏感的场景。
CooperativeStickyAssignor:
特点:
- 渐进式粘性分配。
- 进一步减少再平衡对消费者的干扰。传统的
StickyAssignor
在再平衡时会导致所有消费者停止消费并重新分配分区,而CooperativeStickyAssignor
采用了逐步再平衡的方法,使得只有部分消费者会在一次再平衡中被影响。
适用场景:适用于对消费延迟敏感、消费者变动频繁且需要高稳定性的场景。
offset的提交
Kafka支持两种主要的offset提交方式:
- 自动提交(Auto Commit)
- 手动提交(Manual Commit)
自动提交
在自动提交模式下,Kafka消费者会自动定期提交当前消费到的offset。这种方式的优点是实现简单,不需要开发者显式提交offset,但缺点是可能会在失败恢复时重复消费消息,或者丢失部分消息。
自动提交offset的相关参数:
enable.auto.commit
:是否开启自动提交offset功能,默认是trueauto.commit.interval.ms
:自动提交offset的时间间隔,默认是5s
案例:
1 | Properties props = new Properties(); |
手动提交
手动提交模式下,消费者需要显式地在代码中提交offset。这样可以更灵活地控制提交逻辑,确保只有在消息处理完成后才提交offset,从而避免重复消费或消息丢失。
手动提交又分为同步提交和异步提交两种方式:
同步提交:同步提交会等待Kafka返回提交结果,确保offset成功提交。如果提交失败,可以捕获异常并进行处理。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
try {
consumer.commitSync(); // 同步提交offset
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
异步提交:异步提交不会阻塞当前线程,可以提高提交效率,但无法立即知道提交结果。可以通过回调函数处理提交结果。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 异步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.printf("Commit failed for offsets %s%n", offsets, exception);
}
});
}通常将两者进行结合:因为同步是阻塞的,影响系统TPS,但是异步失败后又不会重试;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch (Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}Offset 提交的最佳实践
确保处理完成后再提交:无论是自动提交还是手动提交,都应确保在消息处理完成后再提交offset,以避免重复消费。
定期提交:在手动提交模式下,定期提交offset可以减少在失败恢复时需要重复处理的消息数量。
处理提交失败:在手动提交模式下,特别是同步提交时,应处理提交失败的情况,以避免数据丢失。
控制提交频率:频繁提交offset会增加Kafka的负担,但过少提交可能导致更多的重复消费。应根据业务需求调整提交频率。
分小批提交:将poll的一大批消息分成一小批一小批提交
1
2
3
4
5
6
7
8
9
10
11
12
13
14private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);
// 每100条消息提交一次
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是
count++;
}
}指定消费位置
在kafka中当消费者查找不到所记录的消费位移时,会根据auto.offset.reset的配置,决定从何处消费。
auto.offset.reset = earliest | latest | none
默认是 latest。
earliest
:自动将偏移量重置为最早的偏移量,–from-beginning。latest
(默认值):自动将偏移量重置为最新偏移量none
:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
除了上述消费方式,还有两种特殊的消费方式:
指定offset位置消费:
消费者可以使用
seek
方法来指定从某个特定的offset位置开始消费。这个方法需要在消费者订阅主题并且分区分配完成后调用案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
TopicPartition partition = new TopicPartition(topic, 0); // 指定主题和分区
consumer.assign(Arrays.asList(partition)); // 手动分配分区
long offsetToStart = 10L; // 指定要从哪个offset开始消费
consumer.seek(partition, offsetToStart); // 指定offset位置
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 手动提交offset
}
}
}这里也可以通过api获取分区信息,而不是直接new出来:
1
2
3
4
5
6
7
8
9
10
11
12
13// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,600);
}
从特定的时间戳开始消费:
Kafka消费者还可以通过时间戳来确定从哪个offset开始消费。消费者可以使用
offsetsForTimes
方法,该方法接受一个包含主题分区和时间戳的映射,并返回从该时间戳开始的offset案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
TopicPartition partition = new TopicPartition(topic, 0); // 指定主题和分区
consumer.assign(Collections.singletonList(partition)); // 手动分配分区
long timestampToStart = System.currentTimeMillis() - 24 * 60 * 60 * 1000; // 指定24小时前的时间戳
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(partition, timestampToStart);
// 获取指定时间戳的offset
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);
if (offsetAndTimestamp != null) {
consumer.seek(partition, offsetAndTimestamp.offset());
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 手动提交offset
}
}
}拦截器
与生产者对应,消费者也有拦截器;消费者拦截器可以在消息消费之前或之后执行一些自定义逻辑,比如记录日志、过滤消息、监控性能等。使用消费者拦截器可以增强Kafka消费者的功能,使其更加灵活和可扩展。
Kafka消费者拦截器通过实现ConsumerInterceptor
接口来定义拦截逻辑。该接口包含以下方法:
- **
onConsume
**:在消费者拉取消息之后调用。可以用于处理或修改消费记录。 - **
onCommit
**:在消费者提交offset之前调用。可以用于记录提交的offset或其他监控操作。 - **
close
**:在消费者关闭时调用。可以用于清理资源。 - **
configure
**:用于配置拦截器。
消费者拦截器的简单实现
创建拦截器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// 在这里处理消费的记录,比如过滤、修改等
System.out.println("Intercepted " + records.count() + " records");
return records; // 返回处理后的记录
}
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 在这里处理提交的offset,比如记录日志等
System.out.println("Committing offsets: " + offsets);
}
public void close() {
// 在这里进行清理操作
}
public void configure(Map<String, ?> configs) {
// 在这里配置拦截器
}
}配置拦截器:
// 配置消费者拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.CustomConsumerInterceptor");
漏消费和重复消费
重复消费:已经消费了数据,但是 offset 没提交
自动提交offset引起:
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费
设置offset为手动提交,当ofiset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
解决方案:消费者事务
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)
消息积压
消息积压是Kafka中常见的问题,通常是由于消费者消费速度跟不上生产者生产速度引起的。
解决方案:
如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
fetch.max.bytes
默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)ormax.message.bytes (topic config)影响max.poll.records
一次 poll 拉取数据返回消息的最大条数,默认是 500 条
重要参数
参数 | 解释 | 默认值 |
---|---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 | 无 |
key.deserializer 和 value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 | 无 |
key.deserializer 和 value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 | 无 |
group.id | 标记消费者所属的消费者组。 | 无 |
enable.auto.commit | 消费者会自动周期性地向服务器提交偏移量。 | true |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 为 true,则定义了消费者偏移量向 Kafka 提交的频率。 | 5s |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在时(如,数据被删除了),该如何处理?earliest: 自动重置偏移量到最早的偏移量。latest: 自动重置偏移量为最新的偏移量。none: 如果消费组之前的偏移量不存在,则抛出异常。anything: 任何消费抛异常。 | latest |
offsets.topic.num.partitions | __consumer_offsets 的分区数。 | 50 个分区 |
heartbeat.interval.msKafka | 消费者和 coordinator 之间的心跳时间间隔。此值必须小于 session.timeout.ms 的1/3。 | 3s |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,超过该值,消费者被移除,消费者组执行再平衡。 | 45s |
max.poll.interval.ms | 消费者处理消息的最大时长,超过该值,消费者被移除,消费者组执行再平衡。 | 5分钟 |
fetch.min.bytes | 消费者获取服务器端一批消息最小的字节数。 | 1字节 |
fetch.max.wait.ms | 如果没有从服务器端获取到一批数据的最小字节数,该时间到,仍然会返回数据。 | 500ms |
fetch.max.bytes | 消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes(broker config)或max.message.bytes(topic config)影响。 | 52428800(50m) |
max.poll.records | 一次poll拉取数据返回消息的最大条数。 | 500条 |