Kafka生产者
生产者
Kafka 生产者(Producer)是 Apache Kafka 中负责将数据(消息)发送到 Kafka 主题(Topics)的一种客户端。生产者通常运行在数据生成端,能够高效地将大规模数据流传输到 Kafka 中,以供消费者(Consumers)处理和分析。
发送消息流程:
RecordAccumulator: 在jvm的内存中开辟了一块缓存空间,是多个双端队列(方便内存的回收),用于将多条消息合并成一个批次,然后由sender线程发送给kafka集群。
生产者最后还会处理发送结果:生产者处理 Broker 返回的响应。如果消息成功发送,生产者会收到成功确认,结束请求、清理队列。如果发送失败,生产者可以重试或根据配置处理错误。
生产者重要参数总结:
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所有的 broker 地址清单。例如 ip1:9092, ip2:9092, ip3:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size、sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值为 5~100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等到确认落盘就可以。 1:生产者发送过来的数据,Leader 收到数据后返回。 -1(all):生产者发送过来的数据,Leader 和 isr 队列里面的所有节点收到数据后返回。默认值是-1,-1 和 all 是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果要设置重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能已经发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是说不压缩。 支持压缩类型:none, gzip, snappy, lz4 和 zstd。 在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP; 在压缩比方面,zstd > LZ4 > GZIP > Snappy。(但是避免 1. Broker 端和Producer 端不同的压缩算法 2. 保证消息格式转换,例如版本不同会导致格式转换甚至导致使用不了零拷贝) |
通常为了提高生产者的吞吐量,经常修改的参数:
- batch.size:增大一批的大小
- linger.ms:延迟等待的时间
- compression.type:将消息数据进行压缩
- buffer.memory:缓冲区大小(一般分区多了才会修改这个参数)
同步发送
在 Kafka 中,生产者可以使用同步发送和异步发送两种方式将消息发送到 Kafka 集群。同步发送要求生产者在发送消息后等待 Kafka 代理(Broker)的确认响应,只有在收到确认响应后,生产者才会继续发送下一条消息。这种方式确保了消息的可靠性,但会增加延迟和影响吞吐量。
工作流程:
- 消息序列化:生产者将消息序列化为字节数组。
- 选择分区:根据配置或分区策略选择目标分区。
- 发送消息:将序列化后的消息发送到选定的分区。
- 等待确认:生产者等待 Kafka 代理返回的确认响应,确保消息成功写入。
- 处理结果:根据返回的响应结果,决定下一步操作(成功、重试或报错)。
基本案例:
添加client依赖:
1
2
3
4
5<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>编写代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class CustomProducer {
public static void main(String[] args) {
// 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
Properties properties = new Properties();
//broker信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip1:9092,ip2:9092");
//配置序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 2. 创建 kafka 生产者的配置对象,泛型是key、value的类型
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);
// 3. 创建 kafka 生产者对象
for (int i = 0; i < 5; i++) {
// 同步点在于get()方法
kafkaProducer.send(new ProducerRecord("test", "test" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println( "分区 : " + recordMetadata.partition() + " 主题: " + recordMetadata.topic() );
}
}
}).get();
Thread.sleep(100);
}
kafkaProducer.close();
}
}
异步发送
与同步发送需要等待 Kafka 返回确认响应后才能继续发送下一条消息,而异步发送则允许生产者在发送消息后立即进行其他操作(即发送到RecordAccumulator 中就行),无需等待确认响应,从而提高了发送效率和吞吐量。
工作流程:
- 消息序列化:生产者将消息序列化为字节数组。
- 选择分区:根据配置或分区策略选择目标分区。
- 放入缓冲区:将消息放入生产者的缓冲区中。
- 异步发送:生产者从缓冲区中批量取出消息,并将其发送到 Kafka Broker,同时返回继续处理其他任务。
- 回调处理:Kafka Broker 处理完消息后,通过回调函数通知生产者消息发送的结果(成功或失败)。
基本案例:
添加client依赖:
1
2
3
4
5<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>编写代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class CustomProducer {
public static void main(String[] args) {
// 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
Properties properties = new Properties();
//broker信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip1:9092,ip2:9092");
//配置序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 2. 创建 kafka 生产者的配置对象,泛型是key、value的类型
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);
// 3. 创建 kafka 生产者对象
for (int i = 0; i < 5; i++) {
// 不带回调的发送方法,第一个参数为topic,第二个为消息数据
kafkaProducer.send(new ProducerRecord("test", "test" + i));
// 带回调的发送方法,第三个参数为回调函数,会返回主题、分区等消息
kafkaProducer.send(new ProducerRecord("test", "test" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println( "分区 : " + recordMetadata.partition() + " 主题: " + recordMetadata.topic() );
}
}
});
}
kafkaProducer.close();
}
}生产者拦截器
Kafka 生产者拦截器(Producer Interceptor)是一种用于在消息被发送到 Kafka Broker 之前和确认(ack)被回调之前,对消息进行拦截和处理的机制。它可以用于修改、过滤、监控或者记录消息。拦截器为增强消息发送逻辑提供了灵活性和扩展性。
常用场景:
- 消息修改:在消息发送之前,修改消息内容或元数据。
- 消息过滤:根据某些条件过滤掉不需要发送的消息。
- 监控和统计:收集消息发送前和发送后的统计信息,例如消息数量、发送时间等。
- 日志记录:记录每条消息的发送过程,以便调试和审计。
简单实现:
创建自定义拦截器实现ProducerInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 在消息发送之前修改消息内容
String modifiedValue = "Modified: " + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.key(), modifiedValue);
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 在消息发送成功或失败之后记录结果;但是这个方法早于send的callback方法,并且不是同一个线程调用注意线程安全;并且这个方法在Producer发送的主路径上,避免过重的操作
if (exception == null) {
System.out.printf("Message sent successfully to topic %s partition %d offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
}
public void configure(Map<String, ?> configs) {
// 初始化拦截器配置(如果有)
}
public void close() {
// 关闭拦截器,清理资源
}
}注册拦截器:
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomProducerInterceptor.getClass.getName());
序列化器
Kafka 中的序列化器(Serializer)用于将生产者发送的消息对象转换为字节数组,以便能够传输到 Kafka 代理(Broker)。反之,消费者使用反序列化器(Deserializer)将字节数组转换为消息对象。Kafka 提供了多种序列化器以满足不同的数据格式需求。
序列化种类:
StringSerializer
:用于将字符串对象转换为字节数组。它是最常用的序列化器之一,适用于发送简单的文本数据。ByteArraySerializer
用于将字节数组直接传输,不需要进一步序列化。这种序列化器适用于已经是字节数组形式的数据。IntegerSerializer
用于将整数(Integer
)对象转换为字节数组。这种序列化器适用于发送整数类型的数据。LongSerializer
用于将长整数(Long
)对象转换为字节数组。这种序列化器适用于发送长整数类型的数据。DoubleSerializer
用于将双精度浮点数(Double
)对象转换为字节数组。这种序列化器适用于发送双精度浮点类型的数据。FloatSerializer
用于将单精度浮点数(Float
)对象转换为字节数组。这种序列化器适用于发送单精度浮点类型的数据。ShortSerializer
用于将短整数(Short
)对象转换为字节数组。这种序列化器适用于发送短整数类型的数据。BytesSerializer
类似于ByteArraySerializer
,用于直接传输字节数组。与ByteArraySerializer
的区别在于其实现和用法。KafkaAvroSerializer
是 Confluent 提供的序列化器,用于将 Avro 对象转换为字节数组。它通常用于与 Confluent Schema Registry 一起使用,以管理和验证 Avro 模式。JsonSerializer
用于将 Java 对象转换为 JSON 格式的字节数组。它通常用于结构化数据的传输。自定义序列化器:Kafka 还允许用户自定义序列化器。自定义序列化器需要实现
org.apache.kafka.common.serialization.Serializer
接口1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class CustomSerializer<T> implements Serializer<T> {
public void configure(Map<String, ?> configs, boolean isKey) {
// 配置序列化器(如果需要)
}
public byte[] serialize(String topic, T data) {
// 实现序列化逻辑
return data.toString().getBytes();
}
public void close() {
// 关闭序列化器,清理资源
}
}分区器
Kafka 分区器(Partitioner)是用于确定消息应该发送到哪个分区(Partition)的机制。分区是 Kafka 中逻辑上的消息存储单元,不同分区可以分布在不同的 Broker 上,从而实现负载均衡和并行处理。
好处:
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成块一块数据存储在多台Broker上;合理控制分区的任务,可以实现负载均衡的效果。
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
分区器根据配置和消息内容选择目标分区。分区选择可以基于以下几种策略:
- 轮询(Round Robin):消息被轮流发送到各个分区,分布均匀。
- 基于键的分区(Key-based Partitioning):如果消息包含键(Key),生产者会使用键的哈希值来选择分区。
- 自定义分区器(Custom Partitioner):用户可以实现自定义分区逻辑来决定消息的目标分区。
默认分区器
Kafka 提供了一个默认的分区器 DefaultPartitioner
,其分区策略如下:
- 如果指定分区:按照指定的分区进行发送
- 有 Key 的消息:使用 Key 的哈希值对分区数取模(
hash(key) % numPartitions
),从而确定消息的分区。这种方式可以确保相同 Key 的消息总是发送到同一个分区。 - 没有 Key 的消息:如果消息没有指定 Key,则会随机选择一个分区(随机轮询),直到该分区batch(默认16k)已满或已完成,再会随机选另一个分区。
自定义分区器
Kafka 允许用户实现自定义分区器,以满足特定的业务需求。自定义分区器需要实现 org.apache.kafka.clients.producer.Partitioner
接口。
例如:就可以根据 Broker 所在的 IP 地址实现定制化的分区策略,实现分配就近分区
简单实现:
创建自定义分区器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class CustomPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {
// 配置初始化(如果需要)
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的分区数
int numPartitions = cluster.partitionsForTopic(topic).size();
// 根据消息值的长度来确定分区
int partition = valueBytes.length % numPartitions;
return partition;
}
public void close() {
// 关闭分区器,清理资源
}
}注册分区器:
properties.put("partitioner.class", "com.example.CustomPartitioner");
消息可靠性
在 Kafka 生产者一方,确保数据可靠性是至关重要的。Kafka 提供了acks机制来增强数据传输的可靠性,从而保证消息不会在传输到broker过程中丢失。
ack机制:
acks
参数决定了生产者在收到 broker 的确认(acknowledgement)之前,必须等待的确认数量。该参数有以下三种配置:
acks=0
:生产者不会等待任何确认。这种配置性能最高,但可靠性最低,因为消息可能会在没有任何确认的情况下丢失。acks=1
:只要领导者分区(leader partition)写入消息成功,生产者就会收到确认。这种配置提供了一定的可靠性,但如果领导者崩溃,可能会丢失消息。acks=all
(或acks=-1
):只有当所有同步副本(in-sync replicas, ISR)都确认写入成功,生产者才会收到确认。这种配置提供最高的可靠性,确保消息不会丢失。当前acks为all时,为避免一直等待Follower同步数据,提供了一种Follower通讯检测的机制:
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由
replica.lag.time.max.ms
参数设定,默认30s。ISR最小应答数配置:
min.insync.replicas
默认为1
总结:所以要保证生产者严格的消息可靠,至少要让acks = all + 副本数 >= 2 + 应答最小副本数 >= 2
即设置 replication.factor >= 3、min.insync.replicas > 1;确保 replication.factor > min.insync.replicas
消息不重复性
kafka为保证消息的不重复,提供了幂等性和事务的机制
幂等性
kafka中幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复,但是只能保证单分区单会话内的不重复。
幂等性生产者依赖于以下几个关键要素来确保消息不重复:
- Producer ID(PID):
- 当生产者启用幂等性时,Kafka 会为每个生产者分配一个唯一的
Producer ID
(PID)。 - PID 在生产者的整个生命周期内保持不变,重启会重新分配。
- 当生产者启用幂等性时,Kafka 会为每个生产者分配一个唯一的
- Partition:分区号
- Sequence Number(序列号):
- 每个消息在发送时都会被分配一个递增的序列号。
- Kafka 代理(broker)会跟踪每个分区上已接收消息的最高序列号。
- 当生产者重试发送消息时,代理会检查序列号,以确保不会接受重复的消息。
开启幂等性:properties.put("enable.idempotence", "true");
如果需要kafka重启之后仍保证不重复,只有配合生产者事务一起使用
生产者事务
Kafka 的事务机制(Kafka Transactions)允许生产者在一个原子操作中发送多条消息,确保这些消息要么全部成功提交,要么全部回滚。这个特性对保证数据一致性和处理复杂的事件流非常重要。
要使用事务,生产者必须启用幂等性并配置事务 ID
基本概念
- Transactional Producer:启用事务的生产者。
- Transactional ID:事务生产者的唯一标识,用于标识和管理生产者的事务,Producer在使用事务功能前,必须先自定义一个唯一的Transactional ID,即使客户端挂掉了,它重启后也能继续处理未完成的事务。
- Transaction Coordinator:Kafka 的内部组件,负责管理事务的生命周期,会根据事务ID进行哈希后的值来决定使用哪个分区Leader副本所在broker的Transaction Coordinator。
工作流程:
- 初始化事务:调用
initTransactions()
方法,生产者与 Kafka 集群协调器进行初始化,分配transactional.id
。 - 开始事务:调用
beginTransaction()
方法,开始一个新的事务。 - 发送消息:使用常规的
send()
方法发送消息,这些消息会被记录在事务中。 - 提交事务:调用
commitTransaction()
方法,提交事务,所有在事务中的消息会被原子性地写入到目标主题。 - 回滚事务:如果在事务期间发生错误,可以调用
abortTransaction()
方法回滚事务,所有在事务中的消息都会被丢弃。
简单案例:
1 | public class TransactionalProducerExample { |
消息有序性
在 Kafka 中,消息的有序性是通过分区(partition)实现的。每个主题(topic)由一个或多个分区组成,分区是 Kafka 中实现消息顺序和并行处理的基本单元。
单分区内有序:
- Kafka 保证在同一个分区内,消息是按生产者发送的顺序存储和消费的。也就是说,消息的顺序在每个分区内是有保证的。
在kafka1.x版本之前,需要保证单分区有序还必须配置:
max.in.flight.requests.per.connection = 1
在kafka1.x版本之后
- 如果没有开启幂等性则也需要配置;
- 如果开启了幂等性,需要
max.in.flight.requests.per.connection
设置为小于5(因为小于5之后会在broker端进行缓存,并按照序列号重新排序)
全局有序:
- 对于整个主题(即跨多个分区)的消息,Kafka 并不保证全局有序性。要实现全局有序性,需要将所有消息发送到同一个分区,这通常会牺牲并行处理的优势和吞吐量。
如果不想发一个分区,那可以将多个分区一起获取出来,然后再进行统一排序来保证有序(这样效率很低)