<>一.ZAB协议

<>1.什么是ZAB协议?

 ZAB 协议的全称是 Zookeeper Atomic Broadcast (Zookeeper 原子广播)。Zookeeper 使用
ZAB协议作为其数据一致性的核心算法,来保证分布式事务的最终一致性。
 ZAB 协议并不像 Paxos 算法那样,是一种通用的分布式一致性算法,它是一种特别为 ZooKeeper
设计的崩溃恢复的原子消息广播算法。Zookeeper 采用一个单一的主进程接受并处理客户端的所有事务请求,并将服务器数据的状态变更以事务 Proposal
的形式广播到所有的副本进程上去。

 ZAB 协议定义:ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复和原子广播协议。基于该协议,Zookeeper 实现了一种
主备模式(即 Leader 和 Follower 模型)的系统架构来保持集群中各个副本之间数据一致性。
 这里的主备系统架构模型,就是指只有一台客户端(Leader)负责处理外部的写事务请求,然后 Leader 客户端将数据同步到其他 Follower 节点。
 Zookeeper 客户端会随机的链接到 Zookeeper 集群中的一个节点,如果是读请求,就直接从当前节点中读取数据;如果是写请求,那么节点就会向
Leader 提交事务,Leader 接收到事务提交,会广播该事务,只要超过半数节点写入成功,该事务就会被提交。

 上图显示了 Zookeeper 如何处理集群中的数据。所有客户端写入数据都是写入到主进程(称为 Leader)中,然后,由 Leader
复制到备份进程(称为 Follower)中。从而保证数据一致性。从设计上看,和 Raft 类似。
 那么复制过程又是如何的呢?复制过程类似 2PC,ZAB 只需要 Follower 有一半以上返回 ACK
信息就可以执行提交,大大减小了同步阻塞,也提高了可用性。

 简单介绍完 ZAB协议,接下来重点介绍消息广播和崩溃恢复。整个 Zookeeper 就是在这两个模式之间切换。 简而言之,当 Leader
服务可以正常使用,就进入消息广播模式,当 Leader 不可用时,则进入崩溃恢复模式。

<>2.崩溃恢复

 一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower
的联系,那么就会进入崩溃恢复模式。Zookeeper 崩溃恢复的情况大致可以分为两种:Zookeeper集群初始化启动时 Leader
选举、Zookeeper 集群运行期间 Leader 重新选举。
 在 ZAB 协议中,为了保证程序的正确运行,整个恢复过程结束后需要选举出一个新的 Leader 服务器。因此 ZAB 协议需要一个高效且可靠的
Leader 选举算法,从而确保能够快速选举出新的 Leader 。
 Leader 选举算法不仅仅需要让 Leader 自己知道自己已经被选举为 Leader ,同时还需要让集群中的所有其他机器也能够快速感知到选举产生的新
Leader 服务器。
 崩溃恢复主要包括两部分:Leader 选举和数据恢复。

ZAB 协议的特性:

* ZAB 协议需要确保那些已经在 Leader 服务器上提交(Commit)的事务最终被所有的服务器提交。
* ZAB 协议需要确保丢弃那些只在 Leader 上被提出而没有被提交的事务。
崩溃恢复阶段 ZAB 协议是如何保证数据一致性的?
假设两种异常情况:

* 一个事务在 Leader 上提交了,并且过半的 Follower 都响应 ACK 了,但是 Leader 在 Commit 消息发出之前挂了。
* 假设一个事务在 Leader 提出之后,Leader 挂了。
要确保如果发生上述两种情况,数据还能保持一致性,那么 ZAB 协议选举算法必须满足以下要求:

* ZAB 协议需要确保那些已经在 Leader 服务器上提交(Commit)的事务最终被所有的服务器提交。
* ZAB 协议需要确保丢弃那些只在 Leader 上被提出而没有被提交的事务。
根据上述要求,ZAB 协议需要保证选举出来的 Leader 需要满足以下条件:

* 1)新选举出来的 Leader 不能包含未提交的 Proposal 。
即新选举的 Leader 必须都是已经提交了 Proposal 的 Follower 服务器节点。
* 2)新选举的 Leader 节点中含有最大的 zxid 。
这样做的好处是可以避免 Leader 服务器检查 Proposal 的提交和丢弃工作。
 针对这些条件,如果让Leader 选举算法能够保证新选举出来的 Leader 服务器拥有集群中所有机器编号的事务(即 ZXID 最大)
,那么就能够保证这个新选举出来的 Leader 一定具有所有已经提交的提案。而且这么做的另一个好处是:可以省去 Leader
服务器检查事务的提交和丢弃工作的这一步操作。

