消费者

概述

Kafka消费者是Kafka系统中用于读取并处理Kafka主题(topic)中的消息的组件。Kafka的消费者在消费消息时会从特定的主题和分区中读取数据,然后进行处理。

消费模式:

由于推模式很难考虑到每个客户端不同的消费速率,导致消费者无法消费消息而宕机,因此kafka采用的是poll的模式,该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。

为了避免过多不必要的空轮询,kafka做了改进,如果没消息服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。

工作流程:

Kafka消费者订阅主题后,从指定分区拉取消息,处理并提交消费进度(偏移量)以记录已消费的位置,并在消费者组内通过再均衡机制实现负载均衡和故障恢复,确保每个分区的消息仅被一个组内消费者消费

image-20240528210755837

  • 消费者组(Consumer Group):消费者组是Kafka中用于管理和协调多个消费者共同消费一个或多个主题的机制,通过添加多个消费者的消费组,并行处理数据提高消息消费速度

    特点

    1. 唯一的组ID:每个消费者组都有一个唯一的组ID,用于标识这个组。
    2. 水平扩展:同一消费者组内的多个消费者可以并行消费同一主题的不同分区,实现负载均衡。例如,如果一个主题有4个分区,一个消费者组有4个消费者,那么每个消费者将消费一个分区的数据。
    3. 单一消费保证:Kafka确保每个分区中的消息在一个消费者组中只会被一个消费者消费,避免消息重复消费。
    4. 再均衡机制:当消费者组中的成员发生变化(如新增或移除消费者)时,Kafka会触发再均衡,重新分配分区给消费者组中的消费者。

    一般消费者组中消费者比分区都多了,就会有闲置的消费者

  • offset:偏移量是Kafka中用于标识消息在分区中的位置的唯一标识,它是一个不断增加的整数值(老版本存在Zookeeper中,由于频繁通信问题,将其移到每个broker中的__consumer_offsets主题中)

    • _consumer_offsets主题里面采用key和 value的方式存储数据。key是groupid+topic+分区号,value就是当前offset的值。每隔一段时间,kafka 内部会对这个topic进行compact,也就是每个groupid+topic+分区号就保留最新数据(这个主题默认50个分区,3个副本)。
      • Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的
        可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨
        胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log
        Cleaner 线程的状态,通常都是这个线程挂掉了导致的。
    • 如果需要想查看这个主题,则需要在config/consumer.properties中添加配置exclude.internaltopics=false

组协调器(Group Coordinator)

定义

  • 组协调器是Kafka中的一个Broker节点,负责管理消费者组的成员关系和偏移量提交。

    消费组的协调器是由groupid决定的,coordinator节点选择=groupid的hashcode值%50(__consumer_offsets的分区数量)

主要职责:

  1. 管理消费者组成员
    • 处理消费者的加入和离开请求。
    • 维护消费者组内的成员列表。
    • 触发再均衡过程,在消费者组成员发生变化时重新分配分区。
  2. 协调再均衡
    • 当消费者组内的成员(消费者)发生变化(如新增、移除消费者,或订阅的主题发生变化)时,组协调器触发再均衡过程。
    • 再均衡过程中,组协调器重新分配分区给组内的消费者,确保每个分区仅被一个消费者消费。
  3. 管理偏移量
    • 接收并存储消费者提交的偏移量。
    • 偏移量通常存储在Kafka的内部主题__consumer_offsets中。
    • 在消费者重新启动或故障恢复时,提供最新的偏移量信息,确保消费者可以从正确的位置继续消费。

工作流程:

image-20240528214452242

消费案例

消费详细流程分析:

image-20240528215639715

