Android异步消息处理机制(4)AsyncTask源码解析

上一章我们学习了抽象类AsyncTask的基本使用(地址:http://blog.csdn.net/wangyongge85/article/details/47988569),下面我将以问答的方法分析AsyncTask源码内容,源码版本为:API22。

1. 为什么必须在UI线程实例化我们的AsyncTask,并且必须在主线程中调用execute(Params... params)

在分析为什么在UI线程调用之前,我们先看一下实例化AsyncTask并调用execute(Params... params)都完成了哪些工作。

我们知道实例化对象时,首先会调用父类的构造器,那么我们先看一下AsyncTask的构造器以及涉及到的内容相关源码:


// AsyncTask中的内部抽象类,实现接口Callable(没有实现call方法),封装传入的参数数组
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
    Params[] mParams;
}
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
// AsyncTask构造器,必须在UI线程中调用
public AsyncTask() {
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            // 标记任务是否被执行
            mTaskInvoked.set(true);
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            return postResult(doInBackground(mParams));
        }
    };
    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occured while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}

我们看到AsyncTask构造器主要是初始化两个对象:mWorker和mFuture。mWorker实际上是Callable对象,并且带有成员变量mParams(第一个泛型参数数组类型),我们知道Callable接口实现的线程有返回值,即call()方法的返回值postResult(doInBackground(mParams)),这里调用了doInBackground(mParams),注意,目前call()方法还未执行;mFuture是FutureTask对象,使用mWorker进行初始化。FutureTask是Future接口和Runnable接口的实现类,能够控制关联的callable对象即mWorker,同样该对象的任何方法也未开始执行。

在实例化AsyncTask类之后,我们通常调用execute(Params... params)方法,那么自然要看一下`execute(Params... params)相关源码了:


public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

// execute直接调用这个方法
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        // 任务还未被执行
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                // 正在执行的任务不能再次执行
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                // 已经完成的任务不能再次执行,这也是为什么一个对象只能执行一次的原因
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }
        // 标记任务进入运行状态
        mStatus = Status.RUNNING;
        // 将调用我们覆盖的方法
        onPreExecute();
        // 传递参数
        mWorker.mParams = params;
        // 开始执行mFuture
        exec.execute(mFuture);
        return this;
    }

execute()直接调用executeOnExecutor(Executor exec,Params... params),该方法第一个参数是SerialExecutor对象sDefaultExecutor,第二个参数是我们定义的Params,那么我们就来详细分析一下该方法。首先判断任务是否第一次执行,若不是第一次就会抛出异常(注意case块没有break语句);否则就会向下执行。首先标记任务进入执行状态;接着调用我们覆盖的onPreExecute()方法,这也说明了为什么这是第一个开始执行的方法了,同时我们知道onPreExecute()常用来初始化UI,必须在主线程中调用,因此execute()必须在主线程中调用。并且我们观察到目前仍然在主线程中;最后会执行SerialExecutor对象sDefaultExecutor的execute(mFuture)。那么该方法又会做哪些事情呢?我们进入该方法一探究竟:


private static class SerialExecutor implements Executor {
    // 双端队列保存AsyncTask任务
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    // 在这里mFuture将会调用mWorker的call()方法
                    r.run();
                } finally {
                    // 在执行完前一个任务时执行下一个任务,实现任务的串行执行
                    scheduleNext();
                }
            }
        });
        // 首次实例化AsyncTask执行该方法,
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

静态内部类SerialExecutor的execute()方法依次执行的内容是:首先调用mTask的offer()添加Runnable实例,在该实例的run()方法中调用传递过来的参数mFuture的run()方法,同时finally块中执行scheduleNext(),这一步就完成了向队列mTask中添加Runnable实例;接着判断mActive是否为null,第一次当然为null,因此会执行scheduleNext()scheduleNext()也是SerialExecutor类中的方法,该方法会执行THREAD_POOL_EXECUTOR.execute(mActive),其中mActive是从队列中取出来的Runnable对象。THREAD_POOL_EXECUTOR是线程池对象,使用线程池中的线程执行mActive。在mActive中就会执行刚才放进去的mFuture的run()方法,该方法的代码如下:


public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 这里执行call()方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

该方法实际上调用Callable对象的call(),那么这个Callable对象又是什么呢?还记得在AsyncTask中实例化的mWorker对象么。。。没错这个执行的就是mWorker对象的call()方法,终于绕过来了!!!!而在call()方法中会执行doInBackground(mParams),因此该方法是在后台线程执行的方法。

sDefaultExecutor是静态成员变量,每次创建AsyncTask对象都会添加到该变量中的mTasks队列中,而且AsyncTask对象是串行执行(实现原理是通过try/finnally块实现的,代码如上SerialExecutor的execute())。

到目前为止仍然未解决为什么实例化AsyncTask需要在UI线程中执行。。。而且目前还不知道怎么将doInBackground(mParams)结果返回,那么我们先看看这个问题能不能解决为什么在UI线程中的问题。。

2.如何将doInBackground(mParams)结果返回?

我们知道在AsyncTask中实例化的mWorker对象的call()方法会返回postResult(doInBackground(mParams)),那么我们来看一下postResult()源码以及涉及到的其它源码:


// post()方法
private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }

// getHandler()方法
private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }

// 静态内部类InternalHandler
private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // 返回结果数组中的一个
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }
// 静态内部类AsyncTaskResult
private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;

        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = data;
        }
    }

上面一共四部分,第一部分是postResult()源码,该方法中首先调用了getHandler()(源码在第二部分),返回一个InternalHandler对象(源码在第三部分)。而InternalHandler对象就是一个Handler对象(若对Handler机制不了解的话建议参考我之前的文章:Android异步消息处理机制(1)Handler基本使用

Android异步消息处理机制(2)源码解析),调用该对象的obtainMessage()返回一个Message对象,然后通过Message对象的sendToTarget()将消息发送到消息队列中。然后会在Handler对象的handleMessage()中处理消息。

第三部分给出了InternalHandler类源码,发送的消息将在该类的handleMessage()中处理。在该方法中首先获得AsyncTaskResult对象(源码在第四部分),AsyncTaskResult对象用于保存当前AsyncTask对象和postResult()传递的对象,而这里传递的正是doInBackground()返回结果,这样我们就将结果添加到Message对象中。

可以看到AsyncTask也是通过Handler机制实现异步处理:在子线程中处理完任务后将执行结果添加到Message中,然后在主线程中通过Looper对象取出该Message,通过对应的Handler对象的handleMessage()进行处理。

因为在postResult()中我们设置的Message的what为MESSAGE_POST_RESULT,因此在handleMessage()将会执行第一个case块:result.mTask.finish(result.mData[0]),在finish()中只接收结果数组中的第一个值。那么我们继续看一下finish()源码:


private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }

可以看到若任务没有暂停,就会调用我们覆盖的方法onPostExecute(result)

由于InternalHandler对象的Looper对象是通过Looper.getMainLooper(),即主线程中的Looper对象,因此必须在主线程中实例化我们的AsyncTask。

handleMessage()还有一个case块,那么这个何时这执行呢?

3.调用publishProgress()如何实现向主线程中发送任务进度?

我们知道在doInBackground()中调用publishProgress()时就会将当前任务进度发送给主线程,那么我们来看一下该方法的源码:


protected final void publishProgress(Progress... values) {
        if (!isCancelled()) {
            getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                    new AsyncTaskResult<Progress>(this, values)).sendToTarget();
        }
    }

是不是似曾相识?这个方法和postResult()方法一样,都是向消息队列中发送消息,该消息的what属性为MESSAGE_POST_PROGRESS,因此会执行handleMessage()的第二个case块,即执行我们覆盖的onProgressUpdate(),该方法就可以更新UI了。

4.如何并行执行任务?

我们知道通过SerialExecutor对象只能顺序(串行)执行任务,那么如何实现并行执行呢?我们知道调用execute(Params... params)实际上执行的是executeOnExecutor(sDefaultExecutor, params),该方法第一个参数默认传递的是SerialExecutor对象。因此,我们可以通过调用executeOnExecutor(Executor, params)方法实现并行执行,前提是我们在该方法中的第一个参数传递一个用于并行执行线程的线程池。该线程池我们可以使用AsyncTask类自带的线程池THREAD_POOL_EXECUTOR,源码如下:


private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

该线程池中最大线程个数为128,当然我们也可以使用自定义的线程池。

Android3.0之后,默认情况下,AsyncTask同时只能执行一个任务,我们可以灵活地改变默认配置,使得AsyncTask可以并行执行多任务。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 08-25