Broker

Kafka Broker 是 Kafka 集群中的一个节点,负责接收、存储和转发消息。Kafka Broker 是 Kafka 系统的核心组件,负责管理主题、分区和消费者组等关键功能。

节点的服役和退役

Broker节点的服役和退役是Kafka集群运维中的重要操作

服役新节点

  • 启动一台新的KafKa服务端(加入原有的Zookeeper集群)

  • 查看原有的 分区信息:kafka-topics.sh --bootstrap-server ip1:9092 --topic test --describe

    会发现根本没有新加入的节点

  • 进行分区重新负载均衡操作:

    1. 创建一个要负载均衡的主题:vim topics-to-move.json

      1
      2
      3
      4
      5
      6
      7
      {
      "topics": [
      {"topic": "test"}
      ],
      "version": 1
      }

    2. 生成负载均衡计划(将新的broker加入,这里只是让我们查看方案):kafka-reassign-partitions.sh --bootstrap-server ip1:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

    3. 创建正式计划文件(将刚才写入的计划加入):vim increase-replication-factor.json

      1
      2
      3
      4
      5
      {"version":1,"partitions":[{"topic":"test","partition":0,"replic
      as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
      tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
      pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
      any","any"]}]}
    4. 执行计划:kafka-reassign-partitions.sh --bootstrap-server ip1:9092 --reassignment-json-file increase-replication-factor.json --execute

    5. 验证计划(可以查看计划执行情况):kafka-reassign-partitions.sh --bootstrap-server ip1:9092 --reassignment-json-file increase-replication-factor.json --verify

退役旧节点

同上生成并执行负载均衡计划,只是区别在于broker-list少了要退役的节点

副本机制

Kafka的副本机制是其可靠性和容错能力的重要组成部分。副本机制确保了即使在节点故障的情况下,Kafka也能继续提供数据的一致性和可用性。Kafka的主题(Topic)被划分为多个分区(Partition),每个分区都可以有多个副本(Replica)。这些副本分布在不同的Broker(Kafka服务器)上,以实现高可用性和容错性。

相关角色:

  • Leader:每个分区都有一个Leader副本,负责处理所有的读写请求。
  • Follower:除了Leader之外的其他副本都被称为Follower,它们会从Leader中复制数据,保持与Leader的数据一致。

Follower副本都是是不对外提供服务的,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。

相关概念:

  • AR:分区中的所有 Replica 统称为 AR = ISR +OSR
  • ISR:所有与 Leader 副本保持一定程度同步的Replica(包括 Leader 副本在内)组成 ISR
  • OSR:与 Leader 副本同步滞后过多的 Replica 组成了 OSR(replica.lag.time.max.ms默认30秒,副本没有和leader通信或同步数据)
  • LEO:每个副本都有内部的LEO,代表当前队列消息的最后一条偏移量offset + 1。
  • HW:高水位,代表所有ISR中的LEO最低的那个offset,也是消费者可见的最大消息offset即所有副本都同步的最后消息。

LEO、HW还对副本故障恢复有用:

  • Follower故障恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
  • Leader挂了之后,会选出新的Leader副本,并且其他副本需要将多于其HW的数据截取掉,保证同步。

所以这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

多副本选leader

image-20240526194444371

分区副本的分配

Kafka中的副本分区分配机制是确保数据高可用性和负载均衡的关键。

kafka默认的分区副本分配:尽量让Leader副本均匀的分布在各个节点,例如下方4个节点,16个分区,3个副本的分配:让leader依次间隔i个节点

image-20240527211635565

手动调整分区副本:

  • 编写分区副本json:

    1
    2
    3
    4
    5
    6
    7
    {
    "version": 1,
    "partitions": [
    {"topic": "my-topic", "partition": 0, "replicas": [2, 3, 4]},
    {"topic": "my-topic", "partition": 1, "replicas": [3, 4, 5]}
    ]
    }
  • 执行分区副本操作:kafka-reassign-partitions.sh --zookeeper ip1:9092 --reassignment-json-file reassignment.json --execute

kafka的自平衡机制: 一般情况下,我们的分区都是平衡散落在broker的,随着一些broker故障,会慢慢出现leader集中在某台broker上的情况,造成集群负载不均衡,这时候就需要分区自平衡

kafka提供了下面几个参数进行控制:

  • auto.leader.rebalance.enable:自动leader parition平衡,默认是true;
  • leader.imbalance.per.broker.percentage:每个broker允许的不平衡的leader的比率,默认是10%,如果超过这个值,控制器将会触发leader的平衡
  • leader.imbalance.check.interval.seconds:检查leader负载是否平衡的时间间隔,默认是300秒

但是在生产环境中是不开启这个自动平衡,因为触发leader partition的自动平衡会损耗性能,或者可以将触发自动平和的参数leader.imbalance.per.broker.percentage的值调大点。

文件存储

存储结构

在Kafka中主题(Topic)是一个逻辑上的概念,分区(partition)是物理上的存在的。每个partition对应一个log文件(这个log也是虚拟的概念),该log文件中存储的就是Producer生产的数据,Producer生产的数据会被不断追加到该log文件末端。

为防止log文件过大导致数据定位效率低下,Kafka采用了分片和索引机制,将每个partition分为多个segment,每个segment默认1G( log.segment.bytes ), 每个segment包括.index文件、**.log文件和.timeindex**(时间戳索引文件)等文件。这些文件位于文件夹下,该文件命名规则为:topic名称+分区号。

image-20240527215537406
  • 当log文件写入4k(这里可以通过log.index.interval.bytes设置)数据,就会写入一条索引信息到index文件中,这样的index索引文件就是一个稀疏索引,它并不会每条日志都建立索引信息。
  • 时间戳索引文件,它的作用是可以查询某一个时间段内的消息,它的数据结构是:时间戳(8byte)+ 相对offset(4byte),如果要使用这个索引文件,先要通过时间范围找到对应的offset,然后再去找对应的index文件找到position信息,最后在遍历log文件,这个过程也是需要用到index索引文件的。
  • 文件内都是序列化后的数据,想看可以通过命令查看:kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

文件清除策略

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。

Kafka提供了两种日志清理策略(配置为log.cleanup.policy = delete/compact):日志删除(delete) 、日志压缩(compact)

日志删除

按照一定的保留策略直接删除不符合条件的日志分段

删除策略:

  • 基于时间策略:日志删除任务会周期检查当前日志文件中记录的最大时间戳是否超过设定的阈值

    这里需要注意log.retention参数的优先级:log.retention.ms > log.retention.minutes > log.retention.hours,默认只会配置log.retention.hours参数,值为168即为7天。

  • 基于日志大小策略:日志删除任务会周期性检查当前节点的所有日志大小是否超过设定的阈值(log.retention.bytes,默认是-1,表示无穷大),如果超过则会删除最早的segment

  • 基于日志起始偏移量:如果下一个日志段的起始偏移量 baseOffset小于等于 logStartOffset,则删除该日志段

在删除的时候先从Log对象所维护的日志段的跳跃表中移除要删除的日志段,用来确保已经没有线程来读取这些日志段;接着将日志段所对应的所有文件,包括索引文件都添加上**.deleted的后缀;最后交给一个以delete-file命名的延迟任务来删除这些以.deleted为后缀的文件,默认是1分钟执行一次,可以通过file.delete.delay.ms**来配置。

优缺点:

  • 优点
    • 简单有效,适用于不需要长期保留数据的场景。
    • 能有效控制存储空间的使用。
  • 缺点
    • 无法保留重要数据,需要小心配置保留时间和大小。

相关参数:

  • log.retention.hours:最低优先级小时,默认7天
  • log.retention.minutes:分钟
  • log.retention.ms:最高优先级毫秒
  • log.retention.check.interval.ms:负责设置检查周期,默认5分钟
  • file.delete.delay.ms:延迟执行删除时间
  • log.retention.bytes:当设置为-1时表示运行保留日志最大值(相当于关闭);当设置为1G时,表示日志文件最大值

日志压缩

针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本

image-20240527222203963

优缺点:

  • 优点
    • 适合需要保留每个Key最新状态的数据,如数据库变更日志。
    • 节省存储空间,保留关键信息。
  • 缺点
    • 压缩过程消耗资源,可能影响性能。
    • 适用于特定类型的数据使用场景。

重要参数

参数名称 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否不平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,Kafka 里面每当写入了 4kb 大小的日志(.log),然后就在 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保留的间隔时间,默认是 5 分钟。
log.retention.bytes 默认等于 -1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50% 的 1/3。
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的 50% 的 2/3。
log.flush.interval.messages 强制页缓存存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。