2025年4月8日 星期二 乙巳(蛇)年 正月初九 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 服务器 > 网络服务

Kafka 之 Broker 介绍和使用

时间:12-14来源:作者:点击数:5
城东书院 www.cdsy.xyz

Controller

在 Kafka 集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

  • {"version":1,"brokerid":0,"timestamp":"1529210278988"}

其中version在目前版本中固定为1,brokerid表示称为控制器的 broker 的id编号,timestamp表示竞选称为控制器时的时间戳。

在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

Zookeeper 中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。

每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。

具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

  1. 监听partition相关的变化。为Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。为Zookeeper中的/isr_change_notification节点注册IsrChangeNotificetionListener,用来处理ISR集合变更的动作。为Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本的选举动作。
  2. 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
  3. 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。
  4. 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。
  5. 启动并管理分区状态机和副本状态机。
  6. 更新集群的元数据信息。
  7. 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的优先副本的均衡。

ControllerContext

控制器在选举成功之后会读取Zookeeper中各个节点的数据来初始化上下文信息(ControllerContext),并且也需要管理这些上下文信息,比如为某个topic增加了若干个分区,控制器在负责创建这些分区的同时也要更新上下文信息,并且也需要将这些变更信息同步到其他普通的broker节点中。不管是监听器触发的事件,还是定时任务触发的事件,亦或者是其他事件(比如ControlledShutdown)都会读取或者更新控制器中的上下文信息,那么这样就会涉及到多线程间的同步,如果单纯的使用锁机制来实现,那么整体的性能也会大打折扣。

针对这一现象,Kafka的控制器使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到LinkedBlockingQueue中,然后使用一个专用的线程(ControllerEventThread)按照FIFO(First Input First Output, 先入先出)的原则顺序处理各个事件,这样可以不需要锁机制就可以在多线程间维护线程安全。

在Kafka的早期版本中,并没有采用Kafka Controller这样一个概念来对分区和副本的状态进行管理,而是依赖于Zookeeper,每个broker都会在Zookeeper上为分区和副本注册大量的监听器(Watcher)。当分区或者副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖于Zookeeper的设计会有脑裂、羊群效应以及造成Zookeeper过载的隐患。在目前的新版本的设计中,只有Kafka Controller在Zookeeper上注册相应的监听器,其他的broker极少需要再监听Zookeeper中的数据变化,这样省去了很多不必要的麻烦。不过每个broker还是会对/controller节点添加监听器的,以此来监听此节点的数据变化(参考ZkClient中的IZkDataListener)。

当/controller节点的数据发生变化时,每个broker都会更新自身内存中保存的activeControllerId。如果broker在数据变更前是控制器,那么如果在数据变更后自身的brokerid值与新的activeControllerId值不一致的话,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。有可能控制器由于异常而下线,造成/controller这个临时节点会被自动删除;也有可能是其他原因将此节点删除了。

当/controller节点被删除时,每个broker都会进行选举,如果broker在节点被删除前是控制器的话,在选举前还需要有一个“退位”的动作。如果有特殊需要,可以手动删除/controller节点来触发新一轮的选举。当然关闭控制器所对应的broker以及手动向/controller节点写入新的brokerid的所对应的数据同样可以触发新一轮的选举。

TOPIC

PARTITION

在 Kafka 中 Partition 的四种状态:

  1. NonExistentPartition —— 这个状态表示该分区要么没有被创建过或曾经被创建过但后面被删除了。
  2. NewPartition —— 分区创建之后就处于NewPartition状态。在这个状态中,分区应该已经分配了副本,但是还没有选举出leader和ISR。
  3. OnlinePartition —— 一旦分区的leader被推选出来,它就处于OnlinePartition状态。
  4. OfflinePartition —— 如果leader选举出来后,leader broker宕机了,那么该分区就处于OfflinePartition状态。

四种状态的转换关系如下:

  1. NonExistentPartition -> NewPartition
    1. 首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存
    2. 对于这个分区而言,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上
  2. OnlinePartition, OfflinePartition -> OnlinePartition
    1. 为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。
  3. NewPartition, OnlinePartition -> OfflinePartition
    1. 标记分区状态为离线(offline)。
  4. OfflinePartition -> NonExistentPartition
    1. 离线状态标记为不存在分区,表示该分区失败或者被删除。

PartitionLeaderSelector

KafkaController.scala

  • class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
  • this.logIdent = "[Controller " + config.brokerId + "]: "
  • private var isRunning = true
  • private val stateChangeLogger = KafkaController.stateChangeLogger
  • val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
  • val partitionStateMachine = new PartitionStateMachine(this)
  • val replicaStateMachine = new ReplicaStateMachine(this)
  • private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  • onControllerResignation, config.brokerId)
  • // have a separate scheduler for the controller to be able to start and stop independently of the
  • // kafka server
  • private val autoRebalanceScheduler = new KafkaScheduler(1)
  • var deleteTopicManager: TopicDeletionManager = null
  • val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
  • private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
  • private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
  • private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
  • private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
  • private val partitionReassignedListener = new PartitionsReassignedListener(this)
  • private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)

在 KafkaController 类(KafkaController.scala)中定义了很多属性,我们先重点了解下面的PartitionLeaderSelector对象,主要是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第二个参数表示zookeeper中保存的该分区的当前leader和ISR记录。该方法会返回一个元组包括了推举出来的leader和ISR以及需要接收LeaderAndISr请求的一组副本。

  • trait PartitionLeaderSelector {
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
  • }

通过我们上面的代码,可以看到在 KafkaController 中共定义了五种selector选举器:

  1. NoOpLeaderSelector
    • 原则上不做任何事情,返回当前的leader和isr。
  2. OfflinePartitionLeaderSelector
    • 从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。
  3. ReassignedPartitionLeaderSelector
    • 从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。
  4. PreferredReplicaPartitionLeaderSelector
    • 如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。
  5. ControlledShutdownLeaderSelector
    • 将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。

所有的leader选择完成后,都要通过请求把具体的request路由到对应的handler处理。目前kafka并没有把handler抽象出来,而是每个handler都是一个函数,混在KafkaApi类中。

kafka 的高吞吐率

Kafka 的消息是保存或缓存在磁盘上的,Apache Kafka 基准测试:每秒写入2百万(在三台廉价机器上)。轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。

城东书院 www.cdsy.xyz
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