在 Kafka 集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
- {"version":1,"brokerid":0,"timestamp":"1529210278988"}
-
其中version在目前版本中固定为1,brokerid表示称为控制器的 broker 的id编号,timestamp表示竞选称为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
Zookeeper 中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。
每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。
具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:
控制器在选举成功之后会读取Zookeeper中各个节点的数据来初始化上下文信息(ControllerContext),并且也需要管理这些上下文信息,比如为某个topic增加了若干个分区,控制器在负责创建这些分区的同时也要更新上下文信息,并且也需要将这些变更信息同步到其他普通的broker节点中。不管是监听器触发的事件,还是定时任务触发的事件,亦或者是其他事件(比如ControlledShutdown)都会读取或者更新控制器中的上下文信息,那么这样就会涉及到多线程间的同步,如果单纯的使用锁机制来实现,那么整体的性能也会大打折扣。
针对这一现象,Kafka的控制器使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到LinkedBlockingQueue中,然后使用一个专用的线程(ControllerEventThread)按照FIFO(First Input First Output, 先入先出)的原则顺序处理各个事件,这样可以不需要锁机制就可以在多线程间维护线程安全。
在Kafka的早期版本中,并没有采用Kafka Controller这样一个概念来对分区和副本的状态进行管理,而是依赖于Zookeeper,每个broker都会在Zookeeper上为分区和副本注册大量的监听器(Watcher)。当分区或者副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖于Zookeeper的设计会有脑裂、羊群效应以及造成Zookeeper过载的隐患。在目前的新版本的设计中,只有Kafka Controller在Zookeeper上注册相应的监听器,其他的broker极少需要再监听Zookeeper中的数据变化,这样省去了很多不必要的麻烦。不过每个broker还是会对/controller节点添加监听器的,以此来监听此节点的数据变化(参考ZkClient中的IZkDataListener)。
当/controller节点的数据发生变化时,每个broker都会更新自身内存中保存的activeControllerId。如果broker在数据变更前是控制器,那么如果在数据变更后自身的brokerid值与新的activeControllerId值不一致的话,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。有可能控制器由于异常而下线,造成/controller这个临时节点会被自动删除;也有可能是其他原因将此节点删除了。
当/controller节点被删除时,每个broker都会进行选举,如果broker在节点被删除前是控制器的话,在选举前还需要有一个“退位”的动作。如果有特殊需要,可以手动删除/controller节点来触发新一轮的选举。当然关闭控制器所对应的broker以及手动向/controller节点写入新的brokerid的所对应的数据同样可以触发新一轮的选举。
在 Kafka 中 Partition 的四种状态:
四种状态的转换关系如下:
KafkaController.scala
- class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
- this.logIdent = "[Controller " + config.brokerId + "]: "
- private var isRunning = true
- private val stateChangeLogger = KafkaController.stateChangeLogger
- val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
- val partitionStateMachine = new PartitionStateMachine(this)
- val replicaStateMachine = new ReplicaStateMachine(this)
- private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
- onControllerResignation, config.brokerId)
- // have a separate scheduler for the controller to be able to start and stop independently of the
- // kafka server
- private val autoRebalanceScheduler = new KafkaScheduler(1)
- var deleteTopicManager: TopicDeletionManager = null
- val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
- private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
- private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
- private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
- private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
-
- private val partitionReassignedListener = new PartitionsReassignedListener(this)
- private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
-
在 KafkaController 类(KafkaController.scala)中定义了很多属性,我们先重点了解下面的PartitionLeaderSelector对象,主要是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第二个参数表示zookeeper中保存的该分区的当前leader和ISR记录。该方法会返回一个元组包括了推举出来的leader和ISR以及需要接收LeaderAndISr请求的一组副本。
- trait PartitionLeaderSelector {
- def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
- }
-
通过我们上面的代码,可以看到在 KafkaController 中共定义了五种selector选举器:
所有的leader选择完成后,都要通过请求把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,而是每个handler都是一个函数,混在KafkaApi类中。
Kafka 的消息是保存或缓存在磁盘上的,Apache Kafka 基准测试:每秒写入2百万(在三台廉价机器上)。轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。