2025-04-23

線程池任務執行結果


這一節來探討下線程池中任務執行的結果以及如何阻塞線程、取消任務等等。


1 package info.imxylz.study.concurrency.future;
2
3 public class SleepForResultDemo implements Runnable {
4
5     static boolean result = false;
6
7     static void sleepWhile(long ms) {
8         try {
9             Thread.sleep(ms);
10         } catch (Exception e) {}
11     }
12
13     @Override
14     public void run() {
15         //do work
16         System.out.println(“Hello, sleep a while.”);
17         sleepWhile(2000L);
18         result = true;
19     }
20
21     public static void main(String[] args) {
22         SleepForResultDemo demo = new SleepForResultDemo();
23         Thread t = new Thread(demo);
24         t.start();
25         sleepWhile(3000L);
26         System.out.println(result);
27     }
28
29 }
30
在沒有線程池的時代裡面,使用Thread.sleep(long)去獲取線程執行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務可能的執行時間,並且還會阻塞主線程,不管任務有沒有執行完畢。


1 package info.imxylz.study.concurrency.future;
2
3 public class SleepLoopForResultDemo implements Runnable {
4
5     boolean result = false;
6
7     volatile boolean finished = false;
8
9     static void sleepWhile(long ms) {
10         try {
11             Thread.sleep(ms);
12         } catch (Exception e) {}
13     }
14
15     @Override
16     public void run() {
17         //do work
18         try {
19             System.out.println(“Hello, sleep a while.”);
20             sleepWhile(2000L);
21             result = true;
22         } finally {
23             finished = true;
24         }
25     }
26
27     public static void main(String[] args) {
28         SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
29         Thread t = new Thread(demo);
30         t.start();
31         while (!demo.finished) {
32             sleepWhile(10L);
33         }
34         System.out.println(demo.result);
35     }
36
37 }
38
使用volatile與while死循環的好處就是等待的時間可以稍微小一點,但是依然有CPU負載高並且阻塞主線程的問題。最簡單的降低CPU負載的方式就是使用Thread.join().


        SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
        Thread t = new Thread(demo);
        t.start();
        t.join();
        System.out.println(demo.result);
顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質上講就是利用while和wait來實現的。


上面的方式中都存在一個問題,那就是會阻塞主線程並且任務不能被取消。為瞭解決這個問題,線程池中提供瞭一個Future接口。


ThreadPoolExecutor-Future


在Future接口中提供瞭5個方法。


V get() throws InterruptedException, ExecutionException: 等待計算完成,然後獲取其結果。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。
boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。
boolean isCancelled():如果在任務正常完成前將其取消,則返回 true。
boolean isDone():如果任務已完成,則返回 true。 可能由於正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true。
API看起來容易,來研究下異常吧。get()請求獲取一個結果會阻塞當前進程,並且可能拋出以下三種異常:


InterruptedException:執行任務的線程被中斷則會拋出此異常,此時不能知道任務是否執行完畢,因此其結果是無用的,必須處理此異常。
ExecutionException:任務執行過程中(Runnable#run())方法可能拋出RuntimeException,如果提交的是一個java.util.concurrent.Callable<V>接口任務,那麼java.util.concurrent.Callable.call()方法有可能拋出任意異常。
CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務被取消瞭但是依然去獲取結果。
對於get(long timeout, TimeUnit unit)而言,除瞭get()方法的異常外,由於有超時機制,因此還可能得到一個TimeoutException。


boolean cancel(boolean mayInterruptIfRunning)方法比較復雜,各種情況比較多:


如果任務已經執行完畢,那麼返回false。
如果任務已經取消,那麼返回false。
循環直到設置任務為取消狀態,對於未啟動的任務將永遠不再執行,對於正在運行的任務,將根據mayInterruptIfRunning是否中斷其運行,如果不中斷那麼任務將繼續運行直到結束。
此方法返回後任務要麼處於運行結束狀態,要麼處於取消狀態。isDone()將永遠返回true,如果cancel()方法返回true,isCancelled()始終返回true。
來看看Future接口的實現類java.util.concurrent.FutureTask<V>具體是如何操作的。


在FutureTask中使用瞭一個AQS數據結構來完成各種狀態以及加鎖、阻塞的實現。


在此AQS類java.util.concurrent.FutureTask.Sync中一個任務用4中狀態:


ThreadPoolExecutor-FutureTask-state


初始情況下任務狀態state=0,任務執行(innerRun)後狀態變為運行狀態RUNNING(state=1),執行完畢後變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消後就變為狀態CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態變更的。


        void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
執行一個任務有四步:設置運行狀態、設置當前線程(AQS需要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這裡也可以看到,一個任務隻能執行一次,因為執行完畢後它的狀態不在為初始值0,要麼為CANCELLED,要麼為RAN。


取消一個任務(cancel)又是怎樣進行的呢?對比下前面取消任務的描述是不是很簡單,這裡無非利用AQS的狀態來改變任務的執行狀態,最終達到放棄未啟動或者正在執行的任務的目的。


boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED))
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt();
    }
    releaseShared(0);
    done();
    return true;
}
到目前為止我們依然沒有說明到底是如何阻塞獲取一個結果的。下面四段代碼描述瞭這個過程。


