1. 前言
現在很多做網絡通訊中間代理層的通訊都是使用Java1.4以後推出的NIO進行編寫,現在還有很多開源的框架也是封裝瞭NIO的書寫細節來幫助大傢簡寫異步非阻塞通訊服務。像MySql的代理中間件amoeba-mysql-proxy就是采用NIO的方式處理client端過來的request,之後與Mysql-Server層的通訊也是采用NIO進行命令消息發送的。再看咱們JavaEye首頁介紹的項目xmemcached,其中作者Dennis是其xmemcached的開發人,他也是通過NIO的方式與memcached的Server進行異步通訊的,Dennis的另一個項目yanf4j就是一個NIO框架,xmemcache也是借助這個NIO框架實現的異步非阻塞方式的網絡通訊,Apache的MINA框架都是NIO的封裝再實現。
那麼我們就來回顧一下以前的處理方式,來看看為什麼現在要使用NIO來進行異步非阻塞方式的通訊吧,網上很多文章都是幾句話將NIO和原始的socket通訊的優劣一帶而過,我們這次用一個簡單的下載大文件的網絡服務程序進行說明。使用3種模式來說明,分別是同步單獨線程服務運行模式、傳統阻塞多線程模式、使用NIO異步非阻塞模式。
我們設置服務器上有一個1.81GB的電影,格式為RMVB。使用Server進行服務監聽,客戶端請求到Server,建立網絡通訊,進行電影下載。
2. 同步單線程阻塞
使用同步單線程下載,是最原始的socket通訊,服務端的代碼如下
Java代碼
<span style=”font-size: x-small;”>package server;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* liuyan
*/
public class FilmServer {
public static void main(String[] args) {
FilmServer ms = new FilmServer();
try {
ms.server();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服務器端響應請求
*
* @throws Exception
*/
public void server() throws Exception {
// 0.建立服務器端的server的socket
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 1.打開socket連接
// 等待客戶端的請求
final Socket server = ss.accept();
System.out.println(“服務———–請求開始start”);
// 2.打開socket的流信息,準備下面的操作
final InputStream is = server.getInputStream();
byte b[] = new byte[1024];
int readCount = is.read(b);
String str = new String(b);
str = str.trim();
final String serverFileName = str;
// 3.對流信息進行讀寫操作
System.out.println(“客戶端傳過來的信息是:” + str);
System.out.println(“線程” + Thread.currentThread().getName() + “啟動”);
try {
FileInputStream fileInputStream = new FileInputStream(
serverFileName);
// 3.1 服務器回復客戶端信息(response)
OutputStream os = server.getOutputStream();
byte[] bfile = new byte[1024];
// 往客戶端寫
while (fileInputStream.read(bfile) > 0) {
os.write(bfile);
}
fileInputStream.close();
os.close();
// 4.關閉socket
// 先關閉輸入流
is.close();
// 最後關閉socket
server.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(“服務———–請求結束over”);
}
}
}</span>
服務端這麼寫代碼會有什麼問題?咱們先來看客戶端代碼,之後運行後就知道瞭。
Java代碼
<span style=”font-size: x-small;”>package client;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* liuyan
* @version 1.0
*/
public class FilmClient {
public static void main(String[] args) {
for (int i = 1; i <= 2; i++) {
Client client = new Client();
client.i = i;
client.start();
}
}
}
class Client extends Thread {
int i;
@Override
public void run() {
// 1.建立scoket連接
Socket client;
try {
client = new Socket(“127.0.0.1”, 9999);
// 2.打開socket的流信息,準備下面的操作
OutputStream os = client.getOutputStream();
// 3.寫信息
os.write((“d://film//2.rmvb”).getBytes());
String filmName = “c://io”+i+”.rmvb”;
FileOutputStream fileOutputStream = new FileOutputStream(filmName);
// 3.1接收服務器端的反饋
InputStream is = client.getInputStream();
byte b[] = new byte[1024];
while(is.read(b)>0){
fileOutputStream.write(b);
}
// 4.關閉socket
// 先關閉輸出流
os.close();
// 最後關閉socket
client.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}</span>
客戶端啟動瞭2個線程進行下載電影的工作,先啟動服務端,再運行客戶端,會看筆者本地的硬盤C分區到有如下效果。
可以看到線程2的下載任務一直是0字節,等第一個線程下載完成後呢,線程2的下載任務才能進行。
服務端的代碼造成的問題就是使用傳統的sokect網絡通訊,那麼另一個客戶端的線程請求到server端的時候就發生瞭阻塞的情況,也就是說,服務端相當一個廁所,廁所就有隻有一個坑位,來瞭一個人,相當於客戶端請求,那這個人相當於就把坑位給占瞭,write操作和read操作會阻塞,這個人還沒解決完問題呢,下個人就來瞭,沒辦法,哥們兒先在門外等等啊,等前一個客戶爽完瞭再給您提供服務好吧。那麼如何解決這個占著坑位不讓別人用的情況呢?
3. 阻塞的多線程
為瞭解決以上問題,那麼之後很多Server肯定不可能像以上程序那麼做,不過以前很多Server都是基於單線程服務改造一下,做成多線程的Server的通訊,修改一下上面的Server代碼,如下
Java代碼
<span style=”font-size: x-small;”>package server;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
*
*/
public class FilmServerNewThread {
public static void main(String[] args) {
FilmServerNewThread ms = new FilmServerNewThread();
try {
ms.server();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服務器端響應請求
*
* @throws Exception
*/
public void server() throws Exception {
// 0.建立服務器端的server的socket
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 1.打開socket連接
// 等待客戶端的請求
final Socket server = ss.accept();
System.out.println(“服務———–請求開始start”);
// 2.打開socket的流信息,準備下面的操作
final InputStream is = server.getInputStream();
byte b[] = new byte[1024];
int readCount = is.read(b);
String str = new String(b);
str = str.trim();
final String serverFileName = str;
// 3.對流信息進行讀寫操作
System.out.println(“客戶端傳過來的信息是:” + str);
if (readCount > 0) {
new Thread() {
@Override
public void run() {
System.out.println(“線程”
+ Thread.currentThread().getName() + “啟動”);
try {
FileInputStream fileInputStream = new FileInputStream(
serverFileName);
// 3.1 服務器回復客戶端信息(response)
OutputStream os = server.getOutputStream();
byte[] bfile = new byte[1024];
// 往客戶端寫
while (fileInputStream.read(bfile) > 0) {
os.write(bfile);
}
fileInputStream.close();
os.close();
// 4.關閉socket
// 先關閉輸入流
is.close();
// 最後關閉socket
server.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}.start();
}
System.out.println(“服務———–請求結束over”);
}
}
}</span>
以上的Server就是在原始的socket基礎上加瞭線程,每一個Client請求過來後,整個Server主線程不必處於阻塞狀態,接收請求後直接另起一個新的線程來處理和客戶端的交互,就是往客戶端發送二進制包。這個在新線程中雖然阻塞,但是對於服務主線程沒有阻塞的影響,主線程依然通過死循環監聽著客戶端的一舉一動。另一個客戶端的線程發起請求後就再起一個新的線程對象去為客戶端服務。執行效果如下
2個線程互不影響,各自下載各自的。當然從非常嚴格的意義來講,str變量在十分高並發的情況下有線程安全問題,這個咱暫且忽略,就著眼於低並發的情況。這個問題是什麼呢,就是如果客戶端請求比較多瞭,那麼為每一個客戶端開辟一個新的線程對象來處理網絡傳輸的請求,需要創建個線程對象,而且這個線程對象從時間上來講還是處於長連接,這個就比較消費系統資源,這個打開進程管理器就可以看到。而且每一個線程內部都是阻塞的,也沒有說完全利用好這個新創建的線程。還拿剛才上廁所舉例子,好比現在不止一個坑位瞭,來瞭一個用戶我這邊就按照工程師的廁所坑位圖建立一個新的坑位,客戶來瞭,不用等待老坑位,用新創建的坑位就行瞭。等那個老坑位用完瞭,自然有垃圾回收器去消滅那個一次性的坑位的,騰出資源位置為瞭建立新的坑位。長時間連接的意思,相當於這個人上廁所的時間非常長,便秘??需要拉一天才能爽完……老的坑位一時半會兒回收不瞭,新的坑位需要有空間為其建造茅房以便滿足客戶端的“急切方便”需要。久而久之,線程數目一多,系統就掛瞭的概率就增多瞭(誰也別想上,全玩完瞭)。
4. 異步非阻塞
使用JDK1.4的NIO可以適當的解決上面的問題,異步 I/O 是一種 沒有阻塞地讀寫數據的方法。通常,在代碼進行 read() 調用時,代碼會阻塞直至有可供讀取的數據。同樣, write() 調用將會阻塞直至數據能夠寫入。異步 I/O 調用不會阻塞。相反,您將註冊對特定 I/O 事件的興趣 ― 可讀的數據的到達、新的套接字連接,等等,而在發生這樣的事件時,系統將會告訴您。異步 I/O 的一個優勢在於,它允許您同時根據大量的輸入和輸出執行 I/O。同步程序常常要求助於輪詢,或者創建許許多多的線程以處理大量的連接。使用異步 I/O,您可以監聽任何數量的通道上的事件,不用輪詢,也不用額外的線程。還是舉上公共廁所例子,雖然這個例子有點臭臭的。您現在有“便便”的需求瞭,不用那麼麻煩,看看公共廁所是否有人占領,也不用給您另起個新坑位,您就拿一根我們服務端定制的容器和一個很粗管子,這個坐便器的大小因您那個地方的尺寸而定,坐便器往您的那個地方一放,再將坐便器和管子一連接,OK,您就敞開瞭“爽”吧。不用擔心,這個管子自然會連接到相應的肥料廠傢,將您的排泄物有效回收加以利用的。您完瞭事,擦擦屁股,關上管子該幹嘛還幹嘛就行瞭。另一個人也有這個需求,沒問題,每個要我們提供服務的人都用這根管子,和自己的坐便器就行瞭,管子很粗,誰來連這個管子都行,有多少都行啊。下面我們來看基於NIO的網絡下載程序
Java代碼
<span style=”font-size: x-small;”>package server;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
/**
*
* @author liuyan
*
*/
public class NIOServer {
static int BLOCK = 500*1024;
/**
* 處理客戶端的內部類,專門負責處理與用戶的交互
*/
public class HandleClient {
protected FileChannel channel;
protected ByteBuffer buffer;
String filePath;
/**
* 構造函數,文件的管道初始化
* @param filePath
* @throws IOException
*/
public HandleClient(String filePath) throws IOException {
//文件的管道
this.channel = new FileInputStream(filePath).getChannel();
//建立緩存
this.buffer = ByteBuffer.allocate(BLOCK);
this.filePath = filePath;
}
/**
* 讀取文件管道中數據到緩存中
* @return
*/
public ByteBuffer readBlock() {
try {
//清除緩沖區的內容,posistion設置為0,limit設置為緩沖的容量大小capacity
buffer.clear();
//讀取
int count = channel.read(buffer);
//將緩存的中的posistion設置為0,將緩存中的limit設置在原始posistion位置上
buffer.flip();
if (count <= 0)
return null;
} catch (IOException e) {
e.printStackTrace();
}
return buffer;
}
/**
* 關閉服務端的文件管道
*/
public void close() {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
protected Selector selector;
protected String filename = “d:\film\60.rmvb”; // a big file
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
protected CharsetDecoder decoder;
//構造服務端口,服務管道等等
public NIOServer(int port) throws IOException {
selector = this.getSelector(port);
Charset charset = Charset.forName(“GB2312”);
decoder = charset.newDecoder();
}
// 獲取Selector
//構造服務端口,服務管道等等
protected Selector getSelector(int port) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
Selector sel = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
//剛開始就註冊鏈接事件
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
// 服務啟動的開始入口
public void listen() {
try {
for (;;) {
//?
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator();
while (iter.hasNext()) {//首先是最先感興趣的連接事件
SelectionKey key = iter.next();
//
iter.remove();
//處理事件
handleKey(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 處理事件
protected void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) { // 接收請求
//允許網絡連接事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
//網絡管道準備處理讀事件
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 讀信息
SocketChannel channel = (SocketChannel) key.channel();
//從客戶端讀過來的數據塊
int count = channel.read(clientBuffer);
if (count > 0) {
//讀取過來的緩存進行有效分割,posistion設置為0,保證從緩存的有效位置開始讀取,limit設置為原先的posistion上
//這樣一來從posistion~limit這段緩存數據是有效,可利用的
clientBuffer.flip();
//對客戶端緩存塊進行編碼
CharBuffer charBuffer = decoder.decode(clientBuffer);
System.out.println(“Client >>download>>” + charBuffer.toString());
//對網絡管道註冊寫事件
SelectionKey wKey = channel.register(selector,
SelectionKey.OP_WRITE);
//將網絡管道附著上一個處理類HandleClient,用於處理客戶端事件的類
wKey.attach(new HandleClient(charBuffer.toString()));
} else{
//如客戶端沒有可讀事件,關閉管道
channel.close();
}
clientBuffer.clear();
} else if (key.isWritable()) { // 寫事件
SocketChannel channel = (SocketChannel) key.channel();
//從管道中將附著處理類對象HandleClient取出來
HandleClient handle = (HandleClient) key.attachment();
//讀取文件管道,返回數據緩存
ByteBuffer block = handle.readBlock();
if (block != null){
//System.out.println(“—“+new String(block.array()));
//寫給客戶端
channel.write(block);
}else {
handle.close();
channel.close();
}
}
}
public static void main(String[] args) {
int port = 12345;
try {
NIOServer server = new NIOServer(port);
System.out.println(“Listernint on ” + port);
while (true) {
server.listen();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}</span>
ServerSocketChannel相當於我們說的那個大粗管子,在它上面註冊瞭很多這個管子感興趣的事件,比如大便、小便、酒醉後吐的污濁都是它關心的。至於誰來控制管道應該關心的事件,是由管道通過Selector註冊事件完成的,Selector相當於一個大管道的維護員瞭。管道必須得有服務商的廠傢維護吧,不能濫用吧。Selector就是個管傢,負責管道的事件監聽的。XXXXBuffer相當於咱們說的坐便器,它是以塊為單位進行管道疏通的,假如您的尺寸特別大,估計您排出的那個玩意也小不瞭,就配置一個大點的緩存傳給服務那邊,當然,您這邊得到的服務端返回的加工後肥料,返給您的也是和您配置的尺寸有關系的。客戶端的代碼如下
Java代碼
<span style=”font-size: x-small;”>package client;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @author liuyan
*
*/
public class NIOClient {
static int SIZE = 2;
final static int bufferSize = 500 * 1024;
static InetSocketAddress ip = new InetSocketAddress(“localhost”, 12345);
static CharsetEncoder encoder = Charset.forName(“GB2312”).newEncoder();
static class Download implements Runnable {
protected int index;
String outfile = null;
public Download(int index) {
this.index = index;
this.outfile = “c:\” + index + “.rmvb”;
}
public void run() {
FileOutputStream fout = null;
// FileChannel fcout = null;
try {
fout = new FileOutputStream(outfile);
// fcout = fout.getChannel();
} catch (FileNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
long start = System.currentTimeMillis();
// 打開客戶端socket管道
SocketChannel client = SocketChannel.open();
// 客戶端的管道的通訊模式
client.configureBlocking(false);
// 選擇器
Selector selector = Selector.open();
// 往客戶端管道上註冊感興趣的連接事件
client.register(selector, SelectionKey.OP_CONNECT);
// 配置IP
client.connect(ip);
// 配置緩存大小
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
int total = 0;
FOR: for (;;) {
// 阻塞,返回發生感興趣事件的數量
selector.select();
// 相當於獲得感興趣事件的集合迭代
Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
System.out.println(“—–Thread ”
+ index + “——————“+key.readyOps());
// 刪除這個馬上就要被處理的事件
iter.remove();
// 感興趣的是可連接的事件
if (key.isConnectable()) {
// 獲得該事件中的管道對象
SocketChannel channel = (SocketChannel) key
.channel();
// 如果該管道對象已經連接好瞭
if (channel.isConnectionPending())
channel.finishConnect();
// 往管道中寫一些塊信息
channel.write(encoder.encode(CharBuffer
.wrap(“d://film//1.rmvb”)));
// 之後為該客戶端管道註冊新的感興趣的事件—讀操作
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 由事件獲得通訊管道
SocketChannel channel = (SocketChannel) key
.channel();
// 從管道中讀取數據放到緩存中
int count = channel.read(buffer);
System.out.println(“count:” + count);
if (count > 0) {
// 統計讀取的字節數目
total += count;
// 這樣一來從posistion~limit這段緩存數據是有效,可利用的
// buffer.flip();
buffer.clear();
// 往輸出文件中去寫瞭
if (count < bufferSize) {
byte[] overByte = new byte[count];
for (int index = 0; index < count; index++) {
overByte[index] = buffer.get(index);
}
fout.write(overByte);
} else {
fout.write(buffer.array());
}
} else {
// 關閉客戶端通道
client.close();
// 退出大循環
break FOR;
}
}
}
}
// 計算時間
double last = (System.currentTimeMillis() – start) * 1.0 / 1000;
System.out.println(“Thread ” + index + ” downloaded ” + total
/ 1024 + “kbytes in ” + last + “s.”);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
// 啟用線程池
ExecutorService exec = Executors.newFixedThreadPool(SIZE);
for (int index = 1; index <= SIZE; index++) {
exec.execute(new Download(index));
}
exec.shutdown();
long endTime = System.currentTimeMillis();
long timeLong = endTime – startTime;
System.out.println(“下載時間:” + timeLong);
}
}</span>
效果和上一個程序的效果差不多,隻是時間上和內存資源占有率上有所提高,當然本機僅僅啟動瞭幾個線程,如果客戶端啟動更多線程,NIO的方式節約資源的效果是明顯的,宕機概率小於阻塞IO方式很多。
5. 總結
其實NIO想寫得更多,但是看到網絡上已經有很多資料瞭,就不再展開瞭,非一篇所能盡述的瞭的。當然瞭,NIO也是有場景的,比如適合與長連接的請求,以為NIO維護管道、緩存塊、時間選擇器等等也需要資源消耗的,而且比傳統IO的對象們要重量級。所以原始IO也並不是一無是處,現在還是有很多socket中間件還是已然使用第二種方式。
代碼在附件中~~~