上一节[[《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty]] 中介绍了Netty的入门程序,本节如标题所言将会一步步分析入门程序的代码含义。
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8000);
}
服务端一上来先构建两个对象NioEventLoopGroup,这两个对象将直接决定Netty启动之后的工作模式,在这个案例中boos和JDK的NIO编程一样负责进行新连接的“轮询”,他会定期检查客户端是否已经准备好可以接入。worker则负责处理boss获取到的连接,当检查连接有数据可以读写的时候就进行数据处理。
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
那么应该如何理解?其实这两个Group对象简单的看成是线程池即可,和JDBC的线程池没什么区别。通过阅读源码可以知道,bossGroup只用了一个线程来处理远程客户端的连接,workerGroup拥有的线程数默认为2倍的cpu核心数。
那么这两个线程池是如何配合的?boss和worker的工作模式和我们平时上班,老板接活员工干活的模式是类似的。由bossGroup负责接待,再转交给workerGroup来处理具体的业务。
整体概念上贴合NIO的设计思路,不过它要做的更好。
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.
.xxx()
.xxx()
服务端引导类是ServerBootstrap,引导器指的是引导开发者更方便快速的启动Netty服务端/客户端,
这里使用了比较经典的建造者设计模式。
.group(boos, worker)
group方法绑定boos和work使其各司其职,这个操作可以看作是绑定线程池。
注意gorup方法一旦确定就意味着Netty的线程模型被固定了,中途不允许切换,整个运行过程Netty会按照代码实现计算的线程数提供服务。
下面是group的api注释:
Set the EventLoopGroup for the parent (acceptor) and the child (client). These EventLoopGroup's are used to handle all the events and IO for ServerChannel and Channel's.
机翻过来就是:为父(acceptor)和子(client)设置EventLoopGroup。这些EventLoopGroup是用来处理ServerChannel和Channel的所有事件和IO的。注意这里的Channel's是Netty中的概念,初学的时候可以简单的类比为BIO编程的Socket套接字。
.channel(NioServerSocketChannel.class)
设置底层编程模型或者说底层通信模式,一旦设置中途不允许更改。所谓的底层编程模型,其实就是JDK的BIO,NIO模型(Netty摈弃了JDK的AIO编程模型),除此之外Netty还提供了自己编写的Epoll模型,当然日常工作中是用最多的还是NIO模型。
childHandler方法主要作用是初始化和定义处理链来处理请求处理的细节。在案例代码当中我们添加了Netty提供的字符串解码handler(StringDecoder)和由Netty实现的SimpleChannelInboundHandler简易脚手架,脚手架中自定义的处理逻辑为打印客户端发送的请求数据。
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
Handler负责处理一个I/O事件或拦截一个I/O操作,处理完成将其转发给其ChannelPipeline中的下一个处理Handler,以此形成经典的处理链条。 比如案例里面StringDecoder解码处理数据之后将会交给SimpleChannelInboundHandler的channelRead0方法,该方法中将解码读取到的数据打印到控制台。
借助pipeline,我们可以定义连接收到请求后续的数据读写细节和处理逻辑。为了方便理解,这里可以认为NIoSocketChanne对应BIO编程模型的Socket套接字 ,NioServerSocketChannel对应BIO编程模型的ServerSocket。
.bind(8000)
bind操作是一个异步方法,它会返回ChannelFuture,服务端编码中可以通过添加监听器方式,自定义在Netty服务端启动回调通知之后的下一步处理逻辑,当然也可以完全不关心它是否启动继续往下执行其他业务代码的处理。
Netty的ChannelFuture类注释中有一个简单直观的例子介绍ChannelFuture的使用。
// GOOD
Bootstrap b = ...;
// Configure the connect timeout option.
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
ChannelFuture f = b.connect(...);
f.awaitUninterruptibly();
// Now we are sure the future is completed.
assert f.isDone();
if (f.isCancelled()) {
// Connection attempt cancelled by user
} else if (!f.isSuccess()) {
f.cause().printStackTrace();
} else {
// Connection established successfully
}
这个过程类似外面员把外卖送到指定地点之后打电话通知我们。
第一个案例是通过服务端启动失败自动递增端口号重新绑定端口。
服务端启动必须要关心的问题是指定的端口被占用导致启动失败的处理,这里的代码实践是利用Netty的API完成服务端端口在检测到端口被占用的时候自动+1重试绑定直到所有的端口耗尽。
实现代码如下:
public class NettyServerStart {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
int port = 10022;
serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer() {
home.php?mod=space&uid=1892347
protected void initChannel(Channel ch) throws Exception {
// 指定服务端启动过程的一些逻辑
System.out.println("服务端启动当中");
}
})
// 指定自定义属性,客户端可以根据此属性进行一些判断处理
// 可以看作给Channel维护一个Map属性,这里的channel是服务端
// 允许指定一个新创建的通道的初始属性。如果该值为空,指定键的属性将被删除。
.attr(AttributeKey.newInstance("hello"), "hello world")
// 给每个连接指定自定义属性,Channel 进行属性指定等
// 用给定的值在每个 子通道 上设置特定的AttributeKey。如果该值为空,则AttributeKey将被删除。
// 区别是是否是 子channel,子Channel代表给客户端的连接设置
.childAttr(AttributeKey.newInstance("childAttr"), "childAttr")
// 客户端的 Channel 设置TCP 参数
// so_backlog 临时存放已完成三次握手的请求队列的最大长度,如果频繁连接可以调大此参数
.option(ChannelOption.SO_BACKLOG, 1024)
// 给每个连接设置TCP参数
// tcp的心跳检测,true为开启
.childOption(ChannelOption.SO_KEEPALIVE, true)
// nagle 算法开关,实时性要求高就关闭
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.err.println(msg);
}
});
}
});
bind(serverBootstrap, port);
}
/**
* 自动绑定递增端口
* home.php?mod=space&uid=952169 serverBootstrap
* @param port
*/
public static void bind(ServerBootstrap serverBootstrap, int port){
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()){
System.out.println("端口绑定成功");
System.out.println("绑定端口"+ port +"成功");
}else{
System.out.println("端口绑定失败");
bind(serverBootstrap, port+1);
}
});
}
}
详细介绍和解释API个人认为意义不大,这里仅仅对于常用的API进行解释:
客户端的启动代码如下。
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
// 引导器引导启动
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder());
}
});
// 建立通道
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while (true){
channel.writeAndFlush(new Date() + " Hello world");
Thread.sleep(2000);
}
}
客户端代码最主要的三个关注点是:线程模型、IO模型、IO业务处理逻辑,其他代码和服务端的启动比较类似。这里依旧是从上往下一条条分析代码。
客户端连接不需要监听端口,为了和服务端区分直接被叫做Bootstrap,代表客户端的启动引导器。
Bootstrap bootstrap = new Bootstrap();
Netty中客户端也同样需要设置线程模型才能和服务端正确交互,客户端的NioEventLoopGroup同样可以看作是线程池,负责和服务端的数据读写处理。
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
客户端 group 线程池的设置只需要一个即可,因为主要目的是和服务端建立连接(只需要一个线程即可)。
.group(eventExecutors)
和服务端设置同理,作用是底层编程模型的设置。官方注释中推荐使用NIO / EPOLL / KQUEUE这几种,使用最多的是NIO。
.channel(NioSocketChannel.class)
这里比较好奇如果用OIO模型的客户端连接NIO的服务端会怎么样?于是做了个实验,把如下代码改为OioServerSocketChannel(生产禁止使用,此方式已被Deprecated),启动服务端之后启动客户端即可观察效果。
.channel(OioServerSocketChannel.class)
从实验结果来看,显然不允许这么干。
15:24:00.934 [main] WARN io.netty.bootstrap.Bootstrap - Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0xd0aaab57]'
15:24:00.934 [main] WARN io.netty.bootstrap.Bootstrap - Unknown channel option 'TCP_NODELAY' for channel '[id: 0xd0aaab57]'
上文介绍服务端的时候提到过handler()代表服务端启动过程当中的逻辑,在这里自然就表示客户端启动过程的逻辑,客户端的handler()可以直接看作服务端引导器当中的childHandler()。
这里读者可能会好奇为什么客户端代码用childHandler呢?答案是Netty为了防止使用者误解Bootstrap中只有handler,所以我们可以直接等同于服务端的childHandler()。
吐槽:这个child不child的API名称看的比较蛋疼,不加以区分有时候确实容易用错。这里生活化理解服务端中的childHandler是身上带了连接,所以在连接成功之后会调用,没有child则代表此时没有任何连接,所以会发送在初始化的时候调用。
而客户端为什么只保留 handler() 呢?个人理解是客户端最关注的是连接上服务端之后所做的处理,增加初始化的时候做处理没啥意义,并且会导致设计变复杂。
handler内部是对于Channel进行初始化并且添加pipline自定义客户端的读写逻辑。这里同样添加Netty提供的StringEncoder默认会是用字符串编码模式对于发送的数据进行编码处理。
channel.pipeline().addLast(new StringEncoder());
ChannelInitializer可以直接类比SocketChannel。
当配置都准备好之后,客户端的最后一步是启动客户端并且和服务端进行TCP三次握手建立连接。这里方法会返回Channel对象,Netty的connect支持异步连接。
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
再次强调,connect 是一个异步方法,同样可以通过给返回的channel对象调用addListner添加监听器,在Netty的客户端成功和服务端建立连接之后会回调相关方法告知监听器所有数据准备完成,此时可以在监听器中添加回调之后的处理逻辑。
我们还可以用监听器对于连接失败的情况做自定义处理逻辑,比如下面例子将会介绍利用监听器实现客户端连接服务端失败之后,定时自动重连服务端多次直到重连次数用完的例子。
第二个实践代码是客户端在连接服务端的时候进行失败重连。失败重连在网络环境较差的时候十分有效,但是需要注意这里的代码中多次重试会逐渐增加时间间隔。
客户端失败重连的整体代码如下:
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 连接成功,启动控制台线程……");
Channel channel = ((ChannelFuture) future).channel();
startConsoleThread(channel);
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
private static void startConsoleThread(Channel channel) {
ConsoleCommandManager consoleCommandManager = new ConsoleCommandManager();
LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while (!Thread.interrupted()) {
if (!SessionUtil.hasLogin(channel)) {
loginConsoleCommand.exec(scanner, channel);
} else {
consoleCommandManager.exec(scanner, channel);
}
}
}).start();
}
加入失败重连代码之后,客户端的启动代码需要进行略微调整,在链式调用中不再使用直接connection,而是传递引导类和相关参数,通过递归的方式实现失败重连的效果:
connect(bootstrap, "127.0.0.1", 10999, MAX_RETRY);
对应源码定义如下:
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
主要关联TCP底层心跳机制。SO_KEEPALIVE用于开启或者关闭保活探测,默认情况下是关闭的。当SO_KEEPALIVE开启时,可以保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入。
这个参数的含义是:是否开启Nagle算法。首先需要注意这个参数和Linux操作系统的默认值不一样,true 传输到Linux是关闭调Nagle算法。
Nagele算法的出现和以前的网络带宽资源有限有关,为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据,Nagle算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
为了理解Nagle算法,我们需要了解TCP的缓冲区通常会设置MSS参数。
MSS参数:除去 IP 和 TCP 头部之后,一个网络包所能容纳的 TCP 数据的最大长度; 最大 1460。
MTU:一个网络包的最大长度,以太网中一般为 1500 字节;
为什么最大为1460个字节?因为TCP传输过程中都会要求绑定 TCP 和 IP 的头部信息,这样服务端才能回送ACK确认收到包正确。
也就是说传输大数据包的时候,数据会按照MSS的值进行切割。回到Nagle算法,它的作用就是定义任意时刻只能有一个未被ACK确认的小段(MSS对应切割的一个块)。
这就意味着当有多个未被ACK确认的小段的时候,此时client端会小小的延迟一下等待合并为更大的数据包才发送。
Netty 默认关闭了这个算法,意味着一有数据就理解发送,满足低延迟和高并发的设计。
TCP_NODELAY 配置选项定义如下:
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");
此参数的配置介绍可以从SocketChannelConfig关联的配置中获取。
/**
* Gets the {home.php?mod=space&uid=282837 StandardSocketOptions#TCP_NODELAY} option. Please note that the default value of this option
* is {home.php?mod=space&uid=295005 true} unlike the operating system default ({@code false}). However, for some buggy platforms, such as
* Android, that shows erratic behavior with Nagle's algorithm disabled, the default value remains to be * {@code false}.
*/
boolean isTcpNoDelay();
注释翻译如下。
获取 StandardSocketOptions.TCP_NODELAY 配置。请注意,该选项的默认值为 true,与操作系统的默认值(false)不同。然而,对于一些有问题的平台,比如Android,在禁用Nagle算法的情况下会出现不稳定的行为,默认值仍然为false。
表示连接超时时间,单位为毫秒。
本部分可以参考作者代码,这里仅仅用笔记归档一下大致代码编写思路。
https://github.com/lightningMan/flash-netty
答案是添加监听器,在监听到客户端连接成功之后直接主动推送自定义信息。
初学者比较容易困扰的问题。handler()和childHandler()的主要区别是:handler()是发生在初始化的时候,childHandler()是发生在客户端连接之后。
“知其所以然”的部分放到后续的源码分析笔记当中,这里暂时跳过,初次阅读只需要记住结论即可。
本质上都是客户端和服务端进行网络通信的连接的一种抽象,但是使用上有不小的区别。下面的内容摘录自参考资料:
Socket、SocketChannel区别:
https://www.cdsy.xyz/computer/network/230716/cd44700.html
Netty Channel的理解: