2025年4月2日 星期三 乙巳(蛇)年 正月初三 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 服务器 > 网络服务

Zookeeper 源码分析 - NIOServerCnxnFactory

时间:12-14来源:作者:点击数:4

Zookeeper 可以说是业界最流行的分布式协调解决方案,其源码值得我们好好静下心来学习和研究。

这篇文章主要分析 NIOServerCnxnFactory 这个类。NIOServerCnxnFactory 和 NettyServerCnxnFactory 是 Zookeeper 服务端用来处理连接的核心类,前者基于 NIO,后者基于 Netty 框架。废话少说,让我们一起来看下 NIOServerCnxnFactory 这个类是如何实现的:)。

NIOServerCnxnFactory

NIOServerCnxnFactory 基于 NIO 实现了一个多线程的 ServerCnxnFactory,线程间的通信都是通过 queue 来完成的。NIOServerCnxnFactory 包含的线程如下:

  • 1 个 accept 线程,用来监听端口并接收连接,然后把该连接分派给 selector 线程。
  • N 个 selecotr 线程,每个 selctor 线程平均负责 1/N 的连接。使用 N 个 selector 线程的原因在于,在大量连接的场景下,select() 操作本身可能会成为性能瓶颈。
  • N 个 worker 线程,用来负责 socket 的读写。如果 N 为 0,那么 selector 线程自身会进行 socket 读写。
  • 1 个管理连接的线程,用来关闭空闲而且没有建立 session 的连接。

NIOServerCnxnFactory 的启动入口为 startup 方法,如下所示:

  • public void startup(ZooKeeperServer zks, boolean startServer)
  • throws IOException, InterruptedException {
  • //自身的启动逻辑
  • start();
  • //设置 zkServer
  • setZooKeeperServer(zks);
  • if (startServer) {
  • //启动 zkServer
  • zks.startdata();
  • zks.startup();
  • }
  • }

start() 方法包含自身的启动逻辑,而 zks.startdata() 和 zks.startup() 用来启动 zkServer。NIOServerCnxnFactory 是用来管理连接的,而数据处理逻辑则由 zkServer 完成。start() 方法的逻辑如下所示:

  • public void start() {
  • stopped = false;
  • //worker 线程服务,用来进行 socket 的 I/O
  • if (workerPool == null) {
  • workerPool = new WorkerService(
  • "NIOWorker", numWorkerThreads, false);
  • }
  • //selector 线程,用来监听 socket 事件
  • for(SelectorThread thread : selectorThreads) {
  • if (thread.getState() == Thread.State.NEW) {
  • thread.start();
  • }
  • }
  • // accept 线程
  • if (acceptThread.getState() == Thread.State.NEW) {
  • acceptThread.start();
  • }
  • // 连接管理线程
  • if (expirerThread.getState() == Thread.State.NEW) {
  • expirerThread.start();
  • }
  • }

可以看到,start 方法主要生成或启动上述的 accept 线程、selector 线程、worker 线程和连接管理线程。

accept 线程

accept 线程的 run() 方法如下:

  • public void run() {
  • try {
  • //判断是否需要退出
  • while (!stopped && !acceptSocket.socket().isClosed()) {
  • try {
  • //监听连接事件,并建立连接
  • select();
  • } catch (RuntimeException e) {
  • LOG.warn("Ignoring unexpected runtime exception", e);
  • } catch (Exception e) {
  • LOG.warn("Ignoring unexpected exception", e);
  • }
  • }
  • } finally {
  • //关闭 selector
  • closeSelector();
  • if (!reconfiguring) {
  • //唤醒 selector 线程并通知 worker 线程关闭
  • NIOServerCnxnFactory.this.stop();
  • }
  • LOG.info("accept thread exitted run method");
  • }
  • }​

accept 线程主要监听连接事件,并建立连接,并分派给 selector。在退出时,关闭它自身的 selector,然后唤醒用来进行 socket I/O 的 selector 线程,最后通知 worker 线程退出。