<>2.1Leader 选举

 Zookeeper 的 Leader 选举算法使用的是 FastLeaderElection。
/** * Check if a pair (server id, zxid) succeeds our * current vote. * *
@param id Server identifier * @param zxid Last zxid observed by the issuer of
this vote */ protected boolean totalOrderPredicate(long newId, long newZxid,
long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " +
newId+ ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) +
", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().
getWeight(newId) == 0){ return false; } /* * We return true if one of the
following three cases hold: * 1- New epoch is higher * 2- New epoch is the same
as current epoch, but new zxid is higher * 3- New epoch is the same as current
epoch, new zxid is the same * as current zxid, but server id is higher. */
return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid)
|| ((newZxid == curZxid) && (newId > curId))))); }
 根据代码可得知,先比较Epoch (年号) ,再比较Zxid,最后比较服务器ld,大的胜出。

主要涉及的类:
(1)org.apache.zookeeper.server.quorum.QuorumPeer
:选举节点,管理Quorum协议的类。该类用来设置一个报文套接字并响应当前 Leader。
(2)org.apache.zookeeper.server.quorum.QuorumCnxManager
:选举通信连接管理类。用来完成服务器与服务器之间的网络交互,该类使用 TCP 实现 Leader 选举过程中的连接管理。
(3)org.apache.zookeeper.server.quorum.FastLeaderElection:具体选举算法。是 ZK 默认的
Leader 算法。
(4)org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer
:存放服务器选举信息(服务器唯一标识,服务器地址,服务器选举口)。就是zk配置文件中配置的集群信息,选举的时候,发送选票到集群中的每一个节点。
(5)org.apache.zookeeper.server.quorum.Vote:选票类。涉及到 ServerState 和 LearnerType。
(6)序列化接口:apache jute

<>2.2数据恢复

ZAB 数据同步过程中,如何处理需要丢弃的 Proposal?
 在 ZAB 的事务编号 zxid 设计中,zxid 是一个64位的数字,由 Leader 产生,而且它是自增唯一有顺序性。
 其中低32位可以看成一个简单的单增计数器,针对客户端每一个事务请求,Leader 在产生新的 Proposal
事务时,都会对该计数器加1。而高32位则代表了 Leader 周期的 epoch 编号。
 epoch 编号可以理解为当前集群所处的年代,或者周期。每次 Leader 变更之后都会在 epoch 的基础上加1,这样旧的 Leader
崩溃恢复之后,其他 Follower 接收到旧 Leader 的请求,也不会听执行,因为 Follower 只服从 epoch 最高的 Leader 命令。

 每当选举产生一个新的 Leader ,就会从这个 Leader 服务器上取出本地事务日志中最大编号 Proposal 的 zxid,并从 zxid
中解析得到对应的 epoch 编号,然后再对其加1,之后该编号就作为新的 epoch 值,并将低32位数字归零,由0开始网上计数。
 ZAB 协议通过 epoch 编号来区分 Leader 变化周期,能够有效避免不同的 Leader 错误的使用了相同的 zxid 编号提出了不一样的
Proposal 的异常情况。

基于以上策略:
 当一个包含了上一个 Leader 周期中尚未提交过的事务 Proposal 的服务器启动时,当这台机器加入集群中,以Follower 角色连上
Leader 服务器后,Leader 服务器会根据自己服务器上最后提交的 Proposal 来和 Follower 服务器的 Proposal
进行比对,比对的结果肯定是 Leader 要求 Follower 进行一个回退操作,回退到一个确实已经被集群中过半机器 Commit 的最新 Proposal。

<>(1)直接差异化同步

 示例:某个时刻Leader服务器的事务队列对应的ZXID依次是:0x200000001, 0x200000002, 0x200000003,
0x200000004, 0x200000005,而需要数据同步的服务器最后处理的ZXID为:0x200000003,这种场景就执行"直接差异化同步"
,Leader会依次将0x200000004,0x200000005同步给服务器。

<>(2)先回滚再差异化同步

 假如在ZooKeeper集群中有A,B,C三台服务器, B当选为Leader服务器。

 某个时刻,B正要处理一个ZXID=0x200000003的事务,并且已经将该事务写入到B服务器的本地的事务日志中(zk的事物日志需要进行格式化才能看),就在B要发送给其他FollowerA、C机器进行同步的时候,B服务器挂了,Proposal并没有发送出去,而此时ZooKeeper会进行新一轮选举。假设A当选为新的Leader服务器对外进行工作,客户端又提交了
0x300000001 和 0x300000002
两个事务,而此时之前的奔溃的B服务器再次启动,并开始进行数据同步。因为B之前为Leader,故它的本地日志中事务编号为:
0x200000001, 0x200000002, 0x200000003
 而A, C的本地日志中的事务编号为:0x200000001, 0x200000002, 0x300000001, 0x300000002
 这时候就需要A服务器对数据进行回滚之后再同步,这个就称之为“先回滚再差异化同步"

<>(3)仅回滚同步

 先回滚再差异化的特殊模式。先回滚再差异化同步过程中,A当选Leader后,没有写入0x300000001,
0x300000002这两笔数据。因为B复活后是Follower,Follower没有权限发起议案。

<>(4)全量同步

 如:新加入的Follower服务器。

<>3.消息广播

 在 Zookeeper 集群中,数据副本的传递策略就是采用消息广播模式。Zookeeper
中数据副本的同步方式与二段提交类似,但是却又有不同。二段提交要求协调者必须等到所有的参与者全部反馈 ACK 确认消息后,再发送 commit
消息。要求所有的参与者要么全部成功,要么全部失败。二段提交会产生严重的阻塞问题。
 ZAB 协议中 Leader 等待 Follower 的 ACK 反馈消息是指"
只要半数以上的Follower成功反馈即可,不需要收到全部Follower反馈"。

<>二.Zookeeper对请求的处理方式

<>1.事物性的请求

事务请求的处理方式:

* 所有的事务请求必须由一个全局唯一的服务器来协调处理,这样的服务器被叫做 Leader服务器。其他剩余的服务器则是 Follower 服务器。
* Leader服务器负责将一个客户端事务请求,转换成一个事务Proposal,并将该 Proposal 分发给集群中所有的 Follower
服务器,也就是向所有 Follower 节点发送数据广播请求(或数据复制)。
* 分发之后 Leader 服务器需要等待所有 Follower 服务器的反馈(ACK请求),在 ZAB 协议中,只要超过半数的 Follower
服务器进行了正确的反馈后(也就是收到半数以上的Follower的Ack请求),那么 Leader 就会再次向所有的 Follower 服务器发送 Commit
消息,要求其将上一个事务 proposal 进行提交。

Leader服务器请求处理器链:

基本流程:

* PrepRequestProcessor:请求预处理器,接收请求并将其封装为一个事务。
* ProposalRequestProcessor
:对于事务性请求,将请求转发到CommitProcessor外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器来发起一次集群内的事务投票。同时,ProposalRequestProcessor还会将事务请求交给SyncRequestProcessor进行事务日志的记录。也就是将数据持久化到磁盘,数据的存储一般可以分为日志数据、快照数据。
* SyncRequestProcessor
:用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照。为了提高写磁盘的效率,周期性调用flush操作。
Leader:对应处理器 AckRequestProcessor(不进行网络IO)。
Follower:对应处理器 SendAckRequestProcessor,会给 Leader 发送 ACK。

* AckRequestProcessor:在 SyncRequestProcessor 完成事务日志记录后,向 Proposal 的投票收集器发送
ACK 反馈,以通知投票收集器当前服务器已经完成了对该 Proposal 的事务日志记录。
* CommitProcessor:对于事务请求,会等待集群内针对 Proposal 的投票直到该 Proposal 可被提交。
* ToBeAppliedRequestProcessor:存储已经被 CommitProcessor 处理过的可被提交的Proposal。并将请求交给
FinalRequestProcessor 处理器处理,处理完后,再从toBeApplied队列中移除。
* FinalRequestProcessor:响应客户端
Request(包括创建客户端请求的响应,针对事务请求,该处理还会负责将事务应用到内存数据库中去),是处理链的最后一环。
<>2.非事务性的请求

 Follower 需要接收3中不同的消息:客户端请求、proposal 和 commit。 用来处理非事物请求(查),只有三个 Processor。

基本流程:

* FollowerRequestProcessor:接收和处理客户端请求,前转所有请求到 CommitRequestProcessor,并前转写请求到
leader;
* CommitProcessor
:直接将读请求交由FinalRequestProcessor处理,而对于写请求,转发给leader,等待leader的commit通知;当收到commit通知后,将写请求交给
FinalRequestProcessor。
* SyncRequestProcessor:接收来自leader的proposal,持久化事务,并将其交给SendAckRequestProcessor;
* SendAckRequestProcessor:向leader发送确认消息;
* FinalRequestProcessor:执行请求,响应客户端 Request。
<>二.Zookeeper的sync操作

 在分布式系统中,没有绝对的强一致性,因为存在着网络通信,也就是无解的两军问题。对于ZK来说,它是一个CP架构,它在一致性上有人认为它提供的是一种
