15.zookeeper原理解析-客户端与服务器端交互

作者:W.S.T 发布时间:March 17, 2016 分类:Zookeeper No Comments

Zookeeper集群中server数量总是确定的,所以集群中的server交互采用比较可靠的bio长连接模型;不同于集群中sever间交互zookeeper客户端其实数量是未知的,为了提高zookeeper并发性能,zookeeper客户端与服务器端交互采用nio模型。下面我们主要来讲讲zookeeper的服务器端与客户端的交互。读者对nio不了解的话不妨抽点时间去了解下,对于一些nio框架如netty,mina再如一些web容器如tomcat,jetty底层都实现一套nio框架,对于实现nio框架模型大家不妨去谷歌百度搜一下Doug Lea的scalable io in java 这个ppt。

 

客户端

ClientCnxnSocketNIO是zookeeper的nio通讯层的客户端部分,下面伪代码示例其核心代码:

ClientCnxnSocketNIO{

      doTransport() {

               if (如果之前连接没有立马连上,则在这里处理OP_CONNECT事件) {

                   sendThread.primeConnection();

               } else {

                    doIO

                }

 

             //队列中有发送的消息, 开启写

        }

 

       doIO() {

              if (sockKey.isReadable()) {

                    sendThread.readResponse(incomingBuffer);

                    updateLastHeard();

               } 

 

               if(sockKey.isWritable()) {

                      Packetp = outgoingQueue.getFirst() //从发送队列取

                      updateLastSend

                      p.requestHeader.setXid(cnxn.getXid());//设置客户端的xid

                     序列化

                     发送

                     从发送队列删除

                     加入到pendingQueue队列

                }

        }

}

 

ClientCnxn 是客户端操作ClientCnxnSocketNIO的工具,维护了发送任务线程SendThread,事件任务线程EventThead, 发送队列OutgoingQueue以及请求消息的等待队列PendingQueue。下面以伪代码来示例其核心代码

ClientCnxn {

    outgoingQueue//待向服务器端发送的队列, 客户端提交请求放入这个队列

    pendingQueue //发送以后等待响应的队列,

    

    submitRequest(){

       //client端一个封装成一个packet

        outgoingQueue.add(packet);

        selector.wakeup();

        packet.wait(); //如果是同步调用wait,应该反馈后会

    }

 

   SendThread {

       run() {

           1.设置clientCnxnSocket 最后发送时间,最后的心跳时间

           2. if(!clientCnxnSocket.isConnected()) {

                    startConnect()  //主要工作clientCnxnSocket做

              } else {

                    计算下次ping的时间, 发送心跳

                  委托给 clientCnxnSocket.doTranspor进行底层的nio传输

               } 

         }

 

        primeConnection(){

           //构建ConnectRequest

           //组合成通讯层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null

            outgoingQueue.addFirst

            clientCnxnSocket.enableReadWriteOnly();//确保读写事件都监听 

        }

 

        readResponse(){

           1.先读响应头,先对特殊的xid进行处理

           2. packet = pendingQueue.remove() //由于client和server都是单线程处理,多队列处理,所以认为全局有序

           3. 反序列化响应体response, 并设置到packet上

           4.finishPacket 1)同步notifyAll,结束 2)异步加入到event线程的队列

        }

    }

 

    EventThread{ //主要支持异步的回调

       run() {

 

        }

    }

}

 

大家观察客户端操作类Zookeeper里面的操作类主要分为两个参数不带callback的同步方法和参数带callback的异步方法。

1.      同步调用方法实现类似Future同步转异步模式实现

1)  Client提交请求对象封装成packet对象放入OutgoingQueue队列,并调用packet.wait()阻塞当前线程。

2)  每个Client都只有一个SendThread线程是线性处理OutgoingQueue中的请求消息的,SendThread线程通过ClientCnxnSocketNIO工具顺序从OutgoingQueue队列中取请求消息发送到服务器端,同时将请求packet加入到PendingQueue中

3)  ClientCnxnSocketNIO工具接收处理服务器端响应

4)  从PendingQueue队列取出对应的packet,并调packet.notifyAll()唤醒阻塞的线程完成同步调用

2.      异步调用的总体流程跟同步类似关键区别在于

