kafka基础

概述

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并在 2011 年作为开源项目贡献给 Apache 软件基金会。Kafka 的设计目标是实现高吞吐量、低延迟、可扩展和容错的数据流处理和消息传递功能。

基础架构

image-20240520225345733

  • 主题(Topic)

    • 定义:主题是消息的分类单位,用于组织和管理消息流。

    • 分区(Partition):每个主题可以划分为多个分区。分区是有序的、不可变的消息序列,消息在分区内有唯一的偏移量(Offset)记录消费位置。

      为了配合多个分区的概念,提出了消费组的概念进行并行消费;并且一个分区只能由消费组的一个消费者进行消费,这样单个分区的消息就是有序的

  • 生产者(Producer)

    • 功能:生产者是将消息发布到 Kafka 主题的客户端。
    • 工作方式:生产者将消息发送到特定的主题和分区,可以根据某种策略(如轮询、键散列)选择分区。
  • 消费者(Consumer)

    • 功能:消费者是从 Kafka 主题中读取消息的客户端。
    • 消费者组(Consumer Group):消费者可以组成一个组,共同消费一个主题的消息。每个消息只会被消费者组中的一个成员消费,实现负载均衡。
  • 代理(Broker)

    • 功能:代理是 Kafka 服务器,负责接收、存储和转发消息。一个 Kafka 集群通常包含多个代理。
    • 存储:消息被持久化存储在代理的磁盘上,每个分区在代理上对应一个日志文件。
  • ZooKeeper

    • 功能:Kafka 使用 ZooKeeper 进行分布式协调和管理,包括:

      • 元数据存储:如broker、主题、分区、代理信息。

        image-20240526193550049

        broker启动之后会向zookeeper进行注册,谁先注册谁的controller就是控制节点负责后续的选主工作

      • 领导者选举:为每个分区选举主副本(Leader)。

      • 配置管理:管理和分发集群配置。

    Kafka在2.8.0以后也可以不用配置Zookeeper变为选配,不配需要通过kraft进行管理

  • 分区和副本(Partition and Replication)

    • 分区:主题分区使 Kafka 能够水平扩展和并行处理消息。

    • 副本(Replica)

      :每个分区可以有多个副本,以实现高可用性。

      • 主副本(Leader):负责处理所有读写请求。
      • 从副本(Follower):复制主副本的数据,当主副本故障时可以提升为新的主副本。
  • 消息日志(Log)

    • 顺序写入:Kafka 将消息按顺序写入分区日志文件,以实现高效的写入性能。
    • 数据保留策略:日志可以配置为按时间或空间限制保留数据,已处理的消息不会立即删除。
  • 控制器(Controller)

    • 角色:控制器是 Kafka 集群中的一个特殊代理,负责集群范围内的管理任务,如分区的主副本选举和重新分配。

高性能

kafka之所以可以快速读写的原因如下:

  1. kafka是分布式集群,采用分区方式,并行操作
  2. 读取数据采用稀疏索引,可以快速定位消费数据
  3. 顺序写磁盘
  4. 页缓冲和零拷贝

使用场景

  1. 实时数据流处理:Kafka 可以用于构建实时数据管道和流处理应用,如日志收集、事件跟踪和监控系统。
  2. 消息传递:Kafka 可以作为传统消息队列系统的替代方案,用于构建可靠的消息传递机制。
  3. 数据集成:Kafka 可以用于在不同的数据源和目标之间集成数据,支持数据的实时同步和批量传输。
  4. 日志聚合:Kafka 可以用作集中式日志管理系统,将不同服务和应用的日志收集到一起,进行分析和监控。

总的来说,Kafka 是一个强大的流处理平台,能够支持各种复杂的数据流和消息传递需求,并且在大数据生态系统中扮演着重要角色。

安装

  • 下载tgz安装包

  • 解压:tar -zxvf kafka_2.12-3.0.0.tgz

  • 准备三台服务器

  • 进入kafka的config文件夹中修改server.properties配置

    • broker.id:保证唯一
    • log.dirs:日志文件目录
    • zookeeper.connect:告诉zookeeper三个broker的地址zookeeper.connect=ip1:2181,ip2:2181,ip3:2181/kafka
  • 分发kafka到其他节点并修改broker.id:xsync kafka_2.12-3.0.0

  • 都添加环境变量:

    1
    2
    3
    4
    5
    6
    7
    sudo vim /etc/profile.d/my_env.sh

    #KAFKA_HOME
    export KAFKA_HOME=/usr/app/kafka_2.12-3.0.0
    export PATH=$PATH:$KAFKA_HOME/bin

    source /etc/profile
  • 启动Zookeeper:./zkServer.sh start

  • 启动Kafka: kafka-server-start.sh -daemon /usr/app/kafka_2.12-3.0.0/config/server.properties

重要集群参数配置

Broker 端参数

  • log.dirs:这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径(消息数据等)。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定,多个地址以逗号隔开(kafka1.1之后提供Failover机制,如果一个磁盘坏了,会将数据自动同步到其他正常磁盘中)。

  • log.dir:注意这是 dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。

  • zookeeper.connect:ZooKeeper集群地址,例如:zk1:2181,zk2:2181,zk3:2181/kafka1

  • listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议(例如ssl等)访问指定主机名和端口开放的 Kafka服务,例如:CONTROLLER://localhost:9092

  • advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。

    可以配置listeners为内网IP,advertised.listeners为外网IP

  • auto.create.topics.enable:是否允许自动创建 Topic,一般不开启,因为会导致创建很多不知道的主题。

  • unclean.leader.election.enable:是否允许 Unclean Leader选举,也就是让不让数据保存少的副本进行选举,一般为false,不然会导致数据的丢失。

  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举,非必要一定要设置为false,因为换Leader很消耗系统资源。

  • **log.retention.{hours|minutes|ms}**:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低,默认7天。

  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小,默认-1无限制。

  • message.max.bytes:控制 Broker 能够接收的最大消息大小,可以适当提高默认才1M。

