線程池任務執行流程
我們從一個API開始接觸Executor是如何處理任務隊列的。
java.util.concurrent.Executor.execute(Runnable)
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
線程池中所有任務執行都依賴於此接口。這段話有以下幾個意思:
任務可能在將來某個時刻被執行,有可能不是立即執行。為什麼這裡有兩個“可能”?繼續往下面看。
任務可能在一個新的線程中執行或者線程池中存在的一個線程中執行。
任務無法被提交執行有以下兩個原因:線程池已經關閉或者線程池已經達到瞭容量限制。
所有失敗的任務都將被“當前”的任務拒絕策略RejectedExecutionHandler 處理。
回答上面兩個“可能“。任務可能被執行,那不可能的情況就是上面說的情況3;可能不是立即執行,是因為任務可能還在隊列中排隊,因此還在等待分配線程執行。瞭解完瞭字面上的問題,我們再來看具體的實現。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
這一段代碼看起來挺簡單的,其實這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個執行流程是這樣的:
如果任務command為空,則拋出空指針異常,返回。否則進行2。
如果當前線程池大小 大於或等於 核心線程池大小,進行4。否則進行3。
創建一個新工作隊列(線程,參考上一節),成功直接返回,失敗進行4。
如果線程池正在運行並且任務加入線程池隊列成功,進行5,否則進行7。
如果線程池已經關閉或者線程池大小為0,進行6,否則直接返回。
如果線程池已經關閉則執行拒絕策略返回,否則啟動一個新線程來進行執行任務,返回。
如果線程池大小 不大於 最大線程池數量,則啟動新線程來進行執行,否則進行拒絕策略,結束。
文字描述步驟不夠簡單?下面圖形詳細表述瞭此過程。
老實說這個圖比上面步驟更難以理解,那麼從何入手呢。
流程的入口很簡單,我們就是要執行一個任務(Runnable command),那麼它的結束點在哪或者有哪幾個?
根據左邊這個圖我們知道可能有以下幾種出口:
(1)圖中的P1、P7,我們根據這條路徑可以看到,僅僅是將任務加入任務隊列(offer(command))瞭;
(2)圖中的P3,這條路徑不將任務加入任務隊列,但是啟動瞭一個新工作線程(Worker)進行掃尾操作,用戶處理為空的任務隊列;
(3)圖中的P4,這條路徑沒有將任務加入任務隊列,但是啟動瞭一個新工作線程(Worker),並且工作現場的第一個任務就是當前任務;
(4)圖中的P5、P6,這條路徑沒有將任務加入任務隊列,也沒有啟動工作線程,僅僅是拋給瞭任務拒絕策略。P2是任務加入瞭任務隊列卻因為線程池已經關閉於是又從任務隊列中刪除,並且拋給瞭拒絕策略。
如果上面的解釋還不清楚,可以去研究下面兩段代碼:
java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)
那麼什麼時候一個任務被立即執行呢?
在線程池運行狀態下,如果線程池大小 小於 核心線程池大小或者線程池已滿(任務隊列已滿)並且線程池大小 小於 最大線程池大小(此時線程池大小 大於 核心線程池大小的),用程序描述為:
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())
上面的條件就是一個任務能夠被立即執行的條件。
有瞭execute的基礎,我們看看ExecutorService中的幾個submit方法的實現。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
很簡單,不是麼?對於一個線程池來說復雜的地方也就在execute方法的執行流程。在下一節中我們來討論下如何獲取任務的執行結果,也就是Future類的使用和原理。