1)  向OutgoingQueue队列提交请求后,不会调用packet.wait()阻塞当前线程,主流程继续执行

2)  同同步调用

3)  同同步调用

4)  从PendingQueue队列取出对应的packet,并调packet.callback方法完成回调处理


 

 

 

Zookeeper服务器端

NIOServerCnxnFactory工厂类,zookeeperserver用来启动监听客户端连接,每当有客户端请求连接进来,NIOServerCnxnFactory都会为这个链接构建NIOServerCnxn 实例来单独处理与这个客户端的交互

NIOServerCnxn封装了处理读取客户端请求数据与及向客户端响应数据

下面通过伪代码来实例:

NIOServerCnxnFactory  {

   configure {

        绑定端口

        作为server监听

        注册selectkey 的连接时间

    }

 

    run {  //起到accept的作用

       1. OP_ACCEPT, 将NIOServerCnxn(handler) attach到selectkey以便被读写事件使用

       2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,并调doIo

    }

}

 

 

NIOServerCnxn {

 

    构造器 {

        //设置selectkey对read感兴趣

    }

 

    doIo {

        if(k.isReadable()) {

             1. 读前四个字节, 代表请求内容长度,不包括自己的4字节

             2. 读取到字节数组中

             3. zkServer.processPacket()或者zkServer.processConnectRequest()

        }

 

        if(k.isWritable()) {

              1.从outgoingBuffers取ByteBuffer

              2.发送bytes

        }

    }

}

14.zookeeper原理解析-服务器端处理流程之交互图

作者:W.S.T 发布时间:March 17, 2016 分类:Zookeeper No Comments

1. 下面就用一张图来说明Leader端的处理器链的交互过程


2. 下面就用一张图来说明Follower(Observer类似)端的处理器链的交互过程

13.zookeeper原理解析-服务器端处理流程之处理器详解

作者:W.S.T 发布时间:March 17, 2016 分类:Zookeeper No Comments

各个processor的主要功能

1) PrepRequestProcessor

         如名字这个处理器主要功能是对请求进行预处理, 将client向server请求二进制数据反序列化成sever中请求操作。

 

PrepRequestProcessor做为leader的第一个处理器,它的请求数据来源主要来自:

(1)    Leader做一个zk服务接收客户端请求提交到PrepRequestProcessor的队列中

(2)    作为集群的leader,在LearnerHanler.run方法中接收learner向leader发送的投票请求,消息类型为Leader.REQUEST

 

PrepRequestProcessor的处理流程:

(1)    对于非事物性请求:sync,exists, getData, getChildRen,ping, setWatches 这里只做session的检查看看是否超时

(2)    对于事务请求:create, delete,setData,setAcl,check,multi,根据请求的类型创建不同的操作如:type=create è CreateRequest, type=delete èDeleteRequest  等等

(3)    Zookeeper创建新的事务号zxid

(4)    创建并设置事务请求的消息头

(5)    反序列化网络client请求对象提取数据到事务对象的消息体中

 

PrepRequestProcessor线程 { 

    run {

       1. submittedRequests.take()取出nio读取的请求

       2. 根据请求type构建对应的record对象,并将request中的ByteBuffer数据,反序列化到record中

       3. zks.getNextZxid() 生成一个新的事务号递增,作为zxid

       4. 根据cxid,zxid, sessionId等构建事务头TxnHeader

       5.1 对create/delete/setData/setACL/createSession/closeSession/check

               1)checkSession,检测是否过期

               2)创建对应的事务体CreateTxn/DeleteTxn等等

                 3)构建ChangeRecord加入到zks.outstandingChanges队列中去(FinalRequestProcessor会去处理,最终去改变数据)

       5.2 exists/getData 只是 checkSession

       6. 调下一个Processor

    }

 

processRequest() {

       //单机版被zookeeper调,将nio读取的请求加入到submittedRequests中

    }

}

 

2)ProposalRequestProcessor

         ProposalRequestProcessor的处理逻辑相对比较简单


(1)    转发到后面的处理器

(2)    如果是事务性请求(请求头存在的话),leader向follower发起操作请求,超过一半才算成功响应客户端

(3)    如果是事务性请求,调用leader的同步处理器流程

 

3)CommitProcessor

   这个处理器逻辑还是有点小复杂的, leader和learner都需要用到这个处理器