代码编写:

  • 创建单个消费者,消费整个topic中的消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class CustomConsumer {
    public static void main(String[] args) {
    //0.配置信息
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 消费组的id一定要带
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

    //1.创建消费者,并订阅主题
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    ArrayList<String> topic = new ArrayList<>();
    topic.add("test");
    kafkaConsumer.subscribe(topic);


    //2.消费信息
    while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> {
    System.out.println(record);
    });
    }
    //3.关闭
    }
    }
  • 创建单个消费者,消费整个topic中某一分区的消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class CustomConsumerPartiton {
    public static void main(String[] args) {
    //0.配置信息
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 消费组的id一定要带
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

    //1.创建消费者,并订阅主题分区
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    ArrayList<TopicPartition> topicPartition = new ArrayList<>();
    topicPartition.add(new TopicPartition("test",0));
    kafkaConsumer.assign(topicPartition);


    //2.消费信息
    while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> {
    System.out.println(record);
    });
    }
    //3.关闭
    }
    }
  • 创建消费者组,对topic进行消费:这里只需要创建三个单一的消费者,要组成一个消费组,只需要使用统一的groupid即可

消费分区分配

Kafka消费分区分配是指Kafka消费者群组中的每个消费者如何分配到不同的分区上,以实现并行消费和负载均衡。Kafka使用了多种分区分配策略来确保数据的高效处理。

重平衡时机:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

我们需要尽量去避免非必要的重平衡:

  • 例如Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group,如果是这个原因导致的 Rebalance,则需要去避免
    • 未能及时发送心跳:
      • 设置 session.timeout.ms = 6s
      • 设置 heartbeat.interval.ms = 2s
    • Consumer 消费时间过长导致的:max.poll.interval.ms可以调大,或者优化消费逻辑
    • Consumer 端GC太频繁并且久:查看Consumer 端的 GC 表现
  • 相关参数:
    • session.timeout.ms:心跳超时时间
    • heartbeat.interval.ms:心跳检测间隔时间
    • max.poll.interval.ms:限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔,超过时间还没有poll则会被踢出

参数控制:partition.assignment.strategy(默认策略是Range+CooperativeSticky

Kafka提供了几种分区分配策略,主要包括:

  1. RangeAssignor

    特点:

    • 按范围分配。
    • 该策略会对所有消费者按字典序排序,并对分区也按字典序排序。然后将分区按范围分配给消费者。具体而言,每个消费者会尽量均匀地分配连续的一段分区。
      • 如果除不尽,多的分区给最前面的分区进行消费,如果余数多了会导致第一个分区消费压力大,严重的数据倾斜
      • 并且如果消费者挂了之后,45秒之前发送的消息会交给其他的一个消费者进行消费,消费完之后会进行再分配,会将其消费的所有分区按范围重新分配

    适用场景:适用于分区数量相对较少且消费者变动不频繁的场景。

  2. RoundRobinAssignor

    特点:

    • 轮询分配。
    • 该策略会将所有分区和所有消费者按hashcode排序,然后逐个将分区分配给消费者,类似于轮询调度。每个消费者会得到尽量相等数量的分区,但分区的连续性不会被保证。
      • 如果消费者挂了,45秒之前的消息会轮询的给其他消费者,45秒之后会重新轮询分配

    适用场景:适用于需要均匀分配分区但不要求分区连续性的场景。

  3. StickyAssignor

    特点:

    • 粘性分配。
    • 该策略在分区分配时尽量保持上次分配结果的不变。如果必须重新分配分区,也会尽量保证最小的变动。这有助于减少消费者在分区重新分配时的状态恢复工作。

    适用场景:适用于对再平衡过程中的状态恢复成本敏感的场景。

  4. CooperativeStickyAssignor

    特点:

    • 渐进式粘性分配。
    • 进一步减少再平衡对消费者的干扰。传统的 StickyAssignor 在再平衡时会导致所有消费者停止消费并重新分配分区,而 CooperativeStickyAssignor 采用了逐步再平衡的方法,使得只有部分消费者会在一次再平衡中被影响。

    适用场景:适用于对消费延迟敏感、消费者变动频繁且需要高稳定性的场景。

offset的提交

Kafka支持两种主要的offset提交方式:

  1. 自动提交(Auto Commit)
  2. 手动提交(Manual Commit)

自动提交

在自动提交模式下,Kafka消费者会自动定期提交当前消费到的offset。这种方式的优点是实现简单,不需要开发者显式提交offset,但缺点是可能会在失败恢复时重复消费消息,或者丢失部分消息。

自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Properties props = new Properties();
// 不建议配置太多broker地址,因为kafka会与所有配置的broker建立连接,其实集群中已经同步了元数据
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true"); // 开启自动提交
props.put("auto.commit.interval.ms", "1000"); // 自动提交间隔,单位为毫秒
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}

手动提交

手动提交模式下,消费者需要显式地在代码中提交offset。这样可以更灵活地控制提交逻辑,确保只有在消息处理完成后才提交offset,从而避免重复消费或消息丢失。

手动提交又分为同步提交和异步提交两种方式:

  • 同步提交:同步提交会等待Kafka返回提交结果,确保offset成功提交。如果提交失败,可以捕获异常并进行处理。

    • 案例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test-group");
      props.put("enable.auto.commit", "false"); // 关闭自动提交
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("test-topic"));

      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }

      try {
      consumer.commitSync(); // 同步提交offset
      } catch (CommitFailedException e) {
      e.printStackTrace();
      }
      }
  • 异步提交:异步提交不会阻塞当前线程,可以提高提交效率,但无法立即知道提交结果。可以通过回调函数处理提交结果。

    • 案例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test-group");
      props.put("enable.auto.commit", "false"); // 关闭自动提交
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("test-topic"));

      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
      // 异步提交
      consumer.commitAsync((offsets, exception) -> {
      if (exception != null) {
      System.err.printf("Commit failed for offsets %s%n", offsets, exception);
      }
      });
      }

      通常将两者进行结合:因为同步是阻塞的,影响系统TPS,但是异步失败后又不会重试;

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      try {
      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      process(records); // 处理消息
      commitAysnc(); // 使用异步提交规避阻塞
      }
      } catch (Exception e) {
      handle(e); // 处理异常
      } finally {
      try {
      consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
      } finally {
      consumer.close();
      }
      }

      Offset 提交的最佳实践

  1. 确保处理完成后再提交:无论是自动提交还是手动提交,都应确保在消息处理完成后再提交offset,以避免重复消费。

  2. 定期提交:在手动提交模式下,定期提交offset可以减少在失败恢复时需要重复处理的消息数量。

  3. 处理提交失败:在手动提交模式下,特别是同步提交时,应处理提交失败的情况,以避免数据丢失。

  4. 控制提交频率:频繁提交offset会增加Kafka的负担,但过少提交可能导致更多的重复消费。应根据业务需求调整提交频率。

  5. 分小批提交:将poll的一大批消息分成一小批一小批提交

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record: records) {
    process(record); // 处理消息
    offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);
    // 每100条消息提交一次
    if(count % 100 == 0
    consumer.commitAsync(offsets, null); // 回调处理逻辑是
    count++;
    }
    }

    指定消费位置

在kafka中当消费者查找不到所记录的消费位移时,会根据auto.offset.reset的配置,决定从何处消费。

auto.offset.reset = earliest | latest | none 默认是 latest。

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

除了上述消费方式,还有两种特殊的消费方式:

  • 指定offset位置消费:

    • 消费者可以使用seek方法来指定从某个特定的offset位置开始消费。这个方法需要在消费者订阅主题并且分区分配完成后调用

    • 案例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      public class KafkaConsumerExample {
      public static void main(String[] args) {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      String topic = "test-topic";
      TopicPartition partition = new TopicPartition(topic, 0); // 指定主题和分区
      consumer.assign(Arrays.asList(partition)); // 手动分配分区

      long offsetToStart = 10L; // 指定要从哪个offset开始消费
      consumer.seek(partition, offsetToStart); // 指定offset位置

      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (var record : records) {
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
      consumer.commitSync(); // 手动提交offset
      }
      }
      }

      这里也可以通过api获取分区信息,而不是直接new出来:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      // 指定位置进行消费
      Set<TopicPartition> assignment = kafkaConsumer.assignment();

      // 保证分区分配方案已经制定完毕
      while (assignment.size() == 0){
      kafkaConsumer.poll(Duration.ofSeconds(1));
      assignment = kafkaConsumer.assignment();
      }

      // 指定消费的offset
      for (TopicPartition topicPartition : assignment) {
      kafkaConsumer.seek(topicPartition,600);
      }
  • 从特定的时间戳开始消费:

    • Kafka消费者还可以通过时间戳来确定从哪个offset开始消费。消费者可以使用offsetsForTimes方法,该方法接受一个包含主题分区和时间戳的映射,并返回从该时间戳开始的offset

    • 案例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      public class KafkaConsumerExample {
      public static void main(String[] args) {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      String topic = "test-topic";
      TopicPartition partition = new TopicPartition(topic, 0); // 指定主题和分区
      consumer.assign(Collections.singletonList(partition)); // 手动分配分区

      long timestampToStart = System.currentTimeMillis() - 24 * 60 * 60 * 1000; // 指定24小时前的时间戳
      Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
      timestampsToSearch.put(partition, timestampToStart);

      // 获取指定时间戳的offset
      Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
      OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);
      if (offsetAndTimestamp != null) {
      consumer.seek(partition, offsetAndTimestamp.offset());
      }

      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (var record : records) {
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
      consumer.commitSync(); // 手动提交offset
      }
      }
      }

      拦截器

