API设计更建议实战过程中逐渐了解熟悉掌握,本文记录基础设计和相关API,只需要大致了解ByteBuf设计思想即可。
整个ByteBuf的数据结构组成如下,整个设计思想有点类似计算机如何实现从北京到上海,那就是一段足够长的铁轨,不断“拆掉”后面的铁轨放到前面的铁轨上,这样实现火车一直在铁轨上跑的错觉。
实践容量API之前,我们先构建ByteBuf。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
// allocate ByteBuf(9,100) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 9/100)
表示 ByteBuf 底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节),不同的底层实现机制有不同的计算方式,后面我们讲 ByteBuf 的分类的时候会讲到。
从案例看出,默认创建的方式容量为初始化指定的容量。
print("allocate ByteBuf(9,100) capacity => {} \n", buffer.capacity());
// allocate ByteBuf(9,100) capacity => 9
表示 ByteBuf 底层最大能够占用多少字节的内存,当向 ByteBuf 中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到 maxCapacity,超过这个数,就抛异常。
从案例可以得知,如果扩容到 100 就会报错
print("allocate ByteBuf(9,100) maxCapacity => {} \n", buffer.maxCapacity());
// allocate ByteBuf(9,100) maxCapacity => 100
readableBytes() 表示 ByteBuf 当前可读的字节数,它的值等于writerIndex-readerIndex,如果两者相等,则不可读,isReadable() 方法返回 false。
// readableBytes() 与 isReadable()
print("allocate ByteBuf(9,100) isReadable => {} \n", buffer.isReadable());
print("allocate ByteBuf(9,100) readableBytes => {} \n", buffer.readableBytes());
// allocate ByteBuf(9,100) isReadable => false
// allocate ByteBuf(9,100) readableBytes => 0
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)
// 写入数据之后,重新执行readableBytes() 与 isReadable()
print("allocate ByteBuf(9,100) isReadable => {} \n", buffer.isReadable());
print("allocate ByteBuf(9,100) readableBytes => {} \n", buffer.readableBytes());
//allocate ByteBuf(9,100) isReadable => true
//allocate ByteBuf(9,100) readableBytes => 4
writableBytes() 表示 ByteBuf 当前可写的字节数,它的值等于capacity-writerIndex,如果两者相等,则表示不可写,isWritable() 返回 false。
注意这个时候,并不代表不能往 ByteBuf 中写数据了, 如果发现往 ByteBuf 中写数据写不进去的话,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity。
maxWritableBytes() 就表示可写的最大字节数,它的值等于maxCapacity-writerIndex。
在初始化构建过程中,由于没有读写任何数据,可以看到他们的值基本和前面计算的容量是一致的。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
// writableBytes()、 isWritable() 与 maxWritableBytes()
print("allocate ByteBuf(9,100) writableBytes => {} \n", buffer.writableBytes());
print("allocate ByteBuf(9,100) isWritable => {} \n", buffer.isWritable());
print("allocate ByteBuf(9,100) maxWritableBytes => {} \n", buffer.maxWritableBytes());
// allocate ByteBuf(9,100) writableBytes => 9
// allocate ByteBuf(9,100) isWritable => true
// allocate ByteBuf(9,100) maxWritableBytes => 100
实践读写指针相关的 API之前,我们先构建初始化ByteBuf。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
在没有写入任何数据的时候,读指针为0。
// readerIndex() 返回当前读指针
print("allocate ByteBuf(9,100) readerIndex => {} \n", buffer.readerIndex());
// allocate ByteBuf(9,100) readerIndex => 0
下面的案例说明读指针不能越过写指针的界限。
// 尝试手动重定向渎指针位置
// print("allocate ByteBuf(9,100) readerIndex(int) => {} \n", buffer.readerIndex(2));
// readerIndex: 2, writerIndex: 0 (expected: 0 <= readerIndex <= writerIndex <= capacity(9))
我们写入一些数据之后,再进行读指针重定向。
buffer.writeBytes(new byte[]{1, 2, 3, 4});
// 重定向读指针
print("allocate ByteBuf(9,100) readerIndex(int) => {} \n", buffer.readerIndex(2));
print("重定向读指针 之后 (new byte[]{1,2,3,4}) => {} \n", buffer);
// allocate ByteBuf(9,100) readerIndex(int) => PooledUnsafeDirectByteBuf(ridx: 2, widx: 4, cap: 9/100)
// 重定向读指针 之后 (new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 2, widx: 4, cap: 9/100)
案例以初始化写入四个字节之后作为开始。
// writeIndex() 与 writeIndex(int)
print("allocate ByteBuf(9,100) writerIndex => {} \n", buffer.writerIndex());
print("allocate ByteBuf(9,100) writerIndex(int) => {} \n", buffer.writerIndex(2));
// allocate ByteBuf(9,100) writerIndex => 4
// allocate ByteBuf(9,100) writerIndex(int) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 9/100)
区别:
下面两段代码是等价的。
// 代码片段1
int readerIndex = buffer.readerIndex();
// … 其他操作
buffer.readerIndex(readerIndex);
// 代码片段二
// (不需要自己定义变量,推荐使用)
buffer.markReaderIndex();
// … 其他操作
// resetReaderIndex() 可以恢复到之前状态
// (解析自定义协议的数据包常用)
buffer.resetReaderIndex();
希望大家多多使用代码片段二这种方式,不需要自己定义变量,无论 buffer 当作参数传递到哪里,调用 resetReaderIndex() 都可以恢复到之前的状态,在解析自定义协议的数据包的时候非常常见,推荐大家使用这一对 API markWriterIndex() 与 resetWriterIndex() 这一对 API 的作用与上述一对 API 类似
实践读写API之前,我们先构建ByteBuf。
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
上面的代码
writeBytes() 表示把字节数组 src 里面的数据全部写到 ByteBuf。注意此方法执行之后,会移动前面介绍的 writeIndex 写指针。
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)
readBytes()指的是把 ByteBuf 里面的数据全部读取到 dst。
//只有read方法改变指针
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
print("bytes 内容 => {}", bytes);
// bytes 内容 =>
这里 dst 字节数组的大小通常等于 readableBytes(),而 src 字节数组大小的长度通常小于等于writableBytes()。
writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()这里就不一一赘述。
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)buffer.writeByte(5);
print("改变写指针 buffer.writeByte(5) => {} \n", buffer);
// 改变写指针 buffer.writeByte(5) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 9/100)
getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针。
//只有read方法改变指针
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
print("buffer.readBytes(bytes) => {}\n", buffer);
// buffer.readBytes(bytes) => PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)
由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,就需要回收底层内存。
默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
最后是简单测试程序整合前面的API案例。
public class ByteBufTest {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9, 100)", buffer);
// write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("writeBytes(1,2,3,4)", buffer);
// write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写, 写完 int 类型之后,写指针增加4
buffer.writeInt(12);
print("writeInt(12)", buffer);
// write 方法改变写指针, 写完之后写指针等于 capacity 的时候,buffer 不可写
buffer.writeBytes(new byte[]{5});
print("writeBytes(5)", buffer);
// write 方法改变写指针,写的时候发现 buffer 不可写则开始扩容,扩容之后 capacity 随即改变
buffer.writeBytes(new byte[]{6});
print("writeBytes(6)", buffer);
// get 方法不改变读写指针
System.out.println("getByte(3) return: " + buffer.getByte(3));
System.out.println("getShort(3) return: " + buffer.getShort(3));
System.out.println("getInt(3) return: " + buffer.getInt(3));
print("getByte()", buffer);
// set 方法不改变读写指针
buffer.setByte(buffer.readableBytes() + 1, 0);
print("setByte()", buffer);
// read 方法改变读指针
byte[] dst = new byte[buffer.readableBytes()];
buffer.readBytes(dst);
print("readBytes(" + dst.length + ")", buffer);
}
private static void print(String action, ByteBuf buffer) {
System.out.println("after ===========" + action + "============");
System.out.println("capacity(): " + buffer.capacity());
System.out.println("maxCapacity(): " + buffer.maxCapacity());
System.out.println("readerIndex(): " + buffer.readerIndex());
System.out.println("readableBytes(): " + buffer.readableBytes());
System.out.println("isReadable(): " + buffer.isReadable());
System.out.println("writerIndex(): " + buffer.writerIndex());
System.out.println("writableBytes(): " + buffer.writableBytes());
System.out.println("isWritable(): " + buffer.isWritable());
System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
System.out.println();
}
}
最后是个人的实验Test类。
/**
* byteBuf 的API测试
*/
public class ByteBufTest {
public static void main(String[] args) {
// 9 代表初始容量, 100代表最大容量
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9,100) => {} \n", buffer);
//allocate ByteBuf(9,100) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 9/100)
// write 方法改变写指针
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)
// write 改变写指针,如果没有到达 capacity 依然可以写入,写入 int 之后写指针增加4
buffer.writeInt(12);
print("buffer.writeInt(12) => {}\n", buffer);
// buffer.writeInt(12) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 9/100)
// 继续改变写指针,当前写入等于 initialCapacity 这个初始值之后将不能继续写入
buffer.writeBytes(new byte[]{5});
print("writeBytes(new byte[]{5}) => {}\n", buffer);
// writeBytes(new byte[]{5}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 9, cap: 9/100)
// 继续写入指针,此时发现 已经超过 initialCapacity 的值,此时会进行扩容
buffer.writeBytes(new byte[]{6});
print("writeBytes(new byte[]{6}) => {}\n", buffer);
// writeBytes(new byte[]{6}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 16/100)
// get 方法调用之后不改变读指针
print("getByte(3) return => {}\n", buffer.getByte(3));
print("getShort(3) return => {}\n", buffer.getShort(3));
print("getInt(3) return => {}\n", buffer.getInt(3));
print("getChar(3) return => {}\n", buffer.getChar(3));
/*
getByte(3) return => 4 getShort(3) return => 1024 getInt(3) return => 67108864 getChar(3) return => Ѐ
* */
// set 方法不改变读写指针
buffer.setByte(buffer.readableBytes() + 1, 0);
print("setByte(buffer.readableBytes() + 1, 0) => {}\n", buffer);
// setByte(buffer.readableBytes() + 1, 0) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 16/100)
//只有read方法改变指针
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
print("buffer.readBytes(bytes) => {}\n", buffer);
// buffer.readBytes(bytes) => PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)
print("buffer.readBytes(readBuffer); => {}\n", buffer);
ByteBuf readBuffer = ByteBufAllocator.DEFAULT.buffer(6, 6);
// 原始writeIndex要有足够空间可读
// buffer.writeBytes(new byte[]{5,1,1,1,1,1,1,1});
// buffer.writeBytes(new byte[]{5});
// readerIndex(10) + length(6) exceeds writerIndex(11): PooledUnsafeDirectByteBuf(ridx: 10, widx: 11, cap: 16/100) buffer.readBytes(readBuffer);
System.err.println(readBuffer.readableBytes());
// buffer.readBytes(readBuffer);
// readerIndex(10) + length(9) exceeds writerIndex(10): PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)
}
}
比较简单的一个章节,主要介绍ByteBuf在Java中的使用,整个使用过程简单易懂十分清晰。