Zookeeper 可以说是业界最流行的分布式协调解决方案,其源码值得我们好好静下心来学习和研究。
这篇文章主要分析 NIOServerCnxnFactory 这个类。NIOServerCnxnFactory 和 NettyServerCnxnFactory 是 Zookeeper 服务端用来处理连接的核心类,前者基于 NIO,后者基于 Netty 框架。废话少说,让我们一起来看下 NIOServerCnxnFactory 这个类是如何实现的:)。
NIOServerCnxnFactory 基于 NIO 实现了一个多线程的 ServerCnxnFactory,线程间的通信都是通过 queue 来完成的。NIOServerCnxnFactory 包含的线程如下:
NIOServerCnxnFactory 的启动入口为 startup 方法,如下所示:
- public void startup(ZooKeeperServer zks, boolean startServer)
- throws IOException, InterruptedException {
- //自身的启动逻辑
- start();
- //设置 zkServer
- setZooKeeperServer(zks);
- if (startServer) {
- //启动 zkServer
- zks.startdata();
- zks.startup();
- }
- }
start() 方法包含自身的启动逻辑,而 zks.startdata() 和 zks.startup() 用来启动 zkServer。NIOServerCnxnFactory 是用来管理连接的,而数据处理逻辑则由 zkServer 完成。start() 方法的逻辑如下所示:
- public void start() {
- stopped = false;
- //worker 线程服务,用来进行 socket 的 I/O
- if (workerPool == null) {
- workerPool = new WorkerService(
- "NIOWorker", numWorkerThreads, false);
- }
- //selector 线程,用来监听 socket 事件
- for(SelectorThread thread : selectorThreads) {
- if (thread.getState() == Thread.State.NEW) {
- thread.start();
- }
- }
- // accept 线程
- if (acceptThread.getState() == Thread.State.NEW) {
- acceptThread.start();
- }
- // 连接管理线程
- if (expirerThread.getState() == Thread.State.NEW) {
- expirerThread.start();
- }
- }
可以看到,start 方法主要生成或启动上述的 accept 线程、selector 线程、worker 线程和连接管理线程。
accept 线程的 run() 方法如下:
- public void run() {
- try {
- //判断是否需要退出
- while (!stopped && !acceptSocket.socket().isClosed()) {
- try {
- //监听连接事件,并建立连接
- select();
- } catch (RuntimeException e) {
- LOG.warn("Ignoring unexpected runtime exception", e);
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception", e);
- }
- }
- } finally {
- //关闭 selector
- closeSelector();
- if (!reconfiguring) {
- //唤醒 selector 线程并通知 worker 线程关闭
- NIOServerCnxnFactory.this.stop();
- }
- LOG.info("accept thread exitted run method");
- }
- }
accept 线程主要监听连接事件,并建立连接,并分派给 selector。在退出时,关闭它自身的 selector,然后唤醒用来进行 socket I/O 的 selector 线程,最后通知 worker 线程退出。
accept 线程在 select 方法中监听连接事件,然后进入 doAccept() 方法建立连接,分派给 selector 线程,doAccept() 方法如下所示:
- private boolean doAccept() {
- boolean accepted = false;
- SocketChannel sc = null;
- try {
- //建立连接
- sc = acceptSocket.accept();
- accepted = true;
- //防止来自一个 IP 的连接是否过多
- InetAddress ia = sc.socket().getInetAddress();
- int cnxncount = getClientCnxnCount(ia);
- if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
- throw new IOException("Too many connections from " + ia
- + " - max is " + maxClientCnxns );
- }
- LOG.info("Accepted socket connection from "
- + sc.socket().getRemoteSocketAddress());
- sc.configureBlocking(false);
- //使用轮询来将连接分派给某个 selector 线程
- if (!selectorIterator.hasNext()) {
- selectorIterator = selectorThreads.iterator();
- }
- SelectorThread selectorThread = selectorIterator.next();
- if (!selectorThread.addAcceptedConnection(sc)) {
- throw new IOException(
- "Unable to add connection to selector queue"
- + (stopped ? " (shutdown in progress)" : ""));
- }
- acceptErrorLogger.flush();
- } catch (IOException e) {
- acceptErrorLogger.rateLimitLog(
- "Error accepting new connection: " + e.getMessage());
- fastCloseSock(sc);
- }
- return accepted;
- }
如代码注释所示,doAccept 方法主要做了两件事:
selector 线程的 run 方法如下所示:
- public void run() {
- try {
- while (!stopped) {
- try {
- //监听读写事件并处理
- select();
- //处理 accept 线程新分派的连接
- processAcceptedConnections();
- //更新连接监听事件
- processInterestOpsUpdateRequests();
- } catch (RuntimeException e) {
- LOG.warn("Ignoring unexpected runtime exception", e);
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception", e);
- }
- }
- //......
- } finally {
- closeSelector();
- // 唤醒 accept 线程及其他线程,并通知 worker 线程退出
- NIOServerCnxnFactory.this.stop();
- LOG.info("selector thread exitted run method");
- }
- }
可以看到,selector 线程主要做三件事:
其中在 select() 方法中,selector 线程会把有事件发生的连接封装成 IOWorkRequest 对象,然后调用 workerPool.schedule(workRequest) 来交给 worker 线程来处理。
worker 线程的核心处理逻辑在 IOWorkRequest 的 doWork() 中,如下所示:
- public void doWork() throws InterruptedException {
- //如果 Channel 已经关闭则清理该 SelectionKey
- if (!key.isValid()) {
- selectorThread.cleanupSelectionKey(key);
- return;
- }
- //如果可读或可写,则调用 NIOServerCnxn.doIO 方法,通知 NIOServerCnxn 连接对象进行 IO 读写及处理
- if (key.isReadable() || key.isWritable()) {
- cnxn.doIO(key);
- //如果已经 shutdown 则关闭连接
- if (stopped) {
- cnxn.close();
- return;
- }
- //如果 Channel 已经关闭则清理该 SelectionKey
- if (!key.isValid()) {
- selectorThread.cleanupSelectionKey(key);
- return;
- }
- //更新该会话的过期时间
- touchCnxn(cnxn);
- }
- //已经处理完读写,重新标记该连接已准备好新的 select 事件监听
- cnxn.enableSelectable();
- //把该连接重新放到 selectThread 的 updateQueue 中,selectThread 会在处理处理完所有 Channel 的读写和新连接后,更新此 Channel 的注册监听事件
- if (!selectorThread.addInterestOpsUpdateRequest(key)) {
- cnxn.close();
- }
- }
具体逻辑见代码注释。