首页
Code
Work
Nginx
ChatGPT
首页
Code
Work
Nginx
ChatGPT
321Cc ChatGPT AI助理
小工具
随机
归档
关于
15.zookeeper原理解析-客户端与服务器端交互
Zookeeper
2016-03-17
W.S.T
Zookeeper集群中server数量总是确定的,所以集群中的server交互采用比较可靠的bio长连接模型;不同于集群中sever间交互zookeeper客户端其实数量是未知的,为了提高zookeeper并发性能,zookeeper客户端与服务器端交互采用nio模型。下面我们主要来讲讲zookeeper的服务器端与客户端的交互。读者对nio不了解的话不妨抽点时间去了解下,对于一些nio框架如netty,mina再如一些web容器如tomcat,jetty底层都实现一套nio框架,对于实现nio框架模型大家不妨去谷歌百度搜一下Doug Lea的scalable io in java 这个ppt。 客户端 ClientCnxnSocketNIO是zookeeper的nio通讯层的客户端部分,下面伪代码示例其核心代码: ```r ClientCnxnSocketNIO{ doTransport() { if (如果之前连接没有立马连上,则在这里处理OP\_CONNECT事件) { sendThread.primeConnection(); } else { doIO } //队列中有发送的消息, 开启写 } doIO() { if (sockKey.isReadable()) { sendThread.readResponse(incomingBuffer); updateLastHeard(); } if(sockKey.isWritable()) { Packetp = outgoingQueue.getFirst() //从发送队列取 updateLastSend p.requestHeader.setXid(cnxn.getXid());//设置客户端的xid 序列化 发送 从发送队列删除 加入到pendingQueue队列 } } } ``` ClientCnxn 是客户端操作ClientCnxnSocketNIO的工具,维护了发送任务线程SendThread,事件任务线程EventThead, 发送队列OutgoingQueue以及请求消息的等待队列PendingQueue。下面以伪代码来示例其核心代码 ```r ClientCnxn { outgoingQueue//待向服务器端发送的队列, 客户端提交请求放入这个队列 pendingQueue //发送以后等待响应的队列, submitRequest(){ //client端一个封装成一个packet outgoingQueue.add(packet); selector.wakeup(); packet.wait(); //如果是同步调用wait,应该反馈后会 } SendThread { run() { 1.设置clientCnxnSocket 最后发送时间,最后的心跳时间 2. if(!clientCnxnSocket.isConnected()) { startConnect() //主要工作clientCnxnSocket做 } else { 计算下次ping的时间, 发送心跳 委托给 clientCnxnSocket.doTranspor进行底层的nio传输 } } primeConnection(){ //构建ConnectRequest //组合成通讯层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null outgoingQueue.addFirst clientCnxnSocket.enableReadWriteOnly();//确保读写事件都监听 } readResponse(){ 1.先读响应头,先对特殊的xid进行处理 2. packet = pendingQueue.remove() //由于client和server都是单线程处理,多队列处理,所以认为全局有序 3. 反序列化响应体response, 并设置到packet上 4.finishPacket 1)同步notifyAll,结束 2)异步加入到event线程的队列 } } EventThread{ //主要支持异步的回调 run() { } } } ``` 大家观察客户端操作类Zookeeper里面的操作类主要分为两个参数不带callback的同步方法和参数带callback的异步方法。 1. 同步调用方法实现类似Future同步转异步模式实现 1) Client提交请求对象封装成packet对象放入OutgoingQueue队列,并调用packet.wait()阻塞当前线程。 2) 每个Client都只有一个SendThread线程是线性处理OutgoingQueue中的请求消息的,SendThread线程通过ClientCnxnSocketNIO工具顺序从OutgoingQueue队列中取请求消息发送到服务器端,同时将请求packet加入到PendingQueue中 3) ClientCnxnSocketNIO工具接收处理服务器端响应 4) 从PendingQueue队列取出对应的packet,并调packet.notifyAll()唤醒阻塞的线程完成同步调用 2. 异步调用的总体流程跟同步类似关键区别在于 1) 向OutgoingQueue队列提交请求后,不会调用packet.wait()阻塞当前线程,主流程继续执行 2) 同同步调用 3) 同同步调用 4) 从PendingQueue队列取出对应的packet,并调packet.callback方法完成回调处理 ![](https://321cc.cn/usr/uploads/2023/04/3305946306.png) Zookeeper服务器端 NIOServerCnxnFactory工厂类,zookeeperserver用来启动监听客户端连接,每当有客户端请求连接进来,NIOServerCnxnFactory都会为这个链接构建NIOServerCnxn 实例来单独处理与这个客户端的交互 NIOServerCnxn封装了处理读取客户端请求数据与及向客户端响应数据 下面通过伪代码来实例: ```r NIOServerCnxnFactory { configure { 绑定端口 作为server监听 注册selectkey 的连接时间 } run { //起到accept的作用 1. OP\_ACCEPT, 将NIOServerCnxn(handler) attach到selectkey以便被读写事件使用 2. OP\_READ 和 OP\_WRITE取出handler NIOServerCnxn,并调doIo } } NIOServerCnxn { 构造器 { //设置selectkey对read感兴趣 } doIo { if(k.isReadable()) { 1. 读前四个字节, 代表请求内容长度,不包括自己的4字节 2. 读取到字节数组中 3. zkServer.processPacket()或者zkServer.processConnectRequest() } if(k.isWritable()) { 1.从outgoingBuffers取ByteBuffer 2.发送bytes } } } ```
Zookeeper
请输入评论内容
取消回复
提交评论
W.S.T
Attitude is everything
热门标签
Zookeeper
Oracle
linux
自动备份
9i
10g
系统集成项目管理工程师
2013
上半年
上午试题
答案
typecho
blog
baidu
bae
热门文章
BLOG成功托管到百度BAE
回来了~~
终于把BLOG从PJBLOG转换到Typecho
有趣的弹钢琴游戏
SQLSERVER2000的jdbc驱动程序连接
如何把ACCESS转成MSSQL数据库
学生街~~~
最新评论
321cc: 6年前哈~~
ZEONLEE: 我试下.
ZEONLEE: URL规则那边没有完全啊,根目录下面的文件都无法访问...