Topic 级别参数

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
    设置方法:
  • 在创建Topic带上参数:bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
  • 修改Topic时:bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

JVM 级别参数

选择合适的垃圾回收器:

  • 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
  • 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
  • 也可以使用G1,性能更好,开启方法是指定-XX:+UseG1GC。

设置方法:

1
2
3
$> export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

操作系统参数

  • 文件描述符限制:ulimit -n一般都需要调大,调大并没有影响
  • 文件系统类型:文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统;XFS 的性能要强于ext4
  • Swappiness:可以设置成一个较小的值,不然会触发OOM killer,还可以提供观察监控的机会
  • 提交时间:kafka落盘时间,Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了;默认系统刷盘时间为5秒,可以适当地增加提交间隔来降低物理磁盘的写操作,确实有丢失数据的可能,但是kafka提供了副本机制不需要怕

硬件选择

  • 操作系统:大部分都部署在linux中,因为其epoll多路复用以及sendfile零拷贝的实现
  • 磁盘:大部分只需要机械硬盘
    • 固态硬盘:不需要机械寻道,但是kafka是顺序读写
    • raid磁盘阵列:分布式磁盘,但是kafka可以通过分区实现负载均衡和副本机制以及磁盘故障转移机制(即如果配置的多个磁盘,其中坏了一部分,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,则不会影响broker)提供高可靠性
  • 磁盘容量:考虑下面元素(一般会预留20%)
    • 新增消息数消息
    • 留存时间
    • 平均消息大小
    • 备份数
    • 是否启用压缩
  • 内存:kafka内存=堆内存(kafka 内部配置)+页缓存(服务器内存)
    • 堆内存10 -15g
    • 页缓存segment (1g ) 分区数Leader * 1g * 25%
  • cpu选择:
    • num.io.threads =8负责写磁盘的线程数,整个参数值要占总核数的50%
    • num.replica.fetchers = 1副本拉取线程数,这个参数占总核数的50%的1/3
    • num.network.threads =3数据传输线程数,这个参数占总核数的50%的2/3
  • 带宽:按照一个机器占带宽的70%,然后kafka再占其中1/3即可

消息模式

点对点的模式

在点对点模式中,消息队列充当消息传递的中介,一个消息仅能被一个消费者消费。这种模式非常适合需要一次处理一个任务的场景。

工作机制

  1. 队列(Queue):消息被发送到一个特定的队列中。
  2. 生产者(Producer):生产者将消息发送到队列。
  3. 消费者(Consumer):一个消费者从队列中读取并处理消息。
  4. 消息处理:每条消息只能被一个消费者消费并处理。

特性

  • 单一消费者:每条消息仅被一个消费者读取,确保消息处理的一致性。
  • 负载均衡:多个消费者可以并发地从同一个队列读取消息,进行负载均衡。
  • 确认机制:消费者在成功处理消息后,发送确认信息,消息才会从队列中删除。

应用场景

  • 任务处理:后台任务处理、批量处理、工作队列等场景。
  • 订单处理系统:确保每个订单只被处理一次。
  • 邮件系统:每封邮件只被一个处理程序处理和发送。

发布/订阅模式

在发布/订阅模式中,消息通过主题(Topic)进行传递,消息发布者(Publisher)将消息发送到一个主题,多个消息订阅者(Subscriber)可以订阅该主题,每个订阅者都能接收到所有的消息。

工作机制

  1. 主题(Topic):消息发布到特定的主题中。
  2. 发布者(Publisher):发布者将消息发送到主题。
  3. 订阅者(Subscriber):一个或多个订阅者订阅主题,接收来自该主题的所有消息。
  4. 消息广播:每条消息会被广播到所有订阅者。

特性

  • 多消费者:同一消息可以被多个订阅者消费,适合广播消息的场景。
  • 订阅机制:订阅者可以动态地订阅或取消订阅主题。
  • 松耦合:发布者和订阅者之间没有直接依赖关系。

应用场景

  • 新闻发布系统:一个新闻发布,所有订阅该类别的用户都会收到通知。
  • 实时数据流处理:多个系统需要实时接收和处理同一数据流,如股票行情、传感器数据等。
  • 事件驱动架构:系统各部分通过事件通知进行通信,实现松耦合的架构设计。
  • 日志分发系统:日志信息广播到多个日志处理服务进行分析和监控。

Kafka命令行

topic命令

  • 查看服务器所有topic:kafka-topics.sh --bootstrap-server ip1:9092,ip2:9092 --list
  • 创建一个topic:kafka-topics.sh --bootstrap-server ip1:9092 --create --partitions 1 --replication-factor 3 --topic test(创建一个test分区,一个分区,3个副本)
  • 删除 topic:kafka-topics.sh --bootstrap-server ip1:9092 --delete --topic test
  • 查看topic详细信息:kafka-topics.sh --bootstrap-server ip1:9092 --topic test --describe
  • 修改分区数(注意:分区数只能增加,不能减少):kafka-topics.sh --bootstrap-server ip1:9092 --alter --topic test --partitions 3

命令行不能修改副本数

生产者/消费者命令行

  • 连接broker的topic发送消息(连接上就可以直接发送消息了):kafka-console-producer.sh --bootstrap-server ip1:9092 --topic test
  • 连接broker的topic消费消息:kafka-console-consumer.sh --bootstrap-server ip1:9092 --topic test --from-beginning (不加--from-beginning则是增量读取)