Kafka基础
kafka基础
概述
Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并在 2011 年作为开源项目贡献给 Apache 软件基金会。Kafka 的设计目标是实现高吞吐量、低延迟、可扩展和容错的数据流处理和消息传递功能。
基础架构
主题(Topic)
定义:主题是消息的分类单位,用于组织和管理消息流。
分区(Partition):每个主题可以划分为多个分区。分区是有序的、不可变的消息序列,消息在分区内有唯一的偏移量(Offset)记录消费位置。
为了配合多个分区的概念,提出了消费组的概念进行并行消费;并且一个分区只能由消费组的一个消费者进行消费,这样单个分区的消息就是有序的
生产者(Producer)
- 功能:生产者是将消息发布到 Kafka 主题的客户端。
- 工作方式:生产者将消息发送到特定的主题和分区,可以根据某种策略(如轮询、键散列)选择分区。
消费者(Consumer)
- 功能:消费者是从 Kafka 主题中读取消息的客户端。
- 消费者组(Consumer Group):消费者可以组成一个组,共同消费一个主题的消息。每个消息只会被消费者组中的一个成员消费,实现负载均衡。
代理(Broker)
- 功能:代理是 Kafka 服务器,负责接收、存储和转发消息。一个 Kafka 集群通常包含多个代理。
- 存储:消息被持久化存储在代理的磁盘上,每个分区在代理上对应一个日志文件。
ZooKeeper
功能:Kafka 使用 ZooKeeper 进行分布式协调和管理,包括:
元数据存储:如broker、主题、分区、代理信息。
broker启动之后会向zookeeper进行注册,谁先注册谁的controller就是控制节点负责后续的选主工作
领导者选举:为每个分区选举主副本(Leader)。
配置管理:管理和分发集群配置。
Kafka在2.8.0以后也可以不用配置Zookeeper变为选配,不配需要通过kraft进行管理
分区和副本(Partition and Replication)
分区:主题分区使 Kafka 能够水平扩展和并行处理消息。
副本(Replica)
:每个分区可以有多个副本,以实现高可用性。
- 主副本(Leader):负责处理所有读写请求。
- 从副本(Follower):复制主副本的数据,当主副本故障时可以提升为新的主副本。
消息日志(Log)
- 顺序写入:Kafka 将消息按顺序写入分区日志文件,以实现高效的写入性能。
- 数据保留策略:日志可以配置为按时间或空间限制保留数据,已处理的消息不会立即删除。
控制器(Controller)
- 角色:控制器是 Kafka 集群中的一个特殊代理,负责集群范围内的管理任务,如分区的主副本选举和重新分配。
高性能
kafka之所以可以快速读写的原因如下:
- kafka是分布式集群,采用分区方式,并行操作
- 读取数据采用稀疏索引,可以快速定位消费数据
- 顺序写磁盘
- 页缓冲和零拷贝
使用场景
- 实时数据流处理:Kafka 可以用于构建实时数据管道和流处理应用,如日志收集、事件跟踪和监控系统。
- 消息传递:Kafka 可以作为传统消息队列系统的替代方案,用于构建可靠的消息传递机制。
- 数据集成:Kafka 可以用于在不同的数据源和目标之间集成数据,支持数据的实时同步和批量传输。
- 日志聚合:Kafka 可以用作集中式日志管理系统,将不同服务和应用的日志收集到一起,进行分析和监控。
总的来说,Kafka 是一个强大的流处理平台,能够支持各种复杂的数据流和消息传递需求,并且在大数据生态系统中扮演着重要角色。
安装
下载tgz安装包
解压:
tar -zxvf kafka_2.12-3.0.0.tgz
准备三台服务器
进入kafka的config文件夹中修改server.properties配置
- broker.id:保证唯一
- log.dirs:日志文件目录
- zookeeper.connect:告诉zookeeper三个broker的地址
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181/kafka
分发kafka到其他节点并修改broker.id:
xsync kafka_2.12-3.0.0
都添加环境变量:
1
2
3
4
5
6
7sudo vim /etc/profile.d/my_env.sh
KAFKA_HOME
export KAFKA_HOME=/usr/app/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile启动Zookeeper:
./zkServer.sh start
启动Kafka:
kafka-server-start.sh -daemon /usr/app/kafka_2.12-3.0.0/config/server.properties
重要集群参数配置
Broker 端参数
log.dirs:这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径(消息数据等)。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定,多个地址以逗号隔开(kafka1.1之后提供Failover机制,如果一个磁盘坏了,会将数据自动同步到其他正常磁盘中)。
log.dir:注意这是 dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。
zookeeper.connect:ZooKeeper集群地址,例如:
zk1:2181,zk2:2181,zk3:2181/kafka1
listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议(例如ssl等)访问指定主机名和端口开放的 Kafka服务,例如:
CONTROLLER://localhost:9092
。advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
可以配置listeners为内网IP,advertised.listeners为外网IP
auto.create.topics.enable:是否允许自动创建 Topic,一般不开启,因为会导致创建很多不知道的主题。
unclean.leader.election.enable:是否允许 Unclean Leader选举,也就是让不让数据保存少的副本进行选举,一般为false,不然会导致数据的丢失。
auto.leader.rebalance.enable:是否允许定期进行 Leader 选举,非必要一定要设置为false,因为换Leader很消耗系统资源。
**log.retention.{hours|minutes|ms}**:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低,默认7天。
log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小,默认-1无限制。
message.max.bytes:控制 Broker 能够接收的最大消息大小,可以适当提高默认才1M。
Topic 级别参数
- retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
- retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
设置方法: - 在创建Topic带上参数:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
- 修改Topic时:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
JVM 级别参数
选择合适的垃圾回收器:
- 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
- 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
- 也可以使用G1,性能更好,开启方法是指定-XX:+UseG1GC。
设置方法:
1 | $> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g |
操作系统参数
- 文件描述符限制:
ulimit -n
一般都需要调大,调大并没有影响 - 文件系统类型:文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统;XFS 的性能要强于ext4
- Swappiness:可以设置成一个较小的值,不然会触发OOM killer,还可以提供观察监控的机会
- 提交时间:kafka落盘时间,Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了;默认系统刷盘时间为5秒,可以适当地增加提交间隔来降低物理磁盘的写操作,确实有丢失数据的可能,但是kafka提供了副本机制不需要怕
硬件选择
- 操作系统:大部分都部署在linux中,因为其epoll多路复用以及sendfile零拷贝的实现
- 磁盘:大部分只需要机械硬盘
- 固态硬盘:不需要机械寻道,但是kafka是顺序读写
- raid磁盘阵列:分布式磁盘,但是kafka可以通过分区实现负载均衡和副本机制以及磁盘故障转移机制(即如果配置的多个磁盘,其中坏了一部分,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,则不会影响broker)提供高可靠性
- 磁盘容量:考虑下面元素(一般会预留20%)
- 新增消息数消息
- 留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
- 内存:kafka内存=堆内存(kafka 内部配置)+页缓存(服务器内存)
- 堆内存10 -15g
- 页缓存segment (1g ) 分区数Leader * 1g * 25%
- cpu选择:
- num.io.threads =8负责写磁盘的线程数,整个参数值要占总核数的50%
- num.replica.fetchers = 1副本拉取线程数,这个参数占总核数的50%的1/3
- num.network.threads =3数据传输线程数,这个参数占总核数的50%的2/3
- 带宽:按照一个机器占带宽的70%,然后kafka再占其中1/3即可
消息模式
点对点的模式
在点对点模式中,消息队列充当消息传递的中介,一个消息仅能被一个消费者消费。这种模式非常适合需要一次处理一个任务的场景。
工作机制
- 队列(Queue):消息被发送到一个特定的队列中。
- 生产者(Producer):生产者将消息发送到队列。
- 消费者(Consumer):一个消费者从队列中读取并处理消息。
- 消息处理:每条消息只能被一个消费者消费并处理。
特性
- 单一消费者:每条消息仅被一个消费者读取,确保消息处理的一致性。
- 负载均衡:多个消费者可以并发地从同一个队列读取消息,进行负载均衡。
- 确认机制:消费者在成功处理消息后,发送确认信息,消息才会从队列中删除。
应用场景
- 任务处理:后台任务处理、批量处理、工作队列等场景。
- 订单处理系统:确保每个订单只被处理一次。
- 邮件系统:每封邮件只被一个处理程序处理和发送。
发布/订阅模式
在发布/订阅模式中,消息通过主题(Topic)进行传递,消息发布者(Publisher)将消息发送到一个主题,多个消息订阅者(Subscriber)可以订阅该主题,每个订阅者都能接收到所有的消息。
工作机制
- 主题(Topic):消息发布到特定的主题中。
- 发布者(Publisher):发布者将消息发送到主题。
- 订阅者(Subscriber):一个或多个订阅者订阅主题,接收来自该主题的所有消息。
- 消息广播:每条消息会被广播到所有订阅者。
特性
- 多消费者:同一消息可以被多个订阅者消费,适合广播消息的场景。
- 订阅机制:订阅者可以动态地订阅或取消订阅主题。
- 松耦合:发布者和订阅者之间没有直接依赖关系。
应用场景
- 新闻发布系统:一个新闻发布,所有订阅该类别的用户都会收到通知。
- 实时数据流处理:多个系统需要实时接收和处理同一数据流,如股票行情、传感器数据等。
- 事件驱动架构:系统各部分通过事件通知进行通信,实现松耦合的架构设计。
- 日志分发系统:日志信息广播到多个日志处理服务进行分析和监控。
Kafka命令行
topic命令
- 查看服务器所有topic:
kafka-topics.sh --bootstrap-server ip1:9092,ip2:9092 --list
- 创建一个topic:
kafka-topics.sh --bootstrap-server ip1:9092 --create --partitions 1 --replication-factor 3 --topic test
(创建一个test分区,一个分区,3个副本) - 删除 topic:
kafka-topics.sh --bootstrap-server ip1:9092 --delete --topic test
- 查看topic详细信息:
kafka-topics.sh --bootstrap-server ip1:9092 --topic test --describe
- 修改分区数(注意:分区数只能增加,不能减少):
kafka-topics.sh --bootstrap-server ip1:9092 --alter --topic test --partitions 3
命令行不能修改副本数
生产者/消费者命令行
- 连接broker的topic发送消息(连接上就可以直接发送消息了):
kafka-console-producer.sh --bootstrap-server ip1:9092 --topic test
- 连接broker的topic消费消息:
kafka-console-consumer.sh --bootstrap-server ip1:9092 --topic test --from-beginning
(不加--from-beginning
则是增量读取)