大数据面试之Kafka

1. Kafka 架构

生产者 、 Broker 、 消费者 、 Zookeeper。

注意: Zookeeper 中保存 Broker id 和 controller 等信息, 但是没有生产者信息。

2. Kafka 生产端分区分配策略

Kafka 官方为我们实现了三种 Partitioner (分区器), 分别是 DefaultPartitioner (当未指 定分区器时候所使用的默认分区器)、 UniformStickyPartitioner 、 RoundRobinPartitioner。

1DefaultPartitioner 默认分区器

下图说明了默认分区器的分区分配策略:

2UniformStickyPartitioner 纯粹的粘性分区器

(1) 如果指定了分区号, 则会按照指定的分区号进行分配

(2) 若没有指定分区好, 则使用粘性分区器

3RoundRobinPartitioner 轮询分区器

(1) 如果在消息中指定了分区则使用指定分区。

(2) 如果未指定分区, 都会将消息轮询每个分区, 将数据平均分配到每个分区中。

4) 自定义分区器

自定义分区策略: 可以通过实现 org.apache.kafka.clients.producer.Partitioner 接口, 重写 partition 方法来达到自定义分区效果。

例如我们想要实现随机分配, 只需要以下代码:

List partitions = cluster.partitionsForTopic (topic);

return ThreadLocalRandom.current ().nextInt (partitions.size ());

先计算出该主题总的分区数, 然后随机地返回一个小于它的正整数。

在项目中, 如果希望把 MySQL 中某张表的数据发送到一个分区 。可以以表名为 key 进 行发送。

3. Kafka 丢不丢数据

1Producer 角度

acks=0

acks=- 1, 生产者发送过来数据 Leader 和 ISR 队列里面所有 Follwer 应答, 可靠性高,效率低;

在生产环境中, acks=0 很少使用; acks=1, 一般用于传输普通日志, 允许丢个别数据; acks=- 1, 一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

2Broker 角度

副本数大于等于 2。

min.insync.replicas 大于等于 2。

4. Kafka 的 ISR 副本同步队列

ISR (In-Sync Replicas), 副本同步队列 。 如果 Follower 长时间未向 Leader 发送通信请 求或同步数据, 则该 Follower 将被踢出 ISR 。 该时间阈值由 replica.lag.time.max.ms 参数设 定, 默认 30s。

任意一个维度超过阈值都会把 Follower 剔除出 ISR, 存入 OSR (Outof-Sync Replicas) 列表, 新加入的 Follower 也会先存放在 OSR 中。

Kafka 分区中的所有副本统称为 AR = ISR + OSR

5. Kafka 数据重复

去重 = 幂等性 + 事务

1) 幂等性原理

2) 幂等性配置参数

3Kafka 的事务一共有如下 5API

// 1 初始化事务

void initTransactions ();

// 2 开启事务

void beginTransaction () throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量 (主要用于消费者)

void sendOffsetsToTransaction (Map offsets,

String consumerGroupId) throws

ProducerFencedException;

// 4 提交事务

void commitTransaction () throws ProducerFencedException;

// 5 放弃事务 (类似于回滚事务的操作)

void abortTransaction () throws ProducerFencedException;

4) 总结

( 1) 生产者角度

acks 设置为- 1 (acks=- 1)。

幂等性 (enable.idempotence = true) + 事务 。

(2) broker 服务端角度

分区副本大于等于 2 (--replication-factor 2)。

ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。

(3) 消费者

事务 + 手动提交 offset (enable.auto.commit = false)。

消费者输出的目的地必须支持事务 (MySQL 、 Kafka)。

6.Kafka 如何保证数据有序 or 怎么解决乱序

1Kafka 最多只保证单分区内的消息是有序的, 所以如果要保证业务全局严格有序, 就要 设置 Topic 为单分区。

2) 如何保证单分区内数据有序?

生产经验——如何保证单分区数据有序

方案一:

禁止重试, 需设置以下参数

设置retries等于0

说明: 数据出现乱序的根本原因是, 失败重试, 关闭重试, 则可保证数据是有序的 。 但是这样做, 可能会 导致数据的丢失。

方案二:

启用幂等, 需设置以下参数

设置enable.idempotence=true, 启用幂等

设置max.in.flight.requests.per.connection, 1.0.X之后, 小于等于5

设置retries, 保证其大于0

设置acks, 保证其为-1

注: 幂等机制保证数据有序的原理如下:

7.Kafka 分区 Leader 选举规则

在 ISR 中存活为前提, 按照 AR 中排在前面的优先 。 例如 AR[ 1,0,2], ISR [ 1, 0, 2], 那么 Leader 就会按照 1, 0, 2 的顺序轮询。

8. Kafka 中 AR 的顺序

如果 Kafka 服务器只有 4 个节点, 那么设置 Kafka 的分区数大于服务器台数, 在 Kafka 底层如何分配存储副本呢?

1) 创建 16 分区, 3 个副本

(1) 创建一个新的 Topic, 名称为 second。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second

(2) 查看分区和副本情况。

9. Kafka 日志保存时间

默认保存 7 天; 生产环境建议 3 天。

10.Kafka 过期数据清理

日志清理的策略只有 delete 和 compact 两种。

1delete 日志删除: 将过期数据删除

log.cleanup.policy = delete , 所有数据启用删除策略

(1) 基于时间: 默认打开 。 以 segment 中所有记录中的最大时间戳作为该文件时间戳。

(2) 基于大小: 默认关闭 。 超过设置的所有日志总大小, 删除最早的 segment。 log.retention.bytes, 默认等于- 1, 表示无穷大。

思考: 如果一个 segment 中有一部分数据过期, 一部分没有过期, 怎么处理?

2compact 日志压缩

11.Kafka 为什么能高效读写数据

1Kafka 本身是分布式集群, 可以采用分区技术, 并行度高

2) 读数据采用稀疏索引, 可以快速定位要消费的数据

3) 顺序写磁盘

Kafka 的 producer 生产数据, 要写入到 log 文件中, 写的过程是一直追加到文件末端, 为顺序写 。 官网有数据表明, 同样的磁盘, 顺序写能到 600M/s, 而随机写只有 100K/s 。 这与磁盘的机械机构有关, 顺序写之所以快, 是因为其省去了大量磁头寻址的时间。

4) 页缓存 + 零拷贝技术

12.自动创建主题

如果 Broker 端配置参数 auto.create.topics.enable 设置为 true (默认值是 true), 那么当生 产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为 1)、 副本因子为 default.replication.factor (默认值为 1) 的主题 。 除此之外, 当一个消费者开始从 未知主题中读取消息时, 或者当任意一个客户端向未知主题发送元数据请求时, 都会自动创 建一个相应主题。这种创建主题的方式是非预期的, 增加了主题管理和维护的难度。生产环 境建议将该参数设置为 false。

(1) 向一个没有提前创建 five 主题发送数据

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh--bootstrap-server hadoop102:9092 --topic five>hello world

(2) 查看 five 主题的详情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic five

13. 副本数设定

一般我们设置成 2 个或 3 个, 很多企业设置为 2 个。

副本的优势: 提高可靠性; 副本劣势: 增加了网络 IO 传输。

14. Kakfa 分区数

(1) 创建一个只有 1 个分区的 Topic。

(2) 测试这个 Topic 的 Producer 吞吐量和 Consumer 吞吐量。

(3) 假设他们的值分别是 Tp 和 Tc, 单位可以是 MB/s。

(4) 然后假设总的目标吞吐量是 Tt, 那么分区数 = Tt / min (Tp, Tc)。

例如: Producer 吞吐量 = 20m/s; Consumer 吞吐量 = 50m/s, 期望吞吐量 100m/s;

分区数 = 100 / 20 = 5 分区

分区数一般设置为: 3- 10 个

分区数不是越多越好, 也不是越少越好, 需要搭建完集群, 进行压测, 再灵活调整分区 个数。

15. Kafka 增加分区

1) 可以通过命令行的方式增加分区, 但是分区数只能增加, 不能减少。

2) 为什么分区数只能增加, 不能减少?

(1) 按照 Kafka 现有的代码逻辑而言, 此功能完全可以实现, 不过也会使得代码的复 杂度急剧增大。

(2) 实现此功能需要考虑的因素很多, 比如删除掉的分区中的消息该作何处理?

(3) 反观这个功能的收益点却是很低, 如果真的需要实现此类的功能, 完全可以重新 创建一个分区数较小的主题, 然后将现有主题中的消息按照既定的逻辑复制过去即可。

16. Kafka 中多少个 Topic

ODS 层: 2 个

DWD 层: 20 个

17. Kafka 消费者是拉取数据还是推送数据

拉取数据。

18.Kafka 消费端分区分配策略

粘性分区:

该分区分配算法是最复杂的一种, 可以通过 partition.assignment.strategy 参数去设置, 从 0. 11 版本开始引入, 目的就是在执行新分配时, 尽量在上一次分配结果上少做调整, 其 主要实现了以下 2 个目标:

(1) Topic Partition 的分配要尽量均衡。

(2) 当 Rebalance 发生时, 尽量与上一次分配结果保持一致。

注意: 当两个目标发生冲突的时候, 优先保证第一个目标, 这样可以使分配更加均匀,其中第一个目标是 3 种分配策略都尽量去尝试完成的,而第二个目标才是该算法的精髓所在。

19.消费者再平衡的条件

1Rebalance 的触发条件有三种

(1) 当 Consumer Group 组成员数量发生变化 (主动加入、主动离组或者故障下线等)。

(2) 当订阅主题的数量或者分区发生变化。

2) 消费者故障下线的情况

3) 主动加入消费者组

在现有集中增加消费者, 也会触发 Kafka 再平衡 。 注意, 如果下游是 Flink, Flink 会自 己维护 offset, 不会触发 Kafka 再平衡。

20. 指定 Offset 消费

可以在任意 offset 处消费数据。

kafkaConsumer.seek (topic, 1000);

21.指定时间消费

可以通过时间来消费数据。

HashMap timestampToSearch = new HashMap<> ();

timestampToSearch.put (topicPartition, System.currentTimeMillis () -1 * 24 * 3600 * 1000);

kafkaConsumer.offsetsForTimes (timestampToSearch);

22.Kafka 监控

公司自己开发的监控器。

开源的监控器: KafkaManager 、 KafkaMonitor 、 KafkaEagle。

23.Kafka 数据积压

1) 发现数据积压

通过 Kafka 的监控器 Eagle, 可以看到消费 lag, 就是积压情况:

2) 解决

(1) 消费者消费能力不足

①可以考虑增加 Topic 的分区数, 并且同时提升消费组的消费者数量, 消费者数 = 分 区数 。(两者缺一不可)。

增加分区数;

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

②提高每批次拉取的数量, 提高单个消费者的消费能力。

(2) 消费者处理能力不行

①消费者, 调整 fetch.max.bytes 大小, 默认是 50m。

②消费者, 调整 max.poll.records 大小, 默认是 500 条。

如果下游是 Spark、Flink 等计算引擎, 消费到数据之后还要进行计算分析处理, 当处理 能力跟不上消费能力时, 会导致背压的出现, 从而使消费的速率下降。

需要对计算性能进行调优 (看 Spark 、 Flink 优化)。

(3) 消息积压后如何处理

某时刻, 突然开始积压消息且持续上涨。这种情况下需要你在短时间内找到消息积压的 原因, 迅速解决问题。

导致消息积压突然增加, 只有两种: 发送变快了或者消费变慢了。

假如赶上大促或者抢购时, 短时间内不太可能优化消费端的代码来提升消费性能, 此时 唯一的办法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服 务器资源进行扩容, 只能降级一些不重要的业务, 减少发送方发送的数据量, 最低限度让系 统还能正常运转, 保证重要业务服务正常。

假如通过内部监控到消费变慢了, 需要你检查消费实例, 分析一下是什么原因导致消费变慢?

①优先查看日志是否有大量的消费错误。

②此时如果没有错误的话, 可以通过打印堆栈信息, 看一下你的消费线程卡在哪里「触 发死锁或者卡在某些等待资源」。

24.如何提升吞吐量

如何提升吞吐量?

1) 提升生产吞吐量

(1) buffer.memory: 发送消息的缓冲区大小, 默认值是 32m, 可以增加到 64m。

(2) batch.size: 默认是 16k。如果 batch 设置太小, 会导致频繁网络请求, 吞吐量下降; 如果 batch 太大, 会导致一条消息需要等待很久才能被发送出去, 增加网络延时。

(3) linger.ms, 这个值默认是 0, 意思就是消息必须立即被发送 。 一般设置一个 5- 100毫秒。如果 linger.ms 设置的太小, 会导致频繁网络请求, 吞吐量下降; 如果 linger.ms 太长, 会导致一条消息需要等待很久才能被发送出去, 增加网络延时。

(4) compression.type: 默认是 none, 不压缩, 但是也可以使用 lz4 压缩, 效率还是不 错的, 压缩之后可以减小数据量, 提升吞吐量, 但是会加大 producer 端的 CPU 开销。

2) 增加分区

3) 消费者提高吞吐量

(1) 调整 fetch.max.bytes 大小, 默认是 50m。

(2) 调整 max.poll.records 大小, 默认是 500 条。

25.Kafka 中数据量计算

每天总数据量 100g, 每天产生 1 亿条日志, 10000 万/24/60/60=1150 条/每秒钟

平均每秒钟: 1150 条

低谷每秒钟: 50 条

高峰每秒钟: 1150 条 * (2-20 倍) = 2300 条 - 23000 条

每条日志大小: 0.5k - 2k (取 1k)

每秒多少数据量: 2.0M - 20MB

26.Kafka 如何压测?

用 Kafka 官方自带的脚本, 对 Kafka 进行压测。

生产者压测: kafka-producer-perf-test.sh

消费者压测: kafka-consumer-perf-test.sh

1Kafka Producer 压力测试

(1) 创建一个 test Topic, 设置为 3 个分区 3 个副本

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test

(2) 在/opt/module/kafka/bin 目录下面有这两个文件 。 我们来测试一下

[atguigu@hadoop105 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props

bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 linger.ms=0

参数说明:

输出结果:

(3) 调整 batch.size 大小

(4) 调整 linger.ms 时间

(5) 调整压缩方式

(6) 调整缓存大小

2Kafka Consumer 压力测试

(1) 修改/opt/module/kafka/config/consumer.properties 文件中的一次拉取条数为 500

max.poll.records=500

(2) 消费 100 万条日志进行压测

[atguigu@hadoop105 kafka]$ bin/kafka-consumer-perf-test.sh

--bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092

--topic test --messages 1000000 --consumer.config

config/consumer.properties

参数说明:

输出结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,

nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465,

139925.1748, 415, 6735, 145.0656, 148547.1418

(3) 一次拉取条数为 2000

(4) 调整 fetch.max.bytes 大小为 100m

27. 磁盘选择

kafka 底层主要是顺序写, 固态硬盘和机械硬盘的顺序写速度差不多。 建议选择普通的机械硬盘。

每天总数据量: 1 亿条 * 1k ≈ 100g

100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T

建议三台服务器硬盘总大小, 大于等于 1T。

28. 内存选择

Kafka 内存组成: 堆内存 + 页缓存

1Kafka 堆内存建议每个节点: 10g ~ 15g

在 kafka-server-start.sh 中修改

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi

(1) 查看 Kafka 进程号

[atguigu@hadoop102 kafka]$ jps

2321 Kafka

5255 Jps

1931 QuorumPeerMain

(2) 根据 Kafka 进程号, 查看 Kafka 的 GC 情况

参数说明:

YGC: 年轻代垃圾回收次数;

(3) 根据 Kafka 进程号, 查看 Kafka 的堆内存

[atguigu@hadoop102 kafka]$ jmap -heap 2321

… …

Heap Usage:

G1 Heap:

regions = 2048

capacity = 2147483648 (2048.0MB)

used

free

= 246367744 (234.95458984375MB)

= 1901115904 (1813.04541015625MB)

11.472392082214355% used

2) 页缓存:

页缓存是 Linux 系统服务器的内存 。 我们只需要保证 1 个 segment (1g) 中 25%的数据 在内存中就好。

每个节点页缓存大小 = (分区数 * 1g * 25%) / 节点数 。 例如 10 个分区, 页缓存大小= (10 * 1g * 25%) / 3 ≈ 1g

建议服务器内存大于等于 11G。

29.CPU 选择

1) 默认配置

num.io.threads = 8 负责写磁盘的线程数。

num.replica.fetchers = 1 副本拉取线程数。

num.network.threads = 3 数据传输线程数。

2) 建议配置

此外还有后台的一些其他线程, 比如清理数据线程, Controller 负责感知和管控整个集 群的线程等等, 这样算, 每个 Broker 都会有上百个线程存在 。 根据经验, 4 核 CPU 处理几 十个线程在高峰期会打满, 8 核勉强够用, 而且再考虑到集群上还要运行其他的服务, 所以 部署 Kafka 的服务器一般建议在 16 核以上可以应对一两百个线程的工作, 如果条件允许, 给到24 核甚至 32 核就更好。

num.io.threads = 16 负责写磁盘的线程数。

num.replica.fetchers = 2 副本拉取线程数。

num.network.threads = 6 数据传输线程数。

服务器建议购买 32 核 CPU

30. 网络选择

网络带宽 = 峰值吞吐量 ≈ 20MB/s , 选择千兆网卡即可。

100Mbps 单位是 bit; 10M/s 单位是 byte ; 1byte = 8bit, 100Mbps/8 = 12.5M/s。

一般百兆的网卡 (100Mbps=12.5m/s)、 千兆的网卡 (1 0 0 0Mbps=125m/s)、 万兆的网卡 (1 2 5 0m/s)。

一般百兆的网卡 (100Mbps)、 千兆的网卡 (1000Mbps)、 万兆的网卡 (10000Mbps)。 100Mbps 单位是 bit; 10M/s 单位是 byte ; 1byte = 8bit, 100Mbps/8 = 12.5M/s。

通常选用千兆或者是万兆网卡。

31. Kafka 挂掉

在生产环境中, 如果某个 Kafka 节点挂掉。

正常处理办法:

(1) 先看日志, 尝试重新启动一下, 如果能启动正常, 那直接解决。

(2) 如果重启不行, 检查内存 、 CPU 、 网络带宽 。 调优= 》调优不行增加资源

(3) 如果将 Kafka 整个节点误删除, 如果副本数大于等于 2, 可以按照服役新节点的 方式重新服役一个新节点, 并执行负载均衡。

32.Kafka 的机器数量

33.服役新节点退役旧节点

可以通过 bin/kafka-reassign-partitions.sh 脚本服役和退役节点。

34. Kafka 单条日志传输大小

Kafka 对于消息体的大小默认为单条最大值是 1M 但是在我们应用场景中, 常常会出现 一条消息大于 1M, 如果不对 Kafka 进行配置 。 则会出现生产者无法将消息推送到 Kafka 或消费者无法去消费 Kafka 里面的数据,这时我们就要对 Kafka 进行以下配置:server.properties。

35.Kafka 参数优化

重点调优参数:

(1) buffer.memory 32m

(2) batch.size: 16k

(3) linger.ms 默认 0 调整 5- 100ms

(4) compression.type 采用压缩 snappy

(5) 消费者端调整 fetch.max.bytes 大小, 默认是 50m。

(6) 消费者端调整 max.poll.records 大小, 默认是 500 条。

(7) 单条日志大小: message.max.bytes 、 max.request.size 、 replica.fetch.max.bytes 适当调整 2-10m

(8) Kafka 堆内存建议每个节点: 10g ~ 15g

在 kafka-server-start.sh 中修改

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi

(9) 增加 CPU 核数

num.io.threads = 8 负责写磁盘的线程数

num.replica.fetchers = 1 副本拉取线程数

num.network.threads = 3 数据传输线程数

(10) 日志保存时间 log.retention.hours 3 天

(11) 副本数, 调整为 2

展开阅读全文

页面更新:2024-03-02

标签:数据   吞吐量   副本   节点   线程   分区   大小   消费者   消息   时间

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top