Kafka是一个使用Scala和Java编写的快速、可扩展、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
在发布-订阅消息系统中,消息被持久化到一个topic中(即存放在磁盘中),消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者
Kafka将消息持久化到磁盘中,并对消息创建了备份(replica)保证了数据的安全。Kafka在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。
Kafka的优势在于:
Kafka架构
在我们的实现中,每一个服务器就是一个broker(或者叫kafka的实例), Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
如,检查zookeeper配置
配置说明:
可通/opt/zookeeper/bin/zkServer.sh status命令查看各个主机的服务状态
注: 5台主机的集群中仅有一个Leader,其余4台都是follower
Kafka的配置查看
cat /opt/kafka/config/server.properties
broker.id=1
#port=9092
zookeeper.connect=172.20.1.11:2181,172.20.1.12:2181,172.20.1.13:2181,172.20.1.14:2181,172.20.1.15:2181
log.dirs=/home/kafka-logs
default.replication.factor=5
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=6000
log.retention.hours=1
log.dirs=/home/kafka-logs
配置说明:
Topic 被称为主题,在 kafka 中,使用一个类别属性来划分消息的所属类,划分消息的这个类称为 topic。topic 相当于消息的分配标签,是一个逻辑概念。主题好比是数据库的表,或者文件系统中的文件夹。
topic 中的消息被分割为一个或多个 partition,它是一个物理概念,对应到系统上的就是一个或若干个目录,一个分区就是一个 提交日志目录。消息以追加的形式写入分区,以先后顺序的方式读取。
Partition能水平扩展客户端的读写性能,是高吞吐量的保障(通过将分区分布在不同的服务器上,也就是说,一个主题可以跨越多个服务器,以此来提供比单个服务器更强大的性能)。简单来说,Partition就是一块保存具体数据的空间,本质就是磁盘上存放数据的文件夹,所以Partition是不能跨Broker存在(其他broker上存放的实际是Leader partition的副本),也不能在同一个Broker上跨磁盘。对于一个Topic,可以根据需要设定Partition的个数;Kafka默认的Partition个数num.partitions为1(/opt/kafka/config/server.properties),表示该Topic的所有数据均写入至一个文件夹下;用户也可以在通过指定–partitions来定义分区数(新建topic或通过—alter参数修改—注意是增加)。在数据持久化时,每条消息都是根据一定的分区规则路由到对应的Partition中,并append在log文件的尾部;在同一个Partition中消息是顺序写入的且始终保持有序性;但是不同Partition之间不能保证消息的有序性。
注意:由于一个主题包含无数个分区,因此无法保证在整个 topic 中有序,但是单个 Partition 分区可以保证有序。消息被迫加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性
Replica是Kafka架构中一个比较重要的概念,是系统高可用的一种保障。Replica逻辑上是作用于Topic的,但实际上是体现在每一个Partition上。例如:有一个Topic(如CMA_DATA),分区(partitions)数为2(分别为P0,P1),副本因子(replication-factor)数也为3;其本质就是该Topic一共有3个P0分区,3个P1分区。这样的设计在某种意义上就很大程度的提高了系统的容错率。
为了尽可能的提升服务的可用性和容错率,Kafka遵循如下的分区分配原则:
Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量)。
Offset是一个递增的、不可变的数字,由 kafka 自动维护。
当一条记录写入Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号作为Offset。向Topic 发送消息的时候,实际上是被写入某一个Partition,并赋予Offset。
一个Topic 如果有多个Partition 的话,那么从Topic 这个层面来看,消息是无序的。
但单独看Partition 的话,Partition 内部消息是有序的。
所以,一个Partition 内部消息有序,一个Topic 跨Partition 是无序的。
如果强制要求Topic 整体有序,就只能让Topic 只有一个Partition。
Partition是用来存储数据的,但并不是最小的数据存储单元。Partition下还可以细分成Segment,每个Partition是由一个或多个Segment组成。每个Segment分别对应两个文件:一个是以.index结尾的索引文件,另一个是以.log结尾的数据文件,且两个文件的文件名完全相同。所有的Segment均存在于所属Partition的目录下。
Segment的必要性:如果以partition作为数据存储的最小单元,那么partition将会是一个很大的数据文件,且数据量是持续递增的;当进行过期数据清理或消费指定offset数据时,操作如此的大文件将会是一个很严重的性能问题
Kafka 集群包含一个或多个服务器,每个 Kafka 中服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
broker 是集群的组成部分,每个集群中都会有一个 broker 同时充当了 集群控制器(Leader)的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包括将分区分配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,但是一个分区可以分配给多个 broker(非Leader),这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个 broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。
生产者,即消息的发布者,其会将某 topic 的消息发布到相应的 partition 中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
消费者,即消息的使用者,一个消费者可以消费多个 topic 的消息,对于某一个 topic 的消息,其只会消费同一个 partition 中的消息。
每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!
即组内消费者小于Topic分区数量时,会存在某个消费者消费多个分区消息的情况;而组内消费者大于分区数量时,多出来的消费者不消费任何分区的消息,即尽量消费组的consumer数量与分区数量一致
如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable参数为true(默认值就是true),那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(我们配置为5)个副本的对应topic。不过我们一般不建议将auto.create.topics.enable参数设置为true,因为这个参数会影响topic的管理与维护。
通过kafka提供的kafka-topics.sh脚本来创建,并且我们也建议通过这种方式(或者相关的变种方式)来创建topic。
如,通过kafka-topics.sh 脚本来创建一个名为PP8-LC1-LC2并且副本数为2、分区数为4的topic
/opt/kafka/bin/kafka-topics.sh --create --topic windtest --replication-factor 5 --partitions 4 --zookeeper localhost:2181
/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 --list
列出了某个topic的partition数量、replica因子以及每个partition的leader、replica信息,即创建的windtest topic4个分区每个分区的leader是哪个broker, replica情况等
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic windtest
1. 在新创建的topic上,利用脚本创建一个发布者
/opt/kafka/bin/kafka-console-producer.sh --broker-list 172.20.1.11:9092,172.20.1.12:9092,172.20.1.13:9092,172.20.1.14:9092,172.20.1.15:9092 --topic windtest
2. 在每个node上创建一个订阅者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.1.11:9092,172.20.1.12:9092,172.20.1.13:9092,172.20.1.14:9092,172.20.1.15:9092 --topic windtest --from-beginning
3. 在发布者窗口发送消息(回车一次为一条消息),检查每个订阅者是否收到消息
前面4条消息时,5个订阅者均在,最后一条消息时,仅有三个订阅者存在
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.20.1.12:9092 --topic windtest
windtest:2:2
windtest:1:1
windtest:3:1
windtest:0:1
可见5条消息通过轮序的方式在每个partition中存放
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.20.1.12:9092 --list
命令行consumer方式时,未指定消费组,自动生成了五个消费组(如上图所示)
注:5个client在5个不同的消费组中,因此,发布的消息被5个client分别消费(即消费了5次)
1. 将5个消费者编入同一个消费组
在5个设备上分别创建一个user.properties文件,输入group.id=consoleGroup1
[root@node1 ~]# cat ./user.properties
group.id=consoleGroup1
在五台主机上启动消费者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.1.12:9092 --topic windtest --consumer.config ./user.properties
2. 启动消息发布者
启动前,记录每个partition的offset
发布一条消息
可见通一个组下的5个消费者,仅有一个client消费了消息
3. 检查消费组的消费情况
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.20.1.15:9092 --describe --group consoleGroup1
在producer中发布多条消息,可见发布的消息会以round-robin方式放在各个partition中
发布10条消息后,由于我们只有4个partition, 而组内的消费者有5个,所以其中一个消费者始终没有消费任何消息,如下图所示的172.20.1.15这个消费者
注: LAG表示消息消费速度与生产速度之间的差异(若LAG值太大,证明消费能力不足)
4. 组内消费成员固定在一个partition上消费
通过比对partition offset与client的消费情况可见,每个client在启动后,分配在哪个partition就会一直在这个partition消费
1. 消息生产时,若指定了分区信息,消息直接投递到该分区
2. 如果未指定消息生产分区,但是指定了key, 则基于该key的hash值分配分区
3. 若分区和key都未制定,消息按照round-robin(轮询)方式投递到各个分区
4. 同一条消息只能被消费组中一个消费者消费
5. 当分区数量大于组中消费者时,会出现组内某一个消费者负责多个分区的情况
6. 当分区数量小于组内消费者数目时,会出现多出来的消费者成员没有分区对应
通过如下命令查看那个partition下有消息
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.20.1.12:9092 --topic CMA_DATA
然后通过如下命令查看这些消息的内容(/home/kafka-logs是日志存放位置,CMA_DATA是topic名字,23是其分区号)
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /home/kafka-logs/CMA_DATA-23/00000000000000000000.log --deep-iteration --print-data-log | more
/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 --alter --topic windtest --partitions 5
注意: 只能增加partition, 若减少已有消息的partition时,该partition的消息如何处理是个问题,实际操作时,我看到如下错误信息
修改后,已经启动且未能分配到partition的消费者会自动分配到新的partition进行消费
/opt/kafka/bin/kafka-topics.sh --delete --topic windtest --zookeeper 172.20.1.11:2181
删除前,在log.dirs=/home/kafka-logs目录下,存在topic windtest 4个partition的消息存放目录,当删除后,4个partition目录也同时删除(实际上是标记为删除状态)
在/opt/kafka/config/log4j.properties文件可查看到kafka的运行日志存放路径(如,将日志存在ceph网络磁盘的目录下----每个主机一个目录)
kafka.logs.dir=/usr/admin/log/hosts/node1/kafka/
同时也可log配置文件中修改日志输出等级
页面更新:2024-05-20
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号