Java NIO基礎學習之Buffer、Channel和Selector概述

Buffer

一個 Buffer 本質上是內存中的一塊,我們可以將數據寫入這塊內存中,之後從這塊內存獲取數據。

java.nio 定義瞭以下幾個 Buffer 的實現。

其中最核心的是最後的 ByteBuffer,前面的一大串類隻是包裝瞭一下它而已,我們使用最多的通常也是 ByteBuffer。

我們應該將 Buffer 理解為一個數組,IntBuffer 、ChannelBuffer 、DoubleBuffer 等分別對應int[] 、char[] 、double[] 等。

操作 Buffer 和操作數組、類集差不多,隻不過大部分時候我們都把它放到瞭 NIO 的場景裡面來使用而已。下面是 Buffer 中的幾個重要屬性和幾個重要方法。

position 、limit 、capacity

就像數組有數組容量,每次訪問元素要指定下標,Buffer 中也有幾個重要屬性:position、limit、capacity。

最好理解的當然是 capacity,它代表這個緩沖區的容量,一旦設定就不可以更改。如 capacity 為 1024 的 IntBuffer,代表其一次可以存放 1024 個 int 類型的值。一旦 Buffer 的容量達到 capacity,需要清空 Buffer,才能重新寫入值。

position 和 limit 是變化的,我們分別看下讀和寫操作,他們是如何變化的。

position的初始值是0,每往 Buffer 中寫入一個值,position 就自動加 1,代表下一次的寫入位置。讀操作的時候也是類似的,每讀一個值,position 就自動加 1。

從寫操作模式到讀操作模式切換的時候(flip),position 都會歸零,這樣就可以從頭開始讀寫瞭。

Limit:寫操作模式下,limit 代表的是最大能寫入的數據,這個時候 limit 等於 capacity。寫結束後,切換到讀模式,此時的limit 等於 Buffer 中實際的數據大小,因為 Buffer 不一定被寫滿瞭。

初始化Buffer

每個 Buffer 實現類都提供瞭一個靜態方法 allocate(int capacity) 幫助我們快速實例化一個 Buffer。如:

ByteBuffer byteBuf = ByteBuffer.allocate(1024);
IntBuffer intBuf = IntBuffer.allocate(1024);
LongBuffer longBuf = LongBuffer.allocate(1024);

另外,我們經常使用 wrap 方法來初始化一個 Buffer。

public static ByteBuffer wrap(byte[] array) {
    ...
}

填充Buffer

各個 Buffer 類都提供瞭一些 put 方法用於將數據填充到 Buffer 中,如 ByteBuffer 中的幾個方法:

// 填充一個 byte 值
public abstract ByteBuffer put(byte b);
// 在指定位置填充一個 int 值
public abstract ByteBuffer put(int index, byte b);
// 將一個數組中的值填充進去
public final ByteBuffer put(byte[] src) {...}
public ByteBuffer put(byte[] src, int offset, int length) {...}

上述這些方法需要自己控制 Buffer 大小,不能超過 capacity,超過會拋出 java.nio.BufferOverflowException 異常。

對於 Buffer 來說,另一個常見的操作就是,我們要將來自 Channel 的數據填充到 Buffer 中,在系統層面,這個操作我們稱為讀操作,因為數據是從外部(档案或網絡)讀到內存中。

int num = channel.read(buf);

上述方法會返回從 Channel 中讀入到 Buffer 的數據大小。

提取Buffer中的值

前面介紹瞭寫操作,每寫入一個值,position 最後會指向最後一次寫入的位置的後面一個,如果 Buffer 寫滿瞭,那麼 position等於 capacity(position 從0開始)。

如果要讀 Buffer 中的值,需要切換模式,從寫入模式切換到讀出模式。註意,通常是在說 NIO 的讀操作的時候,我們說從Channel 中讀數據到 Buffer 中,對應的是對 Buffer 的寫入操作。

調用 Buffer 的 flip()方法,可以進行模式切換。其實這個方法也就是設置瞭一下 position 和 limit 值罷瞭。

public final Buffer flip() {
    limit = position; // 將 limit 設置為實際寫入的數據數量
    position = 0; // 重置 position 為 0
    mark = -1; // mark 之後再說
    return this;
}

對應寫入操作的一系列 put 方法,讀操作提供瞭一系列的 get 方法:

// 根據 position 來獲取數據
public abstract byte get();
// 獲取指定位置的數據
public abstract byte get(int index);
// 將 Buffer 中的數據寫入到數組中
public ByteBuffer get(byte[] dst)

附一個經常使用的方法:

new String(buffer.array()).trim();

當然瞭,除瞭將數據從 Buffer 取出來使用,更常見的操作是將我們寫入的數據傳輸到 Channel 中,通過 FileChannel 將數據寫入到档案中,通過 SocketChannel 將數據寫入到網絡發送到遠程機器等。對應的,這種操作,我們稱之為寫操作。

int num = channel.write(buf);

mark() &? reset()

除瞭 position、limit、capacity 這三個基本的屬性外,還有一個常用的屬性就是 mark。

mark 用於臨時保存 position 的值,每次調用 mark()方法都會將 mark 設值為當前 position,便於後續需要的時候使用。

public final Buffer mark() {
    mark = position;
    return this;
}

那到底什麼時候用呢?考慮以下場景,我們在 position 為 5 的時候,先 mark()一下,然後繼續往下讀,讀到第 10 的時候,我想重新回到 position 為 5 的地方重新來一遍,那隻要調用一下 reset()方法,position 就回到 5 瞭。

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

rewind()& clear()& compact()

rewind():會重置 position 為 0 ,通常用於重新從頭讀寫 Buffer。

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

clear():有點重置 Buffer 的意思,相當於重新實例化瞭一樣。

通常,我們會先填充 Buffer,然後從 Buffer 讀取數據,之後我們再重新往裡填充新的數據,我們一般在重新填充之前先調用clear()。

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

compact():和 clear()一樣的是,它們都是在準備往 Buffer 填充新的數據之前調用。

前面說的 clear()方法會重置幾個屬性,但是我們要看到,clear()方法並不會將 Buffer 中的數據清空,也就相當於清空瞭數據瞭。

而 compact()方法有點不一樣,調用這個方法以後,會先處理還沒有讀取的數據,也就是 position 到 limit 之間的數據(還沒有讀過的數據),先將這些數據移到左邊,然後再這個基礎上在開始寫入。很明顯,此時,limit 還是等於 capacity,position 指向原來數據的右邊。

Channel

所有的 NIO 操作始於通道,通道時數據來源或數據寫入的目的地,主要地,我們將關心 java.nio 包中實現的以下幾個Channel:

 

FileChannel:档案通道,用於档案的讀和寫DatagrmChannel:用於 UDP 連接的接收和發送SocketChannel:把它理解為 TCP 連接通道,簡單理解就是 TCP 客戶端ServerSocketChannel:TCP 對應的服務端,用於監聽某個端口進來的請求

我們應該關註 SocketChannel 和 ServerSocketChannel 。

Channel 經常翻譯為通道,類似 IO 中的流,用於讀取和寫入。它與前面介紹的 Buffer 打交道,讀取操作的時候將 Channel 中的數據填充到 Buffer 中,而寫操作時將 Buffer 中的數據寫入到 Channel 中。

FileChannel

這裡簡單介紹下 FileChannel 的常用操作,FileChannel 是不支援非阻塞的。

初始化:

FileInputStream inputStream = new FileInputStream(new File("/data.txt"));
FileChannel fileChannel = inputStream.getChannel();

讀取档案內容:

ByteBuffer buffer = ByteBuffer.allocate(1024);
 
int num = fileChannel.read(buffer);

所有的 Channel 都是和 Buffer 打交道的。

寫入档案內容:

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("隨機寫入一些內容到 Buffer 中".getBytes());
// Buffer 切換為讀模式
buffer.flip();
while(buffer.hasRemaining()) {
    // 將 Buffer 中的內容寫入档案
    fileChannel.write(buffer);
}

SocketChannel

我們可以將 SocketChannel 理解為一個 TCP 客戶端(有點狹隘)。

打開一個 TCP 連接:

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("https://www.javadoop.com", 80));

上面的這行代碼等於下面的兩行:

// 打開一個通道
SocketChannel socketChannel = SocketChannel.open();
// 發起連接
socketChannel.connect(new InetSocketAddress("https://www.javadoop.com", 80));

SocketChannel 的讀寫和 FileChannel 沒有什麼區別,就是操作緩沖區。

// 讀取數據
socketChannel.read(buffer);
 
// 寫入數據到網絡連接中
while(buffer.hasRemaining()) {
    socketChannel.write(buffer);   
}

ServerSocketChannel

之前說 SocketChannel 是 TCP 客戶端,這裡說的 ServerSocketChannel 就是對應的服務端。

ServerSocketChannel 用於監聽機器端口,管理從這個端口進來的 TCP 連接。

// 實例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 監聽 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
 
while (true) {
    // 一旦有一個 TCP 連接進來,就對應創建一個 SocketChannel 進行處理
    SocketChannel socketChannel = serverSocketChannel.accept();
}

SocketChannel 它不僅僅是 TCP 客戶端,它代表的是一個網絡通道,可讀可寫。

ServerSocketChannel 不和 Buffer 打交道,因為它並不實際處理數據,他一旦接收到請求後,實例化 SocketChannel,之後在這個連接通道上的數據他就不管瞭,因為他需要繼續監聽端口,等待下一個連接。

DatagrmChannel

UDP 和 TCP 不一樣,DatagramChannel 一個類處理瞭服務端和客戶端。

監聽端口:

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9090));
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
 
channel.receive(buf);

發送數據:

String newData = "New String to write to file..."
                    + System.currentTimeMillis();
 
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
 
int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));

Selector

Selector 建立在非阻塞的基礎之上,經常聽到的多路復用在 Java 中指的就是它,用於實現一個線程管理多個 Channel。

首先,我們先開啟一個 Selector。

Selector selector = Selector.open();

將 Channel 註冊到 Selector 上。Selector 建立在非阻塞模式之上,所以註冊到 Selector 的 Channel 必須要支援非阻塞模式,FileChannel 不支援非阻塞,我們這裡討論最常見的 SocketChannel 和 ServerSocketChannel 。

// 將通道設置為非阻塞模式,因為默認都是阻塞模式的
channel.configureBlocking(false);
// 註冊
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

register 方法的第二個 int 型參數(使用二進制的標記位)用於表明需要監聽哪些感興趣的事件,共有以下四種事件:

SelectionKey.OP_READ?? 對應 00000001,通道中有數據可以進行讀取

SelectionKey.OP_WRITE 對應 00000100? 可以往通道中寫入數據

SelectionKey.OP_CONNECT? ?對應 00001000,成功建立 TCP 連接

SelectionKey.OP_ACCEPT 對應 00010000,接受 TCP 連接

我們可以同時監聽一個 Channel 中發生的多個事件,如我們要監聽 ACCEPT 和 READ 事件,那麼指定參數為二進制的00010001 即 十進制 17 即可。

註冊方法返回值是 SelectionKey 實例,它包含瞭 Channel 和 Selector 信息,也包括瞭一個叫做 Interset Set 的信息,即我們設置的我們感興趣的正在監聽的事件集合。

調用 Select()方法獲取通道信息,用於判斷是否有我們感興趣的事件已經發生瞭。Selector 的操作就是以上3步。

Selector selector = Selector.open();
 
channel.configureBlocking(false);
 
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
 
while(true) {
  // 判斷是否有事件準備好
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
 
  // 遍歷
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
 
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
 
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
 
    } else if (key.isReadable()) {
        // a channel is ready for reading
 
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
 
    keyIterator.remove();
  }
}

小結

Buffer 和數組差不多,它有 position、limit、capacity 幾個重要屬性。put()一下數據、flip()切換到讀模式、然後用get()獲取數據、clear()一下清空數據、重新回到put()寫入數據。

Channel 基本上隻和 Buffer 打交道,最重要的接口就是 channel.read(buffer)和 channel.write(buffer)。

Selector 用於實現非阻塞 IO。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *