kafka核心概念

小TOT 创建于 2018-12-25

一、borker

broker可以理解成一个kafka服务node,是一个运行的kafka服务。broker与broker之间是平等的关系,任意broker都可以down机而不影响其他broker正常工作。kafaka在启动的时候,会将自己的信息同步到zk上。数据存储使用了zk的零时节点,broker需要通过心跳机制维护与zk的注册关系,一旦broker宕机,zk上面对应的零时节点也会被删除。

kafka Controller

我们可以把controller当成集群的管理者,集群中borker启动时如果没有controller回主动的去zk上注册znode节点来抢夺controller的位置,注册成功的broker会被当选为kafka controller。若当前的controller宕机其他borker会从重新进入controller争抢流程,从而选出新的controller。controller主要的功能如下:

  • UpdateMetadataRequest:更新元数据请求。topic分区状态经常会发生变更(比如leader重新选举了或副本集合变化了等)。由于当前clients只能与分区的leader broker进行交互,那么一旦发生变更,controller会将最新的元数据广播给所有存活的broker。具体方式就是给所有broker发送UpdateMetadataRequest请求
  • CreateTopics: 创建topic请求。当前不管是通过API方式、脚本方式抑或是CreateTopics请求方式来创建topic,做法几乎都是在Zookeeper的/brokers/topics下创建znode来触发创建逻辑,而controller会监听该path下的变更来执行真正的“创建topic”逻辑。
  • DeleteTopics:删除topic请求。和CreateTopics类似,也是通过创建Zookeeper下的/admin/delete_topics/<topic>节点来触发删除topic,controller执行真正的逻辑
  • 分区重分配:即kafka-reassign-partitions脚本做的事情。同样是与Zookeeper结合使用,脚本写入/admin/reassign_partitions节点来触发,controller负责按照方案分配分区
  • Preferred leader分配:preferred leader选举当前有两种触发方式:1. 自动触发(auto.leader.rebalance.enable = true);2. kafka-preferred-replica-election脚本触发。两者“玩法”相同,向Zookeeper的/admin/preferred_replica_election写数据,controller提取数据执行preferred leader分配
  • 分区扩展:即增加topic分区数。标准做法也是通过kafka-reassign-partitions脚本完成,不过用户可直接往Zookeeper中写数据来实现,比如直接把新增分区的副本集合写入到/brokers/topics/<topic>下,然后controller会为你自动地选出leader并增加分区
  • 集群扩展:新增broker时Zookeeper中/brokers/ids下会新增znode,controller自动完成服务发现的工作
  • broker崩溃处理:同样地,controller通过Zookeeper可实时侦测broker状态。一旦有broker挂掉了,controller可立即感知并为受影响分区选举新的leader
  • ControlledShutdown:broker除了崩溃,还能“优雅”地退出。broker一旦自行终止,controller会接收到一个ControlledShudownRequest请求,然后controller会妥善处理该请求并执行各种收尾工作
  • Controller leader选举:controller必然要提供自己的leader选举以防这个全局唯一的组件崩溃宕机导致服务中断。这个功能也是通过Zookeeper的帮助实现的。

二、topic & partiton

Topic相当于传统消息系统MQ中的一个队列queue,可以把topic当成是消息的分类。partiton可以看成是topic的一个分区,目的是突破IO瓶颈。kafka在存储topic日志的时候,将topic分开存储,这样就能将同一个消息的写压力分配到不同的分区,这样可以提升kafka的整体吞吐能力。为了保证数据的高可用,kafka使用partiton-Replica进行数据备份,若partition leader挂了,kafka controller会自动从partiton-Replica选举新的leader。提到备份不得不提到ISR,这是一个同步备份列表,每当用户添加新的消息时,分区leader成功写入日志后,后必须保证ISR列表里面的备份也成功写入日志后,才能给客户端相应成功。因此ISR列表的备份的日志总是和leader保持一致,在leader宕机的时候,可以使用ISR列表的备份取代leader的位置。

三、log & segment

kafka最终的数据承载是通过log的方式进行,kafka会按照请求的顺序将消息存储到log中。我们知道一个topic可能会被分配到到个分区partiton来减轻单点负载。每个partiton实际上在写log的时候也会存在,单个文件大小物理极限的问题。因此kafka引入了segment解决方案,即将日志分段存储。不同的segment log组合起来的数据就是分区的存储消息数据。为了方便通过offset定位消息,segment log使用first-offset格式进行文件命名,first-offset是该文件存储的第一条消息的offset。这样就能通过消费者提供的offset很快定位到文件,然后通过offset偏移量可以快速定位消息的存储位置。

四、producer消息/数据生产者

生产者负责消息的发送,生产者需要指定消息的topic来区分不同消息。kafka收到消息后通过loadbalance策略,使用hash(message) % topic分片数 决定将数据存储到哪一个分片。 然后将message发送到制定分片的leader,leader收到消息后,将消息保存下来,接着等待ISR(a set of in-sync replicas,该列表的备份数据时刻保持和leader数据一致)中的replica消费消息并发送ack,若ISR列表中的备份分区都已经确认收到消息并保存成功后,leader将成功的消息返回给producer以表明,消息被妥善保存。

五、consumer [group]消息/数据消费者&offset

与其他消息系统不同的是:kafka不会复制去保存客服端之前消费了那条消息,以及下一条应当消费那条消息,kafka将这些工作交给了消费客服端来做,因此kafka在消息消费可以做到无状态。offset就是用来保存某个消费组(consumer group)消费的在当前分区日志下的偏移量的。通常情况下,多个客服端在同时消费同一个消息分区消息的时候会存在并发问题,对于offset的控制就会出现问题,这样就会出现消费重复的情况,kafka使用无锁机制解决这个问题。kafka规定,同一个分区(partition)下的数据只能被通一个consumer group中的一个线程消费,这样就避免了不同线程之间争夺通一个资源,通过这种设计kafka做到了无锁,这样可以避免锁竞争造成效率下降。因此建议consumer group里面的线程数应当和分区数保持一致,这样做可以有效的利用线程资源,线程多了会被浪费掉,少了一个线程可能会处理多个分区的数据。如果你需要多个业务消费同一个消息,由于不同的consumer group对同意主题分区的offset是分开存储的,我们可以创建多个consumer group实现多个线程来消费同一个消息的目的。

kafaka如何常量时间复杂度?

写数据:通过上面消息的存储过程可以发现,除了数据存储和备份操作,并没有其他耗时操作。路由分区->leader写数据->数据复制,这些操作都和现有数据规模没有任何关系。每次写数据只会在原来的基础上做追加存储。由于kafka使用了顺序存储而不是非随机存储(据说磁盘的顺序存储效率远高于磁盘的随机存储、有时候甚至比内存的随机写效率还高),同时kafka还使用了批量存储的方式减少了对io的操作,提升了io效率。
读数据:consumer在消费某个topic的时候,消费者会将所有的分区数据消费完,kafka要求,同一时刻对同一分区的数据只会被一个线程消费,这样避免了锁操作。同时通过consumer group提供的offset数据,通过kafka的文件存储机制可以快速的定位到一个segment文件,并且通过计算offset偏移量可以快速定位到数据。从整个消费流程来看,数据规模对每个过程效率是不敏感的。

kafaka如何做到高可用的&动态扩展

高可用的解决方案通常是采用数据冗余以及快速恢复来解决的。kafka通过分区数据备份(partition replica)&分区数据分散到不同的机器以及kafka controller可以快速检测到宕机节点,通过读取节点的分区数据,可以快速重新选取分区leader,以恢复故障。同时在故障的处理过程中,就算该分区不可用,不往分区写入数据即可,对kafka的数据读取也是没有影响的。kafka使用hash 取余的目的在于均衡负载,并不在于为了通过message可以快速的查找到这个message所在位置,这个不是kafka关注的业务。kafka通过数据复制和快速恢复做到了高可用,同时基于message不关注通过某个具体message的具体存存储位置,因此在扩展kafka的时候,或者在扩展消息分区的时候,不需要进行额为的数据复制操作,降低了扩展时候的成本。

引用