2025-05-23

在應用編程中,我們會遇到下面這樣的調用模型。。。
 


 當一個業務方法(begin)中順序調用多個子業務方法(opertion1-N),且有些子業務方法比較耗時,那麼自然而然完成這次調用所需要的時間就比較長瞭。對於這樣的問題,通常情況下會從兩個方面對其進行重構和調優:
 
單個方法調優,即針對operation1-N中比較耗時的方法進行重構已達到期望的效果
業務重組和方法重構,即對整個大的業務方法進行重組,找出合乎當前需求(功能和性能)的實現方式
1比較好理解,最常見的就是sql語句,IO和string buffer方面的調優;2則需要看具體的應用場景瞭。由於本文主要是側重線程的並發與同步,所以我將例舉一個比較特殊的場景(webservices 遠程調用)。如下:
 


 
 
對照上述序列圖,若每一個operation方法都將進行一次遠程webservice調用,那麼一次調用的代價就要包含網絡通信方面的開銷。如果網絡延時很高,多次遠程調用的代價就相當大瞭。那麼該如果結合上面的第2點進行重構和調優呢?
 
減少網絡調用次數。 例如 operation1->API1,operation2->API2, 那麼是否可以提供一個包含API1和API2的新API12供其調用呢?這樣一次調用就可以達到目的。
多線程調用。如,讓operation1與operation2並發,這樣調用所需要的時間則為max(cost(operation1), cost(operation2))。這樣做會大大提高系統的復雜性,謹慎而為之。
 
接下來進入主題,談談並發的具體實現方式。
 
基本測試類ConcurrentSimpleTest
 
Java代碼 
public class ConcurrentSimpleTest { 
 
    public void method1() { 
        System.out.println("before exec method1"); 
        try { 
            Thread.sleep(400); 
        } catch (InterruptedException e) { 
            // TODO Auto-generated catch block 
            System.out.println("method1 has been interrupted."); 
            e.printStackTrace(); 
             
        } 
        System.out.println("after exec method1"); 
    } 
 
    public void method2() { 
        System.out.println("before exec method2"); 
        try { 
            Thread.sleep(800); 
        } catch (InterruptedException e) { 
            // TODO Auto-generated catch block 
            System.out.println("method2 has been interrupted."); 
            e.printStackTrace(); 
             
        } 
        System.out.println("after exec method2"); 
    } 

 
 
方式1:使用線程的join方法
Java代碼 
public static void main(String[] args) throws InterruptedException { 
        final ConcurrentSimpleTest cst = new ConcurrentSimpleTest(); 
         
        long s1 = System.currentTimeMillis(); 
        cst.method1();  
        cst.method2(); 
        long s2 = System.currentTimeMillis();    
         
        Thread t1 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                cst.method1();  
            } 
        }); 
 
        Thread t2 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                cst.method2(); 
            } 
        }); 
         
        t1.start(); 
        t2.start(); 
         
        t1.join(); //t2.join(500); 實際400ms後,方法就返回瞭 
        t2.join(); //t1.join(x); if x< max((800 – 400), (800-500)), 那該方法會在t2執行完前返回 
         
        //線程1/2都已返回,需要驗證結果 
         
        long s3 = System.currentTimeMillis(); 
         
        System.out.println("time cost for normal execution:" + (s2-s1)); 
        System.out.println("time cost for concurrent execution:" + (s3-s2)); 
    } 
 
 
方式2:使用信號量對象
 
自定義信號量
Java代碼 
public class SimpleMonitor { 
        public volatile int single = 0; 
 
        public void require(int single) { 
            this.single = single; 
        } 
 
        public synchronized void release(int single) { 
            this.single -= single; 
            this.notify(); 
        } 
    } 
 
 
Java代碼 
public static void main(String[] args) throws InterruptedException { 
        final ConcurrentSimpleTest cst = new ConcurrentSimpleTest(); 
        final SimpleMonitor monitor = new SimpleMonitor(); 
 
        long s1 = System.currentTimeMillis(); 
        cst.method1(); 
        cst.method2(); 
        long s2 = System.currentTimeMillis(); 
 
        monitor.require(2); //初始化信號量 
        Thread t1 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                try { 
                    cst.method1(); 
                } finally { 
                    monitor.release(1); //信號量-1 
                } 
            } 
        }); 
 
        Thread t2 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                try { 
                    cst.method2(); 
                } finally { 
                    monitor.release(1); //信號量-1 
                } 
            } 
        }); 
 
        t1.start(); 
        t2.start(); 
 
        synchronized (monitor) { 
            while (monitor.single > 0) { 
                monitor.wait(); // monitor.wait(10 * 1000), 進行超時異常處理,中斷t1,t2 
            } 
        } 
                //線程1/2都已返回,需要驗證結果 
        long s3 = System.currentTimeMillis(); 
 
        System.out.println("time cost for normal execution:" + (s2 – s1)); 
        System.out.println("time cost for concurrent execution:" + (s3 – s2)); 
    } 
 
 
使用JDK concurrent包中的信號量對象Semaphores
Java代碼 
public static void main(String[] args) throws InterruptedException { 
        final ConcurrentSimpleTest cst = new ConcurrentSimpleTest(); 
        final Semaphore monitor = new Semaphore(0); 
 
        long s1 = System.currentTimeMillis(); 
        cst.method1(); 
        cst.method2(); 
        long s2 = System.currentTimeMillis(); 
 
        Thread t1 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                try { 
                    cst.method1(); 
                } finally { 
                    monitor.release(); //增加信號量 
                } 
            } 
        }); 
 
        Thread t2 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                try { 
                    cst.method2(); 
                } finally { 
                    monitor.release(); //增加信號量 
                } 
            } 
        }); 
 
        t1.start(); 
        t2.start(); 
 
        monitor.acquireUninterruptibly(2); //tryAcquire(int permits, long timeout, TimeUnit unit) 可設置超時處理 
         
                //線程1/2都已返回,需要驗證結果 
 
        long s3 = System.currentTimeMillis(); 
 
        System.out.println("time cost for normal execution:" + (s2 – s1)); 
        System.out.println("time cost for concurrent execution:" + (s3 – s2)); 
    } 
 
 