与生产者对应,消费者也有拦截器;消费者拦截器可以在消息消费之前或之后执行一些自定义逻辑,比如记录日志、过滤消息、监控性能等。使用消费者拦截器可以增强Kafka消费者的功能,使其更加灵活和可扩展。

Kafka消费者拦截器通过实现ConsumerInterceptor接口来定义拦截逻辑。该接口包含以下方法:

  1. **onConsume**:在消费者拉取消息之后调用。可以用于处理或修改消费记录。
  2. **onCommit**:在消费者提交offset之前调用。可以用于记录提交的offset或其他监控操作。
  3. **close**:在消费者关闭时调用。可以用于清理资源。
  4. **configure**:用于配置拦截器。

消费者拦截器的简单实现

  • 创建拦截器:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
    // 在这里处理消费的记录,比如过滤、修改等
    System.out.println("Intercepted " + records.count() + " records");
    return records; // 返回处理后的记录
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // 在这里处理提交的offset,比如记录日志等
    System.out.println("Committing offsets: " + offsets);
    }

    @Override
    public void close() {
    // 在这里进行清理操作
    }

    @Override
    public void configure(Map<String, ?> configs) {
    // 在这里配置拦截器
    }
    }
  • 配置拦截器: // 配置消费者拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.CustomConsumerInterceptor");

漏消费和重复消费

  • 重复消费:已经消费了数据,但是 offset 没提交

    • 自动提交offset引起:

      image-20240602171737043

  • 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费

    • 设置offset为手动提交,当ofiset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

      image-20240602171841357

解决方案:消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)

消息积压

消息积压是Kafka中常见的问题,通常是由于消费者消费速度跟不上生产者生产速度引起的。

解决方案:

  • 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。

  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

    • fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)ormax.message.bytes (topic config)影响

    • max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

重要参数

参数 解释 默认值
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和 value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
key.deserializer 和 value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 消费者会自动周期性地向服务器提交偏移量。 true
auto.commit.interval.ms 如果设置了 enable.auto.commit 为 true,则定义了消费者偏移量向 Kafka 提交的频率。 5s
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在时(如,数据被删除了),该如何处理?earliest: 自动重置偏移量到最早的偏移量。latest: 自动重置偏移量为最新的偏移量。none: 如果消费组之前的偏移量不存在,则抛出异常。anything: 任何消费抛异常。 latest
offsets.topic.num.partitions __consumer_offsets 的分区数。 50 个分区
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间间隔。此值必须小于 session.timeout.ms 的1/3。 3s
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,超过该值,消费者被移除,消费者组执行再平衡。 45s
max.poll.interval.ms 消费者处理消息的最大时长,超过该值,消费者被移除,消费者组执行再平衡。 5分钟
fetch.min.bytes 消费者获取服务器端一批消息最小的字节数。 1字节
fetch.max.wait.ms 如果没有从服务器端获取到一批数据的最小字节数,该时间到,仍然会返回数据。 500ms
fetch.max.bytes 消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes(broker config)或max.message.bytes(topic config)影响。 52428800(50m)
max.poll.records 一次poll拉取数据返回消息的最大条数。 500条