在應用編程中,我們會遇到下面這樣的調用模型。。。
當一個業務方法(begin)中順序調用多個子業務方法(opertion1-N),且有些子業務方法比較耗時,那麼自然而然完成這次調用所需要的時間就比較長瞭。對於這樣的問題,通常情況下會從兩個方面對其進行重構和調優:
單個方法調優,即針對operation1-N中比較耗時的方法進行重構已達到期望的效果
業務重組和方法重構,即對整個大的業務方法進行重組,找出合乎當前需求(功能和性能)的實現方式
1比較好理解,最常見的就是sql語句,IO和string buffer方面的調優;2則需要看具體的應用場景瞭。由於本文主要是側重線程的並發與同步,所以我將例舉一個比較特殊的場景(webservices 遠程調用)。如下:
對照上述序列圖,若每一個operation方法都將進行一次遠程webservice調用,那麼一次調用的代價就要包含網絡通信方面的開銷。如果網絡延時很高,多次遠程調用的代價就相當大瞭。那麼該如果結合上面的第2點進行重構和調優呢?
減少網絡調用次數。 例如 operation1->API1,operation2->API2, 那麼是否可以提供一個包含API1和API2的新API12供其調用呢?這樣一次調用就可以達到目的。
多線程調用。如,讓operation1與operation2並發,這樣調用所需要的時間則為max(cost(operation1), cost(operation2))。這樣做會大大提高系統的復雜性,謹慎而為之。
接下來進入主題,談談並發的具體實現方式。
基本測試類ConcurrentSimpleTest
Java代碼
public class ConcurrentSimpleTest {
public void method1() {
System.out.println("before exec method1");
try {
Thread.sleep(400);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("method1 has been interrupted.");
e.printStackTrace();
}
System.out.println("after exec method1");
}
public void method2() {
System.out.println("before exec method2");
try {
Thread.sleep(800);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("method2 has been interrupted.");
e.printStackTrace();
}
System.out.println("after exec method2");
}
}
方式1:使用線程的join方法
Java代碼
public static void main(String[] args) throws InterruptedException {
final ConcurrentSimpleTest cst = new ConcurrentSimpleTest();
long s1 = System.currentTimeMillis();
cst.method1();
cst.method2();
long s2 = System.currentTimeMillis();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
cst.method1();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
cst.method2();
}
});
t1.start();
t2.start();
t1.join(); //t2.join(500); 實際400ms後,方法就返回瞭
t2.join(); //t1.join(x); if x< max((800 – 400), (800-500)), 那該方法會在t2執行完前返回
//線程1/2都已返回,需要驗證結果
long s3 = System.currentTimeMillis();
System.out.println("time cost for normal execution:" + (s2-s1));
System.out.println("time cost for concurrent execution:" + (s3-s2));
}
方式2:使用信號量對象
自定義信號量
Java代碼
public class SimpleMonitor {
public volatile int single = 0;
public void require(int single) {
this.single = single;
}
public synchronized void release(int single) {
this.single -= single;
this.notify();
}
}
Java代碼
public static void main(String[] args) throws InterruptedException {
final ConcurrentSimpleTest cst = new ConcurrentSimpleTest();
final SimpleMonitor monitor = new SimpleMonitor();
long s1 = System.currentTimeMillis();
cst.method1();
cst.method2();
long s2 = System.currentTimeMillis();
monitor.require(2); //初始化信號量
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
cst.method1();
} finally {
monitor.release(1); //信號量-1
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
cst.method2();
} finally {
monitor.release(1); //信號量-1
}
}
});
t1.start();
t2.start();
synchronized (monitor) {
while (monitor.single > 0) {
monitor.wait(); // monitor.wait(10 * 1000), 進行超時異常處理,中斷t1,t2
}
}
//線程1/2都已返回,需要驗證結果
long s3 = System.currentTimeMillis();
System.out.println("time cost for normal execution:" + (s2 – s1));
System.out.println("time cost for concurrent execution:" + (s3 – s2));
}
使用JDK concurrent包中的信號量對象Semaphores
Java代碼
public static void main(String[] args) throws InterruptedException {
final ConcurrentSimpleTest cst = new ConcurrentSimpleTest();
final Semaphore monitor = new Semaphore(0);
long s1 = System.currentTimeMillis();
cst.method1();
cst.method2();
long s2 = System.currentTimeMillis();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
cst.method1();
} finally {
monitor.release(); //增加信號量
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
cst.method2();
} finally {
monitor.release(); //增加信號量
}
}
});
t1.start();
t2.start();
monitor.acquireUninterruptibly(2); //tryAcquire(int permits, long timeout, TimeUnit unit) 可設置超時處理
//線程1/2都已返回,需要驗證結果
long s3 = System.currentTimeMillis();
System.out.println("time cost for normal execution:" + (s2 – s1));
System.out.println("time cost for concurrent execution:" + (s3 – s2));
}
使用JDK concurrent包中的信號量對象CountDownLatch(似乎比Semaphores更適合這種場景)
Java代碼
public static void main(String[] args) throws InterruptedException {
final ConcurrentSimpleTest22 cst = new ConcurrentSimpleTest22();
final CountDownLatch monitor = new CountDownLatch(2); //設置計數器
long s1 = System.currentTimeMillis();
cst.method1();
cst.method2();
long s2 = System.currentTimeMillis();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
cst.method1();
} finally {
monitor.countDown(); //計數器-1
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
cst.method2();
} finally {
monitor.countDown(); //計數器-1
}
}
});
t1.start();
t2.start();
monitor.await(1, TimeUnit.SECONDS); //monitor.await(1000, TimeUnit.MILLISECONDS); 設定超時閥值
//線程1/2都已返回,需要驗證結果
long s3 = System.currentTimeMillis();
System.out.println("time cost for normal execution:" + (s2 – s1));
System.out.println("time cost for concurrent execution:" + (s3 – s2));
}
方式3:使用JDK5中新的線程實現方式和線程池
Java代碼
public static void main(String[] args) throws InterruptedException, ExecutionException {
final ExecutorService execPool = Executors.newFixedThreadPool(5);
final ConcurrentSimpleTest3 cst = new ConcurrentSimpleTest3();
long s1 = System.currentTimeMillis();
cst.method1();
cst.method2();
long s2 = System.currentTimeMillis();
Callable<Void> call1 = new Callable<Void>(){
@Override
public Void call() throws Exception {
cst.method1();
return null;
}
};
Callable<Void> call2 = new Callable<Void>(){
@Override
public Void call() throws Exception {
cst.method2();
return null;
}
};
Future<Void> task1 = execPool.submit(call1);
Future<Void> task2 = execPool.submit(call2);
task1.get();//task1.get(1, TimeUnit.SECONDS); get方法會阻塞,直到線程執行結束返回結果或者超時
task2.get();//task2.get(1, TimeUnit.SECONDS);
//線程1/2都已返回,需要驗證結果
long s3 = System.currentTimeMillis();
System.out.println("time cost for normal execution:" + (s2 – s1));
System.out.println("time cost for concurrent execution:" + (s3 – s2));
execPool.shutdown();
}
要達到目的,我們的實現的方式有多種,上面給出的例子也隻是拋磚引入,那麼該如何抉擇呢?個人認為應該考慮以下幾個方面:
並發線程(任務)的可控性。如線程執行是否超時,可中斷執行中的線程,異常處理以及獲取線程執行的狀態等。方法1/2/3都針對具體的子線程(任務)可控,而方法2在超時設定方面則是針對並發的線程。
合理性,即合乎人的思維和設計,個人認為用信號量同步的方式(方法2)比較合理。
高效性,即性能。線程池應該是個不錯的選擇。
簡化性,即實現和使用比較簡單化。
簡化,其實就是更多的復用/重用。對於上面給出的應用場景,可以提出下面這樣的模型:
其中主線程為main thread,在它的執行過程中會啟動2個子線程T1和T2,等待T1和T2執行結束後,main thread才能執行結束。
進一步抽象下,便可以得到下面這個復雜點的模型:
模型中每個背景為白色的節點代表一個Thread,邊代表執行的步驟和方向。
Begin 會啟動T1和T2 線程,T2 執行完畢後會執行T4,而從 T1 和 T2 指向 T3 的兩條邊表示的是 T3 必須等 T1 和 T2 都執行完畢以後才能開始執行。若T1和T2同時執行完畢,那麼T3和T4也會並發執行。當T3和T4執行完成,就會執行End,整個過程結束。
對於這個模型,我們可以提供一個簡單的框架示意圖:
其具體實現主要分為3部分:
Constructor: 線程(任務)構建器,主要構建子線程(任務)以及其關聯關系(主要是前置依賴關系)
Executor: 線程(任務)執行器,主要負責高效地執行所提交的線程(任務)
Monitor: 負責監控一組線程(任務)的執行狀態並進行簡單地調度。如前置任務出現異常,那麼後續任務該任何執行等等
當然還得提供一些額外的訪問接口:如,線程(任務)的執行結果等。這樣一個框架/組件的雛形就有瞭,而作為調用者,也隻要關心輸入和輸出即可。
總結:本文主要從一個順序調用的例子出發,因性能問題引申到瞭並發,列出瞭並發同步的幾種簡單實現示例,並最後提出瞭一個並發框架的雛形。
有興趣的朋友可以一起討論其具體實現和應用。
–《全文完》–