3.1) 对于非事务性的操作(查询,exist等)直接回把请求转到下一个处理器处理

3.2) leader 对于事务性操作(create, setData等)请求,CommitProcessor线程任务会hold在这里,leader中ProposalRequestProcessor处理器会将请求提案发送给所有的followers, followers响应leader,然后leader中LearnerHandler会调processAck处理响应,当超过半数的时候将调CommitProcessor.commit()方法提交请求, 紧接着CommitProcessor将请求传递到下一个处理器处理

3.2) learner对于事务性操作(create, setData等)请求CommitProcessor线程任务会hold在这里, FollowerRequestProcessor或者ObserverRequestProcessor调CommitProcessor将请求提交队列之后会立刻向leader发送事务操作提案,Follower接收到leader的commit消息或者Observer接收到leader的inform消息它们会向CommitProcessor提交请求,紧接着CommitProcessor将请求传递到下一个处理器处理

 

伪代码:

CommitProcessor{

   run() {

       1. toProcess需要交予下一个Processor的,先都交给下一个

       2. nextPending请求时对于事务操作的,有一个不为空一直循环直到有commit过来

       3. queuedRequests.size() == 0&& committedRequests.size() > 0 follower observer接收commit ,加入到toProcess集合中去

       4. nextPending != null&&  committedRequests.size() > 0  leader发起投票请求,并接收follower反馈的, 加入到toProcess集合中去

       5. nextPending == null 前面循环

       6.如果是请求reqeust是事务操作赋给nextPending对象

       7.如果不是加入到toProcess集合中去

        

       //这里主要通过nextPending对象控制请求响应的顺序

    }

 

    commit(Request){

       将request添加到committedRequests队列中去

    }

 

    processRequest(Request) {

       由上游处理器调用,将request对象添加到queuedRequests请求队列中

   }

}

 

4)ToBeAppliedRequestProcessor

         这个处理器的逻辑比较简单


1)   将请求转发给一下个处理器,必须是FinalRequestProcessor

2)   其实leader在走到这个处理器之前会在CommitProcessor中hod一会等到follower反馈在到这,follower反馈后leader的LearnerHandler的processAck会将请求加入toBeApplied集合,所以在这里对于事务请求一定会在toBeApplied中有对应的移除调,如果没有ConcurrentLinkedQueue直接会抛NoSuchElementException异常

 

5)FinalRequestProcessor

   这个处理器是最后一个处理器,真正去执行事务操作更改dataree的数据。

1)   调底层修改数据zks.processTxn(hdr, txn)

2)   将请求加入到committedLog集合中

3)   构建请求的响应,响应客户端

 

伪代码:

FinalRequestProcessor{

      processRequest(Request request) {

           //zks.outstandingChanges这个玩意起什么作用,一直没弄清楚

           1.事务头不为空,是事务类操作 {

                zks.processTxn(hdr,txn) //zkServer处理事务操作

            }

 

           如果是closeSesion,无需生成响应

 

           根据请求类型(request.type)生成响应,并调NioServerCnxn.sendResponse写入chanel通道

      }

}

 

 

6) SyncRequestProcessor

这个处理器用来将请求记录到txLog文件中,通过批量刷盘的方式来提升io的性能,这里请求只有被写入到本地磁盘后,才会被传递到下一个处理器

下面看一下伪代码:

SyncRequestProcessor线程 {

    run() {

        //flush的时间点:1. queuedRequests为空 2.toFlush.size() > 1000

        //生成新的snapshot

        调zks.getZKDatabase().append(si)添加一条事务日志

           1)成功:是事务类操作有事务头, 根据规则判断是否需要生一个新的snapshot,加入到toFlush的集合中    

           2)失败:没有事务头TxnHeader, 优化直接调下一个Processor

    }

 

    flush() {

        zks.getZKDatabase().commit();同步到本snapshot

        然后循环调下一个Processor

    }

 

    processRequest() {

       加入到阻塞队列queuedRequests中, 让同步线程自己处理

    }

}

 

7) AckRequestProcessor 

被ProposalRequestProcessor调用, leader自己做一次投票的成功响应

        

 

8) SendAckRequestProcessor

         对于leader投票请求的发送响应