强一致性的服务(获取数据前通过sync操作),也有人认为是单调一致性(更新时的大多数概念–过半机制),还有人为是最终一致性
(顺序一致性–zxid),反正各有各的道理。在ZAB协议中多次用到“过半”设计策略
,该策略是zk在A(可用性)与C(一致性)间做的取舍,也是zk具有高容错特性的本质。相较分布式事务中的2PC(二阶段提交协议)的“全量通过”,ZAB协议可用性更高(牺牲了部分一致性),能在集群半数以下服务宕机时正常对外提供服务。
 总的来说,zk的数据一致性是依靠ZAB协议 + 事物日志来保证的,而ZAB协议的基础是Paxos协议,Paxos协议是分布式一致性算法的鼻祖。
 客户端发送的写请求会给leader,而读请求follower是可以处理的,可能会出现读取最新的数据的时候,follwer获取不到数据的情况。
 这里,我们聊一聊 ZooKeeper 的强一致性,也就是zk的sync() 函数。sync 是指让 client 当前连接着的ZooKeeper
服务器,和 ZooKeeper 的 Leader 节点同步(sync)一下数据。
 当 follower 收到到 sync 请求时,会将这个请求添加到一个 pendingSyncs 队列里,然后将这个请求发送给 leader,直到收到
leader 的 Leader.SYNC 消息时,才将这个请求从 pendingSyncs 队列里移除,并 commit 这个请求。

FollowerRequestProcessor.java
public class FollowerRequestProcessor extends ZooKeeperCriticalThread
implements RequestProcessor { FollowerZooKeeperServer zks; @Override public void
run() { try { while (!finished) { Request request = queuedRequests.take(); if (
LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, ZooTrace.
CLIENT_REQUEST_TRACE_MASK, 'F', request, ""); } if (request == Request.
requestOfDeath) { break; } // We want to queue the request to be processed
before we submit // the request to the leader so that we are ready to receive
// the response nextProcessor.processRequest(request); // We now ship the
request to the leader. As with all // other quorum operations, sync also
follows this code // path, but different from others, we need to keep track //
of the sync operations this follower has pending, so we // add it to
pendingSyncs. switch (request.type) { case OpCode.sync: // 将请求添加到一个
pendingSyncs 队列 zks.pendingSyncs.add(request); zks.getFollower().request(request
); break; case OpCode.create: case OpCode.delete: case OpCode.setData: case
OpCode.setACL: case OpCode.createSession: case OpCode.closeSession: case OpCode.
multi: zks.getFollower().request(request); break; } } } catch (Exception e) {
handleException(this.getName(), e); } LOG.info("FollowerRequestProcessor exited
loop!"); } }
FollowerZooKeeperServer.java
public class FollowerZooKeeperServer extends LearnerZooKeeperServer { /* *
Pending sync requests */ ConcurrentLinkedQueue<Request> pendingSyncs; //
接收到leader发送的Leader.SYNC消息后,才真正commit这个请求 synchronized public void sync(){ if(
pendingSyncs.size() ==0){ LOG.warn("Not expecting a sync."); return; } Request r
= pendingSyncs.remove(); commitProcessor.commit(r); } }

 当Leader收到一个sync请求时,如果leader当前没有待commit的决议,那么leader会立即发送一个Leader.SYNC消息给follower。否则,leader会等到当前最后一个待commit的决议完成后,再发送Leader.SYNC消息给Follower。

Leader.java
public class Leader { /** * Process sync requests * * @param r the request */
synchronized public void processSync(LearnerSyncRequest r){ if(
outstandingProposals.isEmpty()){ sendSync(r); } else { List<LearnerSyncRequest>
l= pendingSyncs.get(lastProposed); if (l == null) { l = new ArrayList<
LearnerSyncRequest>(); } l.add(r); pendingSyncs.put(lastProposed, l); } } }
 这有一个隐含的逻辑,就是如果 leader 和 follower 之间的消息通信,是严格按顺序来发送的(TCP保证),因此,当 follower 接收到
Leader.SYNC 消息时,说明follower 也一定接收到了leader之前(在leader接收到sync请求之前)发送的所有提案或者commit消息
。这样,就可以确保follower和leader是同步的了。

技术
©2019-2020 Toolsou All rights reserved,
百度、阿里、腾讯内部岗位级别和薪资结构,附带求职建议!免费下载文档:给你介绍几个实用的免费下载网址Bug数能否做为技术人员考核的KPI?C语言编程之查找某学号学生成绩详解ubuntu14.04如何设置静态IPjava实现抢红包功能"头孢就酒 说走就走"?危险!服用这些药物千万别喝酒2021年1月程序员工资统计,平均14915元微软翻译、Office现开始支持因纽特语Java基础知识之笔记总结分享(超详细)