NIO学习笔记(一)Buffer初探

Posted by Lain on 03-05,2020

leimu

概述

由于疫情的原因,加上梯子全炸了,所以自己想造点轮子来玩玩,在家里学习了一直想学习的Nio,网络相关知识,在这里整理一些笔记。

之前也断断续续接触过Nio,但是每次都很难坚持学下去,一方面是工作中目前用不上,学了也会忘记,另一方面是Java这套原生的NIO API设计上不是很好用,学习使用难度比较大...所以才会有Netty这种框架来降低我们使用NIO的上手难度。但是光会用而不了解底层原理也是不行的,所以还是坚持学习一下吧。

JavaNio有两种叫法,有人说是New IO,对应之前旧的io包,还有一种是 Non-Blocking IO,对应它的非阻塞特性 。
NIO有三大基本组件:

  • Buffer
  • Channel
  • Selector

今天就先从buffer开始了解。

Buffer

Buffer顾名思义,就是一个缓冲区,内部维护了一个缓冲数组,NIO中的所有读写操作,实际上都是在操作Buffer,可以说Buffer就是NIO的核心。JDK中,所有的基本类型(boolean除外)都有其对应的Buffer实现。

Buffer有三个重要的成员变量:

  • position:下标
  • limit:限制大小
  • capacity:容量

以及一个重要的函数,flip()
这些要素共同构成了Buffer的核心特性:既能读又能写。
Buffer有两种状态:读状态与写状态,每当调用flip函数时,这两个状态之间会发生翻转。
这个有些难以理解,用实际例子说明一下。
上面提到了所有基本类型都有其对应的Buffer实现,通过源代码可以知道,这些实现都是抽象类,需要通过调用其allocate(int)方法来创建实例。下面用LongBuffer来说明一下。
当我们要实例化一个Buffer时:

	LongBuffer buffer = LongBuffer.allocate(10);

这样我们就获得了一个容量(capacity)为10的LongBuffer。
所谓的容量(capacity),指的是这个buffer能存储的元素上限,也是底层数组的大小。
那么在调用LongBuffer.allocate(10)的时候发生了什么呢?
首先capacity,limit的值都会被初始化为10,position会被置为0(还有一个mask值会被置为-1,这里不做过多赘述)。
接下来我们往这个buffer里面写数据:

	LongBuffer buffer = LongBuffer.allocate(10);
        for(int i = 0;i<5;i++){
            buffer.put(i);
        }

每当一个元素被put进buffer的时候,position的值都会+1。当position大于limit值的时候,将会抛出BufferOverflowException
当我们想要从buffer里面取出数据的时候,需要调用一次flip() 方法,将状态翻转。
这个方法会将position的值赋予limit,然后将position的值归零(同时将mask值置为-1,也就是初始化时的状态)。
没错,就只做了这么简单的两个操作。这个时候就可以调用buffer的get系列函数,从buffer中将这些元素按写入时的顺序读取出来,如下列代码所示:

        LongBuffer buffer = LongBuffer.allocate(10);
        for(int i = 0;i<5;i++){
            buffer.put(i);
        }
        buffer.flip();
        while (buffer.hasRemaining()){
            System.out.println(buffer.get());
        }

每当调用get() 函数时,position的值都会+1,直到postion值超过limit,将会抛出BufferUnderflowException
很显然,flip的这个重置,将postion移动到了最开始的地方,从而可以按照写入时的顺序将元素读出,又由于limit的限制,正好可以将写入buffer的所有元素全部读完,可以说是很巧妙了。上述代码中的hasRemaining() 方法,其实就是对position和limit的值的大小做了判断,可以来看看源代码:

    /**
     * Tells whether there are any elements between the current position and
     * the limit.
     *
     * @return  <tt>true</tt> if, and only if, there is at least one element
     *          remaining in this buffer
     */
    public final boolean hasRemaining() {
        return position < limit;
    }

那么有同学就会问了,在第一次写入元素的时候是将limit的值作为可写入上限,这点没错,假设我本次没有将buffer写满,比如我压根没写数据,当翻转之后,limit就移动到postion位置了,两个都为0,如果这个时候想要再次往buffer里面写数据,岂不是直接报错了?
没错,就是这样。如果每次都没写满就翻转buffer,limit的值不管怎么翻转,可写入的容量都只会越来小。极端点直接为0都是正常的,这就是NIO的设计。
那有没有一种方法可以让它恢复容量呢?
有的,那就是clear() 方法
它的源代码注释也很有意思:

    /**
     * Clears this buffer.  The position is set to zero, the limit is set to
     * the capacity, and the mark is discarded.
     *
     * <p> Invoke this method before using a sequence of channel-read or
     * <i>put</i> operations to fill this buffer.  For example:
     *
     * <blockquote><pre>
     * buf.clear();     // Prepare buffer for reading
     * in.read(buf);    // Read data</pre></blockquote>
     *
     * <p> This method does not actually erase the data in the buffer, but it
     * is named as if it did because it will most often be used in situations
     * in which that might as well be the case. </p>
     *
     * @return  This buffer
     */
    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

划重点,fill this buffer。我目前的理解是是为了能够尽可能的利用buffer底层分配的数组空间,尽量的填满buffer(不一定要填满),所以调用read之前可以使用clear将buffer重置。这里先不做过多解读,先mark一下,我感觉这个注释关系到了NIO的一些重要特性,之后回过头来再进行补充。
总之这个函数会将Buffer的limit值重置回capacity大小,使得这个buffer能被充分利用。注释中也写了,虽然函数名叫clear,但是并不会实际上的清空缓冲数组中的内容,buffer的写入本质上就是移动下标,不断的对原有数组进行覆盖的过程。因此如果是通过以下方式创建的buffer,不建议在外部直接操作缓冲数组,否则很容易造成数据混乱:

	...
        byte[] bytes = new byte[1024];
        ByteBuffer buffer2 = ByteBuffer.wrap(bytes);
	...

warp(byte[]) 函数支持传入一个数组作为缓冲数组,通过注释我们可以了解到,通过这种方式创建的buffer,limit一开始就会被置为数组的length。因此这个buffer从创建时就可以读取原数组里的所有数据。文档中也说明了对buffer中缓冲数组的改动会导致原数组的改动,反之亦然。很好理解,因为传递的只是数组的引用,稍微有常识的人都不会过多纠结。

    /**
     * Wraps a byte array into a buffer.
     *
     * <p> The new buffer will be backed by the given byte array;
     * that is, modifications to the buffer will cause the array to be modified
     * and vice versa.  The new buffer's capacity and limit will be
     * <tt>array.length</tt>, its position will be zero, and its mark will be
     * undefined.  Its {@link #array backing array} will be the
     * given array, and its {@link #arrayOffset array offset>} will
     * be zero.  </p>
     *
     * @param  array
     *         The array that will back this buffer
     *
     * @return  The new byte buffer
     */
    public static ByteBuffer wrap(byte[] array) {
        return wrap(array, 0, array.length);
    }

warp还有一个重载方法:

wrap(byte[] array,int offset, int length)

这个没什么好说的,只是支持传入一个起点终点,限制了原数组在buffer中的初始取值范围作为数据来源而已。

通过上面的几个api的了解,基本上可以对Buffer有一定的认识了,其余的一些比较重要的API咱们下篇文章再说。

Nio文件读写

NIO作为一套用于IO的API,自然可以进行文件的读写,需要使用文件通道Channel搭配缓冲区Buffer来进行。
Channel是双向的,和OutputStream、InputStream不一样,这两者要么只能写要么只能读,而Channel一旦建立后,既可以写,又可以读。这种模型更能体现出操作系统底层的特性,比如Linux操作系统的通道就是双向的。
Nio对于文件的读写,其实并不是对Channel直接进行读写,读写操作永远都是通过Buffer来进行的。

下面是Java NIO中最重要的通道的实现:(摘自并发编程网)

  • FileChannel 从文件中读写数据(只能处于阻塞模式下)。
  • DatagramChannel 能通过UDP读写网络中的数据。
  • SocketChannel 能通过TCP读写网络中的数据。
  • ServerSocketChannel可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

使用上相比传统IO稍微繁琐一些,我们直接看代码。

NIO文件读取

    private static void read() throws IOException {
        FileInputStream inputStream = new FileInputStream("lain_in.txt");

        FileChannel inputStreamChannel = inputStream.getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(10);
        while (true){
            buffer.clear();//重置buffer
            int read = inputStreamChannel.read(buffer);
            if(read == -1){
                break;
            }
            buffer.flip();
            while (buffer.hasRemaining()){
                System.out.print(buffer.get());
            }
        }
        inputStream.close();
    }

NIO文件写入

    private static void write() throws IOException {
        FileOutputStream outputStream = new FileOutputStream("lain_out.txt");
        FileChannel outputStreamChannel = outputStream.getChannel();
        String text = "Lain!!!!!!!!!!!!!!!!\n" +
                "Let's All Love Lain!\n" +
                "Let's All Love Lain!\n" +
                "Let's All Love Lain!\n" +
                "Let's All Love Lain!\n" +
                "Let's All Love Lain!\n" +
                "Let's All Love Lain!\n" +
                "Let's All Love Lain!\n";
        ByteBuffer buffer = ByteBuffer.allocate(text.length());
        buffer.put(text.getBytes());
        buffer.flip();
        outputStreamChannel.write(buffer);
        outputStream.close();
    }

堆外内存与零拷贝

堆外内存是独立于JVM管控范围的一块独立的内存,因此它不会被GC回收。
由于JVM中的用户线程无法直接接触到硬件,只能在JVM的管理范围内开辟内存空间,比如向外写出文件,我们将要写出的内容的字节数组在用户空间的内存中准备好了,如果操作系统直接使用这块内存空间,会导致一个问题,如果一旦这个时候发生了垃圾回收,绝大多数垃圾回收算法是会将内存碎片压缩重排的,为了腾出更多的连续内存空间供大体积对象使用,这个时候堆内的内存地址会发生天翻地覆的变动,对于我们的IO操作而言,要写出或读入的数据就全乱套了。

那么可不可以在IO发生的时候不进行内存回收呢?显然是不行的,因为这样很容易发生OOM异常,读写文件本身就是大体积对象,很容易频繁触发GC,对性能影响很大。

在传统的IO中,其实会多出一个拷贝的步骤,程序会将JVM中用户空间的数据,复制一份到堆外,在堆外和IO设备进行交互。在读取时也是一样,会将硬盘中的数据通过DMA(Direct Memery Access,直接内存访问)先读到内核空间,然后再复制进堆中,供程序使用。在这个拷贝的过程中,JVM不允许发生CG行为,这是虚拟机层面的保障,我们一般认为,相比于IO的速度来说,在内存中的拷贝是很快的,是可以允许的,所以传统IO中,其实是伴随着堆内内存与堆外内存的拷贝。

虽然这样解决了IO时,GC所产生的问题,但是上面也说过,IO往往挺吃内存的,咱们在平时读写文件的时候,创建的缓冲数组是不是都是随手1024*n起步的?这种大对象很容易引起GC,导致IO效率变慢。那么有没有一种方法,可以避免这种情况呢?

当然有,既然都是内存,为什么要复制来复制去,咱们直接使用堆外内存不就好了...
于是Buffer中出现了这么一个函数:

ByteBuffer.allocateDirect(int )
它的源代码如下:

    public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }

可以看到和allocate函数不同,allocate函数返回的是一个HeapByteBuffer。
这个方法在使用起来,和allocate(int) 好像并没有什么直观上的区别,毕竟只是内存所在位置不一样。但通过这个方法开辟的内存空间,是直接在堆外的,通过内存地址引用连接到堆内,使得我们可以通过ByteBuffer封装的各种API来操作这块内存区域,避免了直接在堆内存中开辟大空间引发的GC问题,同时也免除了内存拷贝所花费的时间,实现了零拷贝。
在Buffer类中,有这么一个成员变量:

// Used only by direct buffers
// NOTE: hoisted here for speed in JNI GetDirectBufferAddress
long address;

这个变量就是用来存储堆外内存地址的,这块内存不会受GC影响,所以地址值是不会发生改变的。这里有一点很有意思,要知道开辟堆外内存实际上是DirectByteBuffer的特性,按照OOP原则,这个参数不应该存在于它的最顶层的父类里面(Object除外),而是应该单独放在DirectByteBuffer自己里面才对。注释中解释了原因,把它放在这里是为了提高JNI调用效率。可以说是很看重速度了,毫秒必争。

那么问题又来了,它直接在堆外内存中开辟空间,这块空间又不受JVM的管辖,用完了之后谁来释放它?
上面我们知道,DirectBuffer是持有堆外内存的地址的,释放的操作自然可以由它来完成。
在源代码中可以看到DirectByteBuffer中有这么个类,继承了Runnable接口,有一个类型为Cleaner的成员变量。
实际上这是Java新引入,用来代替finalize的Cleaner机制,之后有空再详细介绍,可以参考这篇文章,在对象被回收之前,Reference Handle会启动一个线程,来执行这段回收逻辑。
private static class Deallocator
implements Runnable
{

    private static Unsafe unsafe = Unsafe.getUnsafe();

    private long address;
    private long size;
    private int capacity;

    private Deallocator(long address, long size, int capacity) {
        assert (address != 0);
        this.address = address;
        this.size = size;
        this.capacity = capacity;
    }

    public void run() {
        if (address == 0) {
            // Paranoia
            return;
        }
        unsafe.freeMemory(address);
        address = 0;
        Bits.unreserveMemory(size, capacity);
    }

}

好了,关于Buffer就记录到这里。其实还有很多API也值得一提,之后再补充吧