Android-AsyncTask源碼解析

public final AsyncTask<Params, Progress, Result> execute(Params… params) {

return executeOnExecutor(sDefaultExecutor, params);//sDefaultExecutor?

}

 

 

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

private static class SerialExecutor implements Executor { //嚴格來說它不是一個Executor,因為它沒有自己的線程池,是工作在主線程上的

        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();

        Runnable mActive;

 

 

        public synchronized void execute(final Runnable r) {//提交一個任務

            mTasks.offer(new Runnable() {//入隊

                public void run() {

                    try {

                        r.run();

                    } finally {

                        scheduleNext();

                    }

                }

            });

            if (mActive == null) {//取出一個任務執行

                scheduleNext();

            }

        }

 

 

        protected synchronized void scheduleNext() {

            if ((mActive = mTasks.poll()) != null) {

                THREAD_POOL_EXECUTOR.execute(mActive);//最終執行任務的executor

            }

        }

    }

public static final Executor THREAD_POOL_EXECUTOR

            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,

                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);//這才是真正的executor,而且是static的,kvm裡面唯一的一個!

private static final int CORE_POOL_SIZE = 5;     //5個常駐線程

private static final int MAXIMUM_POOL_SIZE = 128;// 最多可以開到128個線程

//以上幾行就解釋瞭為什麼無論執行多少次new AsynTask(),在DDMS隻能看到5個Asyn的線程。

//ps:android的ThreadPoolExecutor和jdk的ThreadPoolExecutor接口長得一樣,內部實現完全不一樣。

 

 

sThreadFactory:

private static final ThreadFactory sThreadFactory = new ThreadFactory() {

        private final AtomicInteger mCount = new AtomicInteger(1);

 

 

        public Thread newThread(Runnable r) {

            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());

        }

    };

這就是為什麼DDMS看到的Asyn線程的名字都帶有"AsyncTask #"的前綴。

繼續:

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,

            Params… params) {

 

 

        if (mStatus != Status.PENDING) {

            switch (mStatus) {

                case RUNNING:

                    throw new IllegalStateException("Cannot execute task:"

                            + " the task is already running.");

                case FINISHED:

                    throw new IllegalStateException("Cannot execute task:"

                            + " the task has already been executed "

                            + "(a task can be executed only once)");

            }

        }

        mStatus = Status.RUNNING;

        onPreExecute();//abstract,可以被覆蓋

        mWorker.mParams = params;//mWorker?

        exec.execute(mFuture);//exec就是前面提到的sDefaultExecutor,mFutrue?

        return this;

    }

在構造函數裡面:

public AsyncTask() {

        mWorker = new WorkerRunnable<Params, Result>() {

            public Result call() throws Exception {

                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

                //noinspection unchecked

                return postResult(doInBackground(mParams));//這裡首先調用子類的doInBackground()方法,然後調用postResult()

            }

        };

 

 

        mFuture = new FutureTask<Result>(mWorker) {

            @Override

            protected void done() {//FutureTask執行完以後的回調

                try {

                    postResultIfNotInvoked(get());//如果沒有調用過會再次調用

                } catch (InterruptedException e) {

                    android.util.Log.w(LOG_TAG, e);

                } catch (ExecutionException e) {

                    throw new RuntimeException("An error occured while executing doInBackground()",

                            e.getCause());

                } catch (CancellationException e) {

                    postResultIfNotInvoked(null);

                }

            }

        };

    }

我們繼續從exec.execute(mFuture);這一行往下看:

執行:sDefaultExecutor.execute(mFuture),實際上就是把任務入隊,然後順序的取,放到THREAD_POOL_EXECUTOR執行

執行:THREAD_POOL_EXECUTOR.execute(mFuture)這裡會使用內部的線程來執行任務

try {

r.run();//r=mFuture

} finally {

scheduleNext();

}

從這裡開始,下面的操作就是在單獨的線程上執行瞭,執行任務實際就是直接調用run:mFuture.run(),這裡面會調用mWorker.call():

public Result call() throws Exception {

        mTaskInvoked.set(true);//這是個boolean的標志,private final AtomicBoolean mTaskInvoked = new AtomicBoolean();

        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

        //noinspection unchecked

        return postResult(doInBackground(mParams));//這裡首先調用doInBackground()方法,然後調用postResult()。我們一般會在子類重寫doInBackground()方法

    }

private Result postResult(Result result) {//在這裡發消息

        @SuppressWarnings("unchecked")

        Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT,//sHandler?

                new AsyncTaskResult<Result>(this, result));//AsyncTaskResult?

        message.sendToTarget();

        return result;

    }