1     V innerGet() throws InterruptedException, ExecutionException {
2         acquireSharedInterruptibly(0);
3         if (getState() == CANCELLED)
4             throw new CancellationException();
5         if (exception != null)
6             throw new ExecutionException(exception);
7         return result;
8     }
9     //AQS#acquireSharedInterruptibly
10     public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
11         if (Thread.interrupted())
12             throw new InterruptedException();
13         if (tryAcquireShared(arg) < 0)
14             doAcquireSharedInterruptibly(arg); //park current Thread for result
15     }
16     protected int tryAcquireShared(int ignore) {
17         return innerIsDone()? 1 : -1;
18     }
19
20     boolean innerIsDone() {
21         return ranOrCancelled(getState()) && runner == null;
22     }
當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式瞭。這裡獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。如果不滿足條件,那麼在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。


至於將Runnable接口轉換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供瞭一個簡單實現。


    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
延遲、周期性任務調度的實現


java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、周期性任務調度的實現。


有瞭整個線程池的實現,再回頭來看延遲、周期性任務調度的實現應該就很簡單瞭,因為所謂的延遲、周期性任務調度,無非添加一系列有序的任務隊列,然後按照執行順序的先後來處理整個任務隊列。如果是周期性任務,那麼在執行完畢的時候加入下一個時間點的任務即可。


由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區別在於任務是有序(按照執行時間順序)的,並且需要到達時間點(臨界點)才能執行,並不是任務隊列中有任務就需要執行的。也就是說唯一不同的就是任務隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基於java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現。


DelayQueue是基於有序隊列PriorityQueue實現的。PriorityQueue 也叫優先級隊列,按照自然順序對元素進行排序,類似於TreeMap/Collections.sort一樣。


同樣是有序隊列,DelayQueue和PriorityQueue區別在什麼地方?


由於DelayQueue在獲取元素時需要檢測元素是否“可用”,也就是任務是否達到“臨界點”(指定時間點),因此加入元素和移除元素會有一些額外的操作。


典型的,移除元素需要檢測元素是否達到“臨界點”,增加元素的時候如果有一個元素比“頭元素”更早達到臨界點,那麼就需要通知任務隊列。因此這需要一個條件變量final Condition available 。


移除元素(出隊列)的過程是這樣的:


總是檢測隊列的頭元素(順序最小元素,也是最先達到臨界點的元素)
檢測頭元素與當前時間的差,如果大於0,表示還未到底臨界點,因此等待響應時間(使用條件變量available)
如果小於或者等於0,說明已經到底臨界點或者已經過瞭臨界點,那麼就移除頭元素,並且喚醒其它等待任務隊列的線程。
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;


                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }
同樣加入元素也會有相應的條件變量操作。當前僅當隊列為空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒“等待線程”去檢測元素。因為頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。


    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }
有瞭任務隊列後再來看Future在ScheduledThreadPoolExecutor中是如何操作的。


java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是繼承java.util.concurrent.FutureTask<V>的,區別在於執行任務是否是周期性的。


        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = now() – p;
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }


        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            if (isPeriodic())
                runPeriodic();
            else
                ScheduledFutureTask.super.run();
        }
    }
如果不是周期性任務調度,那麼就和java.util.concurrent.FutureTask.Sync的調度方式是一樣的。如果是周期性任務(isPeriodic())那麼就稍微有所不同的。


ScheduledThreadPoolExecutor-ScheduledFutureTask


先從功能/結構上分析下。第一種情況假設提交的任務每次執行花費10s,間隔(delay/period)為20s,對於scheduleAtFixedRate而言,每次執行開始時間20s,對於scheduleWithFixedDelay來說每次執行開始時間30s。第二種情況假設提交的任務每次執行時間花費20s,間隔(delay/period)為10s,對於scheduleAtFixedRate而言,每次執行開始時間10s,對於scheduleWithFixedDelay來說每次執行開始時間30s。(具體分析可以參考這裡)


也就是說scheduleWithFixedDelay的執行開始時間為(delay+cost),而對於scheduleAtFixedRate來說執行開始時間為max(period,cost)。


回頭再來看上面源碼runPeriodic()就很容易瞭。但特別要提醒的,如果任務的任何一個執行遇到異常,則後續執行都會被取消,這從runPeriodic()就能看出。要強調的第二點就是同一個周期性任務不會被同時執行。就比如說盡管上面第二種情況的scheduleAtFixedRate任務每隔10s執行到達一個時間點,但是由於每次執行時間花費為20s,因此每次執行間隔為20s,隻不過執行的任務次數會多一點。但從本質上講就是每隔20s執行一次,如果任務隊列不取消的話。


為什麼不會同時執行?


這是因為ScheduledFutureTask執行的時候會將任務從隊列中移除來,執行完畢以後才會添加下一個同序列的任務,因此任務隊列中其實最多隻有同序列的任務的一份副本,所以永遠不會同時執行(盡管要執行的時間在過去)。


 


ScheduledThreadPoolExecutor使用一個無界(容量無限,整數的最大值)的容器(DelayedWorkQueue隊列),根據ThreadPoolExecutor的原理,隻要當容器滿的時候才會啟動一個大於corePoolSize的線程數。因此實際上ScheduledThreadPoolExecutor是一個固定線程大小的線程池,固定大小為corePoolSize,構造函數裡面的Integer.MAX_VALUE其實是不生效的(盡管PriorityQueue使用數組實現有PriorityQueue大小限制,如果你的任務數超過瞭2147483647就會導致OutOfMemoryError,這個參考PriorityQueue的grow方法)。


 


再回頭看scheduleAtFixedRate等方法就容易多瞭。無非就是往任務隊列中添加一個未來某一時刻的ScheduledFutureTask任務,如果是scheduleAtFixedRate那麼period/delay就是正數,如果是scheduleWithFixedDelay那麼period/delay就是一個負數,如果是0那麼就是一次性任務。直接調用父類ThreadPoolExecutor的execute/submit等方法就相當於period/delay是0,並且initialDelay也是0。


    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        if (initialDelay < 0) initialDelay = 0;
        long triggerTime = now() + unit.toNanos(initialDelay);
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime,
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
    }
另外需要補充說明的一點,前面說過java.util.concurrent.FutureTask.Sync任務隻能執行一次,那麼在runPeriodic()裡面怎麼又將執行過的任務加入隊列中呢?這是因為java.util.concurrent.FutureTask.Sync提供瞭一個innerRunAndReset()方法,此方法不僅執行任務還將任務的狀態還原成0(初始狀態)瞭,所以此任務就可以重復執行。這就是為什麼runPeriodic()裡面調用runAndRest()的緣故。


        boolean innerRunAndReset() {
            if (!compareAndSetState(0, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
                    callable.call(); // dont set result
                runner = null;
                return compareAndSetState(RUNNING, 0);
            } catch (Throwable ex) {
                innerSetException(ex);
                return false;
            }
        }
 


後話


整個並發實踐原理和實現(源碼)上的東西都講完瞭,後面幾個小節是一些總結和掃尾的工作,包括超時機制、異常處理等一些細節問題。也就是說大部分隻需要搬出一些理論和最佳實踐知識出來就好瞭,不會有大量費腦筋的算法分析和原理、思想探討之類的。後面的章節也會加快一些進度。


老實說從剛開始的好奇到中間的興奮,再到現在的徹悟,收獲還是很多,個人覺得這是最認真、最努力也是自我最滿意的一次技術研究和探討,同時在這個過程中將很多技術細節都串聯起來瞭,慢慢就有瞭那種技術相通的感覺。原來有瞭理論以後再去實踐、再去分析問題、解決問題和那種純解決問題得到的經驗完全不一樣。整個專輯下來不僅僅是並發包這一點點知識,設計到硬件、軟件、操作系統、網絡、安全、性能、算法、理論等等,總的來說這也算是一次比較成功的研究切入點,這比Guice那次探討要深入和持久的多。


發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *