生产者

Kafka 生产者(Producer)是 Apache Kafka 中负责将数据(消息)发送到 Kafka 主题(Topics)的一种客户端。生产者通常运行在数据生成端,能够高效地将大规模数据流传输到 Kafka 中,以供消费者(Consumers)处理和分析。

发送消息流程:

image-20240526160410940

RecordAccumulator: 在jvm的内存中开辟了一块缓存空间,是多个双端队列(方便内存的回收),用于将多条消息合并成一个批次,然后由sender线程发送给kafka集群。

生产者最后还会处理发送结果:生产者处理 Broker 返回的响应。如果消息成功发送,生产者会收到成功确认,结束请求、清理队列。如果发送失败,生产者可以重试或根据配置处理错误

生产者重要参数总结:

参数名称 描述
bootstrap.servers 生产者连接集群所有的 broker 地址清单。例如 ip1:9092, ip2:9092, ip3:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size、sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值为 5~100ms 之间。
acks 0:生产者发送过来的数据,不需要等到确认落盘就可以。
1:生产者发送过来的数据,Leader 收到数据后返回。
-1(all):生产者发送过来的数据,Leader 和 isr 队列里面的所有节点收到数据后返回。默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。
如果要设置重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能已经发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是说不压缩。
支持压缩类型:none, gzip, snappy, lz4 和 zstd。
在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;
在压缩比方面,zstd > LZ4 > GZIP > Snappy。(但是避免 1. Broker 端和Producer 端不同的压缩算法 2. 保证消息格式转换,例如版本不同会导致格式转换甚至导致使用不了零拷贝

通常为了提高生产者的吞吐量,经常修改的参数:

  • batch.size:增大一批的大小
  • linger.ms:延迟等待的时间
  • compression.type:将消息数据进行压缩
  • buffer.memory:缓冲区大小(一般分区多了才会修改这个参数)

同步发送

在 Kafka 中,生产者可以使用同步发送和异步发送两种方式将消息发送到 Kafka 集群。同步发送要求生产者在发送消息后等待 Kafka 代理(Broker)的确认响应,只有在收到确认响应后,生产者才会继续发送下一条消息。这种方式确保了消息的可靠性,但会增加延迟和影响吞吐量。

工作流程:

  1. 消息序列化:生产者将消息序列化为字节数组。
  2. 选择分区:根据配置或分区策略选择目标分区。
  3. 发送消息:将序列化后的消息发送到选定的分区。
  4. 等待确认:生产者等待 Kafka 代理返回的确认响应,确保消息成功写入。
  5. 处理结果:根据返回的响应结果,决定下一步操作(成功、重试或报错)。

基本案例:

  1. 添加client依赖:

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
    </dependency>
  2. 编写代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class CustomProducer {

    public static void main(String[] args) {

    // 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
    Properties properties = new Properties();
    //broker信息
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip1:9092,ip2:9092");
    //配置序列化类型
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    // 2. 创建 kafka 生产者的配置对象,泛型是key、value的类型
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);

    // 3. 创建 kafka 生产者对象
    for (int i = 0; i < 5; i++) {
    // 同步点在于get()方法
    kafkaProducer.send(new ProducerRecord("test", "test" + i), new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e == null) {
    System.out.println( "分区 : " + recordMetadata.partition() + " 主题: " + recordMetadata.topic() );
    }

    }
    }).get();
    Thread.sleep(100);
    }
    kafkaProducer.close();
    }
    }

异步发送

与同步发送需要等待 Kafka 返回确认响应后才能继续发送下一条消息,而异步发送则允许生产者在发送消息后立即进行其他操作(即发送到RecordAccumulator 中就行),无需等待确认响应,从而提高了发送效率和吞吐量。

工作流程:

  1. 消息序列化:生产者将消息序列化为字节数组。
  2. 选择分区:根据配置或分区策略选择目标分区。
  3. 放入缓冲区:将消息放入生产者的缓冲区中。
  4. 异步发送:生产者从缓冲区中批量取出消息,并将其发送到 Kafka Broker,同时返回继续处理其他任务。
  5. 回调处理:Kafka Broker 处理完消息后,通过回调函数通知生产者消息发送的结果(成功或失败)。

基本案例:

  1. 添加client依赖:

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
    </dependency>
  2. 编写代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class CustomProducer {

    public static void main(String[] args) {

    // 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
    Properties properties = new Properties();
    //broker信息
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip1:9092,ip2:9092");
    //配置序列化类型
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    // 2. 创建 kafka 生产者的配置对象,泛型是key、value的类型
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);

    // 3. 创建 kafka 生产者对象
    for (int i = 0; i < 5; i++) {
    // 不带回调的发送方法,第一个参数为topic,第二个为消息数据
    kafkaProducer.send(new ProducerRecord("test", "test" + i));
    // 带回调的发送方法,第三个参数为回调函数,会返回主题、分区等消息
    kafkaProducer.send(new ProducerRecord("test", "test" + i), new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e == null) {
    System.out.println( "分区 : " + recordMetadata.partition() + " 主题: " + recordMetadata.topic() );
    }
    }
    });
    }
    kafkaProducer.close();
    }
    }

    生产者拦截器

Kafka 生产者拦截器(Producer Interceptor)是一种用于在消息被发送到 Kafka Broker 之前和确认(ack)被回调之前,对消息进行拦截和处理的机制。它可以用于修改、过滤、监控或者记录消息。拦截器为增强消息发送逻辑提供了灵活性和扩展性。

常用场景:

  1. 消息修改:在消息发送之前,修改消息内容或元数据。
  2. 消息过滤:根据某些条件过滤掉不需要发送的消息。
  3. 监控和统计:收集消息发送前和发送后的统计信息,例如消息数量、发送时间等。
  4. 日志记录:记录每条消息的发送过程,以便调试和审计。

简单实现:

  • 创建自定义拦截器实现ProducerInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // 在消息发送之前修改消息内容
    String modifiedValue = "Modified: " + record.value();
    return new ProducerRecord<>(record.topic(), record.partition(), record.key(), modifiedValue);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 在消息发送成功或失败之后记录结果;但是这个方法早于send的callback方法,并且不是同一个线程调用注意线程安全;并且这个方法在Producer发送的主路径上,避免过重的操作
    if (exception == null) {
    System.out.printf("Message sent successfully to topic %s partition %d offset %d%n",
    metadata.topic(), metadata.partition(), metadata.offset());
    } else {
    exception.printStackTrace();
    }
    }

    @Override
    public void configure(Map<String, ?> configs) {
    // 初始化拦截器配置(如果有)
    }

    @Override
    public void close() {
    // 关闭拦截器,清理资源
    }
    }
  • 注册拦截器:properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomProducerInterceptor.getClass.getName());

序列化器

Kafka 中的序列化器(Serializer)用于将生产者发送的消息对象转换为字节数组,以便能够传输到 Kafka 代理(Broker)。反之,消费者使用反序列化器(Deserializer)将字节数组转换为消息对象。Kafka 提供了多种序列化器以满足不同的数据格式需求。

序列化种类:

  • StringSerializer :用于将字符串对象转换为字节数组。它是最常用的序列化器之一,适用于发送简单的文本数据。

  • ByteArraySerializer 用于将字节数组直接传输,不需要进一步序列化。这种序列化器适用于已经是字节数组形式的数据。

  • IntegerSerializer 用于将整数(Integer)对象转换为字节数组。这种序列化器适用于发送整数类型的数据。

  • LongSerializer 用于将长整数(Long)对象转换为字节数组。这种序列化器适用于发送长整数类型的数据。

  • DoubleSerializer 用于将双精度浮点数(Double)对象转换为字节数组。这种序列化器适用于发送双精度浮点类型的数据。

  • FloatSerializer 用于将单精度浮点数(Float)对象转换为字节数组。这种序列化器适用于发送单精度浮点类型的数据。

  • ShortSerializer 用于将短整数(Short)对象转换为字节数组。这种序列化器适用于发送短整数类型的数据。

  • BytesSerializer 类似于 ByteArraySerializer,用于直接传输字节数组。与 ByteArraySerializer 的区别在于其实现和用法。

  • KafkaAvroSerializer 是 Confluent 提供的序列化器,用于将 Avro 对象转换为字节数组。它通常用于与 Confluent Schema Registry 一起使用,以管理和验证 Avro 模式。

  • JsonSerializer 用于将 Java 对象转换为 JSON 格式的字节数组。它通常用于结构化数据的传输。

  • 自定义序列化器:Kafka 还允许用户自定义序列化器。自定义序列化器需要实现 org.apache.kafka.common.serialization.Serializer 接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class CustomSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    // 配置序列化器(如果需要)
    }

    @Override
    public byte[] serialize(String topic, T data) {
    // 实现序列化逻辑
    return data.toString().getBytes();
    }

    @Override
    public void close() {
    // 关闭序列化器,清理资源
    }
    }

    分区器

Kafka 分区器(Partitioner)是用于确定消息应该发送到哪个分区(Partition)的机制。分区是 Kafka 中逻辑上的消息存储单元,不同分区可以分布在不同的 Broker 上,从而实现负载均衡和并行处理

好处:

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成块一块数据存储在多台Broker上;合理控制分区的任务,可以实现负载均衡的效果。
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

分区器根据配置和消息内容选择目标分区。分区选择可以基于以下几种策略

  • 轮询(Round Robin):消息被轮流发送到各个分区,分布均匀。
  • 基于键的分区(Key-based Partitioning):如果消息包含键(Key),生产者会使用键的哈希值来选择分区。
  • 自定义分区器(Custom Partitioner):用户可以实现自定义分区逻辑来决定消息的目标分区。

默认分区器

Kafka 提供了一个默认的分区器 DefaultPartitioner,其分区策略如下:

  1. 如果指定分区:按照指定的分区进行发送
  2. 有 Key 的消息:使用 Key 的哈希值对分区数取模(hash(key) % numPartitions),从而确定消息的分区。这种方式可以确保相同 Key 的消息总是发送到同一个分区。
  3. 没有 Key 的消息:如果消息没有指定 Key,则会随机选择一个分区(随机轮询),直到该分区batch(默认16k)已满或已完成,再会随机选另一个分区。

自定义分区器

Kafka 允许用户实现自定义分区器,以满足特定的业务需求。自定义分区器需要实现 org.apache.kafka.clients.producer.Partitioner 接口。

例如:就可以根据 Broker 所在的 IP 地址实现定制化的分区策略,实现分配就近分区

简单实现:

  • 创建自定义分区器:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    // 配置初始化(如果需要)
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // 获取主题的分区数
    int numPartitions = cluster.partitionsForTopic(topic).size();

    // 根据消息值的长度来确定分区
    int partition = valueBytes.length % numPartitions;
    return partition;
    }

    @Override
    public void close() {
    // 关闭分区器,清理资源
    }
    }
  • 注册分区器:properties.put("partitioner.class", "com.example.CustomPartitioner");

消息可靠性

在 Kafka 生产者一方,确保数据可靠性是至关重要的。Kafka 提供了acks机制来增强数据传输的可靠性,从而保证消息不会在传输到broker过程中丢失。

ack机制:

acks 参数决定了生产者在收到 broker 的确认(acknowledgement)之前,必须等待的确认数量。该参数有以下三种配置:

  • acks=0:生产者不会等待任何确认。这种配置性能最高,但可靠性最低,因为消息可能会在没有任何确认的情况下丢失。

  • acks=1:只要领导者分区(leader partition)写入消息成功,生产者就会收到确认。这种配置提供了一定的可靠性,但如果领导者崩溃,可能会丢失消息。

  • acks=all(或 acks=-1):只有当所有同步副本(in-sync replicas, ISR)都确认写入成功,生产者才会收到确认。这种配置提供最高的可靠性,确保消息不会丢失。

    当前acks为all时,为避免一直等待Follower同步数据,提供了一种Follower通讯检测的机制

    如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。

    ISR最小应答数配置:min.insync.replicas默认为1

总结:所以要保证生产者严格的消息可靠,至少要让acks = all + 副本数 >= 2 + 应答最小副本数 >= 2

即设置 replication.factor >= 3、min.insync.replicas > 1;确保 replication.factor > min.insync.replicas

消息不重复性

kafka为保证消息的不重复,提供了幂等性和事务的机制

幂等性

kafka中幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复,但是只能保证单分区单会话内的不重复

幂等性生产者依赖于以下几个关键要素来确保消息不重复:

  1. Producer ID(PID)
    • 当生产者启用幂等性时,Kafka 会为每个生产者分配一个唯一的 Producer ID(PID)。
    • PID 在生产者的整个生命周期内保持不变,重启会重新分配。
  2. Partition:分区号
  3. Sequence Number(序列号)
    • 每个消息在发送时都会被分配一个递增的序列号。
    • Kafka 代理(broker)会跟踪每个分区上已接收消息的最高序列号。
    • 当生产者重试发送消息时,代理会检查序列号,以确保不会接受重复的消息。

开启幂等性:properties.put("enable.idempotence", "true");

如果需要kafka重启之后仍保证不重复,只有配合生产者事务一起使用

生产者事务

Kafka 的事务机制(Kafka Transactions)允许生产者在一个原子操作中发送多条消息,确保这些消息要么全部成功提交,要么全部回滚。这个特性对保证数据一致性和处理复杂的事件流非常重要。

要使用事务,生产者必须启用幂等性并配置事务 ID

基本概念

  • Transactional Producer:启用事务的生产者。
  • Transactional ID:事务生产者的唯一标识,用于标识和管理生产者的事务,Producer在使用事务功能前,必须先自定义一个唯一的Transactional ID,即使客户端挂掉了,它重启后也能继续处理未完成的事务。
  • Transaction Coordinator:Kafka 的内部组件,负责管理事务的生命周期,会根据事务ID进行哈希后的值来决定使用哪个分区Leader副本所在broker的Transaction Coordinator。

工作流程

  1. 初始化事务:调用 initTransactions() 方法,生产者与 Kafka 集群协调器进行初始化,分配 transactional.id
  2. 开始事务:调用 beginTransaction() 方法,开始一个新的事务。
  3. 发送消息:使用常规的 send() 方法发送消息,这些消息会被记录在事务中。
  4. 提交事务:调用 commitTransaction() 方法,提交事务,所有在事务中的消息会被原子性地写入到目标主题。
  5. 回滚事务:如果在事务期间发生错误,可以调用 abortTransaction() 方法回滚事务,所有在事务中的消息都会被丢弃。

image-20240526180556833

简单案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TransactionalProducerExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "ip1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-transactional-id");

Producer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务
producer.initTransactions();

try {
// 开始事务
producer.beginTransaction();

// 发送消息
producer.send(new ProducerRecord<>("test", "key1", "value1"));

// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 发生异常,回滚事务
producer.abortTransaction();
e.printStackTrace();
} finally {
producer.close();
}
}
}

消息有序性

在 Kafka 中,消息的有序性是通过分区(partition)实现的。每个主题(topic)由一个或多个分区组成,分区是 Kafka 中实现消息顺序和并行处理的基本单元。

  1. 单分区内有序:

    • Kafka 保证在同一个分区内,消息是按生产者发送的顺序存储和消费的。也就是说,消息的顺序在每个分区内是有保证的。

    在kafka1.x版本之前,需要保证单分区有序还必须配置max.in.flight.requests.per.connection = 1

    在kafka1.x版本之后

    • 如果没有开启幂等性则也需要配置;
    • 如果开启了幂等性,需要 max.in.flight.requests.per.connection 设置为小于5(因为小于5之后会在broker端进行缓存,并按照序列号重新排序)
  2. 全局有序:

    • 对于整个主题(即跨多个分区)的消息,Kafka 并不保证全局有序性。要实现全局有序性,需要将所有消息发送到同一个分区,这通常会牺牲并行处理的优势和吞吐量。

    如果不想发一个分区,那可以将多个分区一起获取出来,然后再进行统一排序来保证有序(这样效率很低)