private static class AsyncTaskResult<Data> {

        final AsyncTask mTask;

        final Data[] mData;

 

 

        AsyncTaskResult(AsyncTask task, Data… data) {

            mTask = task;

            mData = data;

        }

    }

詳細過程:

public class FutureTask<V> implements RunnableFuture<V> :

public void run() {

        if (state != NEW ||

            !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                         null, Thread.currentThread()))

            return;

        try {

            Callable<V> c = callable;//是構造函數傳進來的,mWorker實現瞭Callable接口,private static abstract class WorkerRunnable<Params, Result> implements Callable<Result>

            if (c != null && state == NEW) {

                V result;

                boolean ran;

                try {

                    result = c.call();//這裡會調用mWorker.call(),

                    ran = true;

                } catch (Throwable ex) {

                    result = null;

                    ran = false;

                    setException(ex);

                }

                if (ran)

                    set(result);//這裡會設置執行的結果

            }

        } finally {

            // runner must be non-null until state is settled to

            // prevent concurrent calls to run()

            runner = null;

            // state must be re-read after nulling runner to prevent

            // leaked interrupts

            int s = state;

            if (s >= INTERRUPTING)

                handlePossibleCancellationInterrupt(s);

        }

    }

protected void set(V v) {

        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

            outcome = v;

            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

            finishCompletion();

        }

    }

private void finishCompletion() {

        // assert state > COMPLETING;

        for (WaitNode q; (q = waiters) != null;) {

            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

                for (;;) {

                    Thread t = q.thread;

                    if (t != null) {

                        q.thread = null;

                        LockSupport.unpark(t);

                    }

                    WaitNode next = q.next;

                    if (next == null)

                        break;

                    q.next = null; // unlink to help gc

                    q = next;

                }

                break;

            }

        }

 

 

        done();//這裡會回調done()方法

 

 

        callable = null;        // to reduce footprint

    }

在done()裡面會再次判斷是否發送過消息,如果沒有,再次發送:

private void postResultIfNotInvoked(Result result) {

        final boolean wasTaskInvoked = mTaskInvoked.get();//標志位

        if (!wasTaskInvoked) {

            postResult(result);

        }

    }

 

 

下面看下message的發送部分:

private Result postResult(Result result) {//在這裡發消息

        @SuppressWarnings("unchecked")

        Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT,//sHandler?

                new AsyncTaskResult<Result>(this, result));

        message.sendToTarget();

        return result;

    }

private static final InternalHandler sHandler = new InternalHandler();

 

 

private static class InternalHandler extends Handler {

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})

        @Override

        public void handleMessage(Message msg) {

            AsyncTaskResult result = (AsyncTaskResult) msg.obj;

            switch (msg.what) {

                case MESSAGE_POST_RESULT:

                    // There is only one result

                    result.mTask.finish(result.mData[0]);

                    break;

                case MESSAGE_POST_PROGRESS:

                    result.mTask.onProgressUpdate(result.mData);

                    break;

            }

        }

    }

收到消息以後,如果是成功的,調用:

result.mTask.finish(result.mData[0]);

執行瞭mTask的finisk(),同時把執行結果傳遞進去

private void finish(Result result) {

        if (isCancelled()) {

            onCancelled(result);

        } else {

            onPostExecute(result);

        }

        mStatus = Status.FINISHED;

    }

執行: onPostExecute(result);也就是我們在子類需要重寫的方法,在這裡就可以更新UI瞭。

 

 

從上面可以看出來個問題:

AsynTask內部是用一個線程池在執行任務,但是任務執行完畢以後,並沒有提供一個關閉線程池的操作,因此,線程會一直存在。

如果想在任務結束以後把線程池一道關閉掉,可以調用兩個參數的構造函數,把手動構造的Executor傳遞進去。

參考:https://stackoverflow.com/questions/8792799/android-doesnt-terminate-asynctask-thread-after-onpostexecution

而且官方文檔裡面說:

AsyncTask is designed to be a helper class around Thread and Handler and does not constitute a generic threading framework. 

AsyncTasks should ideally be used for short operations (a few seconds at the most.) 

If you need to keep threads running for long periods of time, 

it is highly recommended you use the various APIs provided by the java.util.concurrent pacakge such as Executor, ThreadPoolExecutor and FutureTask.

AsyncTask隻是一個幫助類,並不是一個通用的線程框架,它隻適合執行那些時間比較短的任務,如果要執行長時間的任務,

最好還是用java.util.concurrent這個包下面提供的類手動來做。

發佈留言