使用JDK concurrent包中的信號量對象CountDownLatch(似乎比Semaphores更適合這種場景)
Java代碼 
public static void main(String[] args) throws InterruptedException { 
        final ConcurrentSimpleTest22 cst = new ConcurrentSimpleTest22(); 
        final CountDownLatch monitor = new CountDownLatch(2); //設置計數器 
 
        long s1 = System.currentTimeMillis(); 
        cst.method1(); 
        cst.method2(); 
        long s2 = System.currentTimeMillis(); 
 
        Thread t1 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                try { 
                    cst.method1(); 
                } finally { 
                    monitor.countDown(); //計數器-1 
                } 
            } 
        }); 
 
        Thread t2 = new Thread(new Runnable() { 
            @Override 
            public void run() { 
                try { 
                    cst.method2(); 
                } finally { 
                    monitor.countDown(); //計數器-1 
                } 
            } 
        }); 
 
        t1.start(); 
        t2.start(); 
 
        monitor.await(1, TimeUnit.SECONDS); //monitor.await(1000, TimeUnit.MILLISECONDS); 設定超時閥值 
        //線程1/2都已返回,需要驗證結果 
 
        long s3 = System.currentTimeMillis(); 
 
        System.out.println("time cost for normal execution:" + (s2 – s1)); 
        System.out.println("time cost for concurrent execution:" + (s3 – s2)); 
    } 
 
 
方式3:使用JDK5中新的線程實現方式和線程池
 
 
Java代碼 
public static void main(String[] args) throws InterruptedException, ExecutionException { 
        final ExecutorService execPool = Executors.newFixedThreadPool(5); 
        final ConcurrentSimpleTest3 cst = new ConcurrentSimpleTest3(); 
     
        long s1 = System.currentTimeMillis(); 
        cst.method1(); 
        cst.method2(); 
        long s2 = System.currentTimeMillis(); 
         
        Callable<Void> call1 = new Callable<Void>(){ 
            @Override 
            public Void call() throws Exception { 
                cst.method1(); 
                return null; 
            }            
        }; 
         
        Callable<Void> call2 = new Callable<Void>(){ 
            @Override 
            public Void call() throws Exception { 
                cst.method2(); 
                return null; 
            }            
        }; 
         
        Future<Void> task1 = execPool.submit(call1); 
        Future<Void> task2 = execPool.submit(call2); 
         
        task1.get();//task1.get(1, TimeUnit.SECONDS); get方法會阻塞,直到線程執行結束返回結果或者超時 
        task2.get();//task2.get(1, TimeUnit.SECONDS);    
         
                //線程1/2都已返回,需要驗證結果 
 
        long s3 = System.currentTimeMillis(); 
 
        System.out.println("time cost for normal execution:" + (s2 – s1)); 
        System.out.println("time cost for concurrent execution:" + (s3 – s2)); 
         
        execPool.shutdown(); 
    } 
 
要達到目的,我們的實現的方式有多種,上面給出的例子也隻是拋磚引入,那麼該如何抉擇呢?個人認為應該考慮以下幾個方面:
 
並發線程(任務)的可控性。如線程執行是否超時,可中斷執行中的線程,異常處理以及獲取線程執行的狀態等。方法1/2/3都針對具體的子線程(任務)可控,而方法2在超時設定方面則是針對並發的線程。
合理性,即合乎人的思維和設計,個人認為用信號量同步的方式(方法2)比較合理。
高效性,即性能。線程池應該是個不錯的選擇。
簡化性,即實現和使用比較簡單化。
簡化,其實就是更多的復用/重用。對於上面給出的應用場景,可以提出下面這樣的模型:


其中主線程為main thread,在它的執行過程中會啟動2個子線程T1和T2,等待T1和T2執行結束後,main thread才能執行結束。
 
進一步抽象下,便可以得到下面這個復雜點的模型:


模型中每個背景為白色的節點代表一個Thread,邊代表執行的步驟和方向。
 
Begin 會啟動T1和T2 線程,T2 執行完畢後會執行T4,而從 T1 和 T2 指向 T3 的兩條邊表示的是 T3 必須等 T1 和 T2 都執行完畢以後才能開始執行。若T1和T2同時執行完畢,那麼T3和T4也會並發執行。當T3和T4執行完成,就會執行End,整個過程結束。
 
對於這個模型,我們可以提供一個簡單的框架示意圖:

 

其具體實現主要分為3部分:
 
Constructor: 線程(任務)構建器,主要構建子線程(任務)以及其關聯關系(主要是前置依賴關系)
Executor: 線程(任務)執行器,主要負責高效地執行所提交的線程(任務)
Monitor: 負責監控一組線程(任務)的執行狀態並進行簡單地調度。如前置任務出現異常,那麼後續任務該任何執行等等
當然還得提供一些額外的訪問接口:如,線程(任務)的執行結果等。這樣一個框架/組件的雛形就有瞭,而作為調用者,也隻要關心輸入和輸出即可。
 
 
 
總結:本文主要從一個順序調用的例子出發,因性能問題引申到瞭並發,列出瞭並發同步的幾種簡單實現示例,並最後提出瞭一個並發框架的雛形。
 
有興趣的朋友可以一起討論其具體實現和應用。
 
–《全文完》–

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *