Kafka源码解析(6)--控制器的实现原理

一. 概述

Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。

控制器是实现kafka副本机制的核心组件。控制器工作包括:

  • 主副本选举(PartitionLeaderSelector)
  • 管理分区状态机(PartitionStateMachine)
  • 管理副本状态机(ReplicaStateMachine)
  • 管理多种类型的监听器

控制器通过注册多种监听器来完成相应的处理,启动的时候完成了以下监听器的注册:

  • 在控制器中注册管理性质的监听器(重新分配分区、ISR改变、最优副本选举)
  • 在分区状态机中注册更改主题的监听器
  • 在副本状态机中注册更改代理节点的监听器

二. 主要流程

2.1 数据结构

两个关键的变量:

  • partitionReplicaAssignment: 分配给分区的所有副本。比如
  • partitionLeadershipInfo: 分区的主副本、ISR集合。用于主副本选举等。

控制器上下文对象可以认为是控制器工作时的数据存储和共享介质。控制器通过管理这些元数据,来管理集群的各个节点。当ZK节点发生变化时,控制器的监听事件会被触发,控制器根据ZK变化的值来更新本地的元数据。并根据分区和副本状态机的情况做响应的处理,最后会将结果同步给分区对应的所有broker。

2.2 选举主控制器

2.3.1 控制器

每个节点都要和ZK交互,它们作为ZK的客户端,会建立与ZK服务端的网络连接。每个节点都会启动一个控制器,但在所有的代理节点中还会选举一个主控制器。 代理节点在启动控制器会先注册一个回话实现的监听器(SessionExpirasionListener), 然后才通过选举器(ZookeeperLeaderElector)启动选举过程, 并在/controller节点注册数据改变监听器. 当会话失效或数据改变都会触发对应的监听器,让控制器重新参与选举。如图:

主控制器的选举主要利用的是ZK的leader选举机制,每个代理节点都会参与竞选主控制器,只有一个节点可以成为代理节点的主控制器。其他代理节点只有在主控制器出现故障或会话失效时参与领导选举。每个代理节点都会作为ZK客户端,向ZK服务端尝试创建/controller的临时节点, 最终只有一个代理节点可以成功创建/controller节点。

各节点启动的时候都会通过创建/controller 节点竞选主控制器,但只有一个成为主控制器。三个节点都会注册会话失效监听器,并在/controller节点注册数据改变监听器

如果是主控制器产生会话失效,就会删除/controller临时节点。其他节点就会收到/controller节点的数据改变事件,它们的选举器都会尝试重新创建/controller竞选主控制器。

主控制器向ZK注册的监听器其实是Watcher, ZK的Watcher是一次性的。 因此使用完后需要重新再次注册。控制器的大部分逻辑都是通过注册的监听器来完成的:

  • 首先控制器会向ZK节点注册监听器,每种监听器都有具体的事件处理器。
  • ZK节点数据更新后,会触发监听器调用不同的回调方法
  • 控制器执行具体的事件处理器,处理完成后,再次注册“监听器”,为下次事件触发做准备。

注册的监听器列表:

其中有三个事件是比较关键,通过这几个事件监听器来维护集群主副本的选举、故障转移和分区的分配管理等:

  • TopicChangeListener. 会监听/brokers/kafkas/节点下变化事件,主要是监听主题的增加和删除事件。
  • PartitionModificationsListener。 监听/brokers/topics/topic 节点的数据变化,比如增加分区等操作事件。
  • BrokerChangeListener。 监听/brokers/ids, 监听代理节点的变化,如上线和下线事件。用于做再平衡和负载均衡处理。

控制器选举好后,控制器进行初始化,通道管理器会建立到集群各个代理节点的网络连接;初始化”选举最优副本作为主副本“、”重新分配分区“、”删除主题的管理器“等。控制器上掌握各及节点信息,同时掌握各个主题、分区、副本及ISR的信息。

2.3.2 zookeeper选举过程

zookeeper 3.4.0 以后采用了TCP版本的选举算法FastLeaderElection.

  1. 每个节点都发送投票,刚开始都会选择自己作为leader并广播其他节点。 投票格式(SID, ZXID), SID和ZXID是投票的服务器的值唯一ID和事务ID。
  2. 各个节点收到来自其他节点的投票
  3. 判断轮次,并更新轮次。
    • 如果外部投票的轮次大于内部投票轮次,则立即更新自己的选举轮次。并清空所有已收到的投票,然后使用初始化的投票进行PK确认。
      • 如果外部轮次小于内部轮次,则不做处理。
      • 返回步骤2.
      • 如果一致,则进行PK。
  4. 处理投票。 在收到投票后,针对每个投票,服务器都需要将别人的投票和自己的投票做PK。 PK规则:
    • 优先检查ZXID。ZXID比较大的服务器优先作为leader.
    • 如果ZXID相同的话,那么比较myid。 myid比较大的服务器作为leader服务器。
      根据比较后,更新自己的投票,并在向其他所有机器发出投票信息。
  5. 统计投票。 服务器统计所有投票,如果已经超过半数的机器就作为Leader
  6. 改变服务器状态。 如果是Leader,就更新为LEADING. 如果是Follower就更新为FOLLOWER.

2.3 管理分区和副本状态机

分区和副本都有四种状态: 新建、在线、离线和不存在。分区的四种状态为:

  • 新增分区(NewPartition)
  • 在线分区(OnlinePartition)
  • 离线分区(OfflinePartition)
  • 不存在分区(NonExistenPartition)

副本的四种状态:

  • 新建副本(NewReplica)
  • 在线副本(OnlineReplica)
  • 离线副本(OfflineReplica)
  • 不存在副本(NonExistentReplica)

当外部事件发生变化的时候会调用状态机的状态转移方法,根据不同的状态做出不同的响应。控制器通过分区和副本状态机来管理集群节点、实现主副本选举已经再平衡操作等。

  • 分区从“新建分区”到“上线分区”采用固定算法选择主副本,即选举第一个副本作为分区的主副本。
  • 从“下线状态”或“上线状态”转为“上线状态”,需要完成一次选举。

状态转移过程

  1. 新建主题
    1
    kafka-topic.sh --create --topic test --partitions 3

创建主题test为例,新建主题时,会修改ZK节点/brokers/topics,创建了一个/brokers/topics/test的节点。而/brokers/topics该节点上的更改主题监听器(TopicChangeHandler)捕获创建主题事件。

  • TopicChangeHandler首先会注册一个分区变更事件处理器PartitionModificationHandler。
  • 调用newPartitionCreation(topics)方法,创建分区
    • 更新分区状态从到“不存在状态”更新到“新建分区”状态 。 在转换前不存在分区,转换后分区有副本分区(因此可以执行后面的副本分区的处理)。
    • 更新副本状态从“不存在状态”到“新建状态”。
    • 更新分区状态从“新建状态”到“在线状态”。
      • 调用zk创建各个副本节点
      • 根据不同的当前状态,选择不同的分区选举类,为分区选举副本。
      • 为新的分区初始化主副本和ISR
      • 并将结果同步到分区对应的所有代理节点brokers
    • 更新副本状态从“新建状态”到“在线状态” 。
  1. 创建分区
    ZK创建分区和副本后,就会调用PartitionModificationHandler。 执行各种状态的变化。

初始化状态机

如果控制器在故障转移时,会分别启动副本状态机和分区状态机,并根据上下文初始化分区和副本的状态。

  • 存活的副本,初始化为上线
  • 不存活的副本,初始化为“删除失败”
  • 分区有主副本,初始化为上线
    • 分区有主副本,但不存活,则初始化为下线
    • 分区没有副本,初始化为新建

2.4 选举主副本

分区从”下线状态“或”上线状态“到“上线状态”都要重新选举分区的主副本:

  • 首先读取分区当前的主副本、ISR集合
  • 优先从ISR中选择第一个副本作为主副本。 如果第一副本挂了,就会选择其他副本作为主副本。
  • 如果ISR都挂了,那么就从AR(所有的副本)中选择第一个存活的副本作为主副本。

选择最优副本选举(第一个副本作为leader)是为了分区平衡,因为kafka的分区分配操作保证了分区的主副本会均匀的分布在所有节点上。kafka为了均衡将所有主副本均衡地分配到各个代理节点上。会有一个分区平衡的后台线程,定时检查最优副本(第一个副本)是不是主副本,如果不是则会进行重新选举,将最优副本作为主副本。

控制器会发送LeaderAndIsr请求给分区的所有存活副本,让这些代理节点更新元数据。

分区分配和重新分配

副本分配算法例如以下:

  • 将全部N Broker和待分配的i个Partition排序.
  • 将第i个Partition分配到第(i mod n)个Broker上.
  • 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.

每一个topic的分区0都会被分配在broker 0上。第1个分区都分配到broker 1上。直到partition的id超过broker的数据才開始从头開始反复,这样会导致前面几台机器的压力比后面的机器压力更大。

因此。kafka是先随机挑选一个broker放置分区0,然后再按顺序放置其他分区。

以下图为例, 这里分区0放到了broker5中。分区1–broker6。分区2—broker7….:

节点的上下线都会引起控制器进行分区重新分配,并选举主副本。控制器会发送LeaderAndIsr请求给分区的所有存活副本,让这些代理节点更新元数据。

2.5 管理多种类型的监听器

2.5.1 代理节点上下线

每个broker都有一个唯一的标识符,这个表示符可以指定指定也可以自动生成。在broker启动的时候,它会通过创建临时节点把自己的ID注册到ZK上,控制器监听/brokers/ids路径。当有broker上下线时,控制器就会得到通知。

代理节点上线过程:

  • 控制器向其他节点发送请求更新元数据请求
  • 将所有副本转为上线
  • 如果主副本不是最优副本则重新选举主副本,并转换为在线

代理节点下线步骤:

  • 将该代理节点上的所有分区的状态从“上线状态”设置为“下线状态”
  • 如果节点上存在主副本分区,那么需要为这些分区重新选举主副本。
  • 为分区选举主副本后,会发送LeaderAndIsr请求给分区所有存活的副本。即控制器通知分区所有的其他副本节点。
  • 如果分区选举主副本后,将分区状态从“下线状态”改为上线状态
  • kafka分区及副本在broker的分配
0%