accept 线程在 select 方法中监听连接事件,然后进入 doAccept() 方法建立连接,分派给 selector 线程,doAccept() 方法如下所示:

  • private boolean doAccept() {
  • boolean accepted = false;
  • SocketChannel sc = null;
  • try {
  • //建立连接
  • sc = acceptSocket.accept();
  • accepted = true;
  • //防止来自一个 IP 的连接是否过多
  • InetAddress ia = sc.socket().getInetAddress();
  • int cnxncount = getClientCnxnCount(ia);
  • if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
  • throw new IOException("Too many connections from " + ia
  • + " - max is " + maxClientCnxns );
  • }
  • LOG.info("Accepted socket connection from "
  • + sc.socket().getRemoteSocketAddress());
  • sc.configureBlocking(false);
  • //使用轮询来将连接分派给某个 selector 线程
  • if (!selectorIterator.hasNext()) {
  • selectorIterator = selectorThreads.iterator();
  • }
  • SelectorThread selectorThread = selectorIterator.next();
  • if (!selectorThread.addAcceptedConnection(sc)) {
  • throw new IOException(
  • "Unable to add connection to selector queue"
  • + (stopped ? " (shutdown in progress)" : ""));
  • }
  • acceptErrorLogger.flush();
  • } catch (IOException e) {
  • acceptErrorLogger.rateLimitLog(
  • "Error accepting new connection: " + e.getMessage());
  • fastCloseSock(sc);
  • }
  • return accepted;
  • }​

如代码注释所示,doAccept 方法主要做了两件事:

  • 如果某个客户端连接过多则拒绝其建立新连接,防止少量客户端占用所有连接资源。
  • 使用轮询来从 N 个 selector 线程中选出一个 selector 线程,并且调用 selectorThread.addAcceptedConnection(sc) 方法来将连接分派给该 selector 线程。调用该方法会把连接扔到这个 selector 线程的 acceptedQueue(类型为 LinkedBlockingQueue)中,然后调用 selector.wakeup() 唤醒 selector 进行处理。
selector 线程

selector 线程的 run 方法如下所示:

  • public void run() {
  • try {
  • while (!stopped) {
  • try {
  • //监听读写事件并处理
  • select();
  • //处理 accept 线程新分派的连接
  • processAcceptedConnections();
  • //更新连接监听事件
  • processInterestOpsUpdateRequests();
  • } catch (RuntimeException e) {
  • LOG.warn("Ignoring unexpected runtime exception", e);
  • } catch (Exception e) {
  • LOG.warn("Ignoring unexpected exception", e);
  • }
  • }
  • //......
  • } finally {
  • closeSelector();
  • // 唤醒 accept 线程及其他线程,并通知 worker 线程退出
  • NIOServerCnxnFactory.this.stop();
  • LOG.info("selector thread exitted run method");
  • }
  • }​

可以看到,selector 线程主要做三件事:

  • select():监听读写事件并处理;
  • processAcceptedConnections():处理 accept 线程新分派的连接;
  • processInterestOpsUpdateRequests():更新连接监听事件

其中在 select() 方法中,selector 线程会把有事件发生的连接封装成 IOWorkRequest 对象,然后调用 workerPool.schedule(workRequest) 来交给 worker 线程来处理。

worker 线程

worker 线程的核心处理逻辑在 IOWorkRequest 的 doWork() 中,如下所示:

  • public void doWork() throws InterruptedException {
  • //如果 Channel 已经关闭则清理该 SelectionKey
  • if (!key.isValid()) {
  • selectorThread.cleanupSelectionKey(key);
  • return;
  • }
  • //如果可读或可写,则调用 NIOServerCnxn.doIO 方法,通知 NIOServerCnxn 连接对象进行 IO 读写及处理
  • if (key.isReadable() || key.isWritable()) {
  • cnxn.doIO(key);
  • //如果已经 shutdown 则关闭连接
  • if (stopped) {
  • cnxn.close();
  • return;
  • }
  • //如果 Channel 已经关闭则清理该 SelectionKey
  • if (!key.isValid()) {
  • selectorThread.cleanupSelectionKey(key);
  • return;
  • }
  • //更新该会话的过期时间
  • touchCnxn(cnxn);
  • }
  • //已经处理完读写,重新标记该连接已准备好新的 select 事件监听
  • cnxn.enableSelectable();
  • //把该连接重新放到 selectThread 的 updateQueue 中,selectThread 会在处理处理完所有 Channel 的读写和新连接后,更新此 Channel 的注册监听事件
  • if (!selectorThread.addInterestOpsUpdateRequest(key)) {
  • cnxn.close();
  • }
  • }​

具体逻辑见代码注释。

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