2025-03-25

個人感覺,Dispatcher和RealCall算是OkHttp中兩個很重要且比較好理解的部分瞭。RealCall繼承於Call,主要是用於執行我們的請求,當我們調用client.newCall(request)的時候就會生成一個RealCall實例,我們用它進行同步或異步請求。Dispatcher中主要是對我們的所有請求進行管理,方便我們執行一些類似於cancelAll()這種取消所有請求的操作。

第一次寫源碼分析,希望自己能夠解釋清楚吧…

一 RealCall

Call接口

RealCall實現瞭Call接口,首先先看下Call接口,主要是對各種方法進行瞭定義。代碼很簡單:

public interface Call {
    //返回初始化這個Call的請求
    Request request();
    //同步執行
    Response execute() throws IOException;
    //異步執行
    void enqueue(Callback responseCallback);
    //取消執行
    void cancel();
    //判斷是否執行
    boolean isExecuted();
    //判斷是否取消
    boolean isCanceled();
    //Call工廠
    interface Factory {
        Call newCall(Request request);
    }
}

RealCall類中實現瞭Call接口,並在其中包含瞭這樣幾個實例變量:

//OkHttpClient實例
private final OkHttpClient client;

//攔截器,從故障中恢復並根據需要進行重定向
private final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;

// Guarded by this.
private boolean executed;

//原始請求
Request originalRequest;

其中RetryAndFollowUpInterceptor是一個攔截器,主要是在出現故障的時候使用,具體怎麼用…等我看到那一塊源碼後再說…

這些可能接下來的代碼中會包含到所以先寫在這。接下來看一下RealCall中是怎麼實現Call接口中的方法。

execute()

首先我們還是直接貼一下源碼:

@Override public Response execute() throws IOException {
    synchronized (this) {
      //如果已經在執行,則拋出異常
 if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    try {
 //將我們的這個實例添加到dispatcher的runningSyncCalls隊列中
  client.dispatcher().executed(this);
 //獲取網絡請求的結果
  Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
  //從dispatcher的runningSyncCall中刪除本實例
      client.dispatcher().finished(this);
    }
  }

代碼不難理解,其中之所以要把實例添加到dispatcher的隊列中,是因為這樣方便OkHttp對現在所有的網絡請求進行同一管理,finished()方式就是在隊列中找到該實例並刪除掉。這個之前也說瞭,具體的話後面源碼會分析到。

enqueue()

同樣也是直接上源碼:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
//生成Runnable實例,添加到dispatcher中異步執行
 client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

這個代碼也不難,因為是異步執行,所以我們要生成相應的Runnable實例,放在dispatcher的線程池中執行。這裡的AsyncCall是在RealCall中定義的一個內部類,是一個繼承瞭NameRunnable類的實例,而Runnable類是一個實現瞭Runnable接口的抽象類,裡面主要是添加瞭一個name的變量,用於對當前線程命名…??沒搞懂為什麼要給線程命名,這個先不管瞭。總之AsyncCall是一個實現瞭Runnable接口的類就是瞭。

AsyncCall

關於AsyncCall,就看下源碼:

final class AsyncCall extends NamedRunnable {
    
//自己定義的Callback
private final Callback responseCallback;

    private AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl().toString());
      this.responseCallback = responseCallback;
    }
 
 //返回請求的目標host,在dispatcher中用到
 //dispatcher中要求對同一個主機的請求數不能超過某個值(默認5)
String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }
 //該RealCall實例
 RealCall get() {
      return RealCall.this;
    }
 //重寫Runnable的execute()方法
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
 //請求失敗,調用Callback中的onFailure()方法
 responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
 //請求成功,調用Callback中的onResponse()方法
  responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
 //用於防止多次調用Callback中的方法
 Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
 //在dispatcher中刪除掉該AsyncCall實例
  client.dispatcher().finished(this);
      }
    }
  }

這裡主要說一下這個retryAndFollowUpInterceptor.isCanceled(),這個主要是返回retryAndFollowUpInterceptor的cancel變量的狀態,這個狀態默認是false的,隻有我們手動cancel瞭retryAndFollowUpInterceptor的時候才會變為true,所以這裡大概可以認為是我們的請求出現故障,且取消瞭重定向的執行的時候,這時候我們的網絡請求是失敗的。

cancel()

@Override public void cancel() {
    retryAndFollowUpInterceptor.cancel();
  }

getResponseWithInterceptorChain()

RealCall中還有另外一個重要的方法就是getResponseWithInterceptorChain()方法,這個的話…得等我以後來補充說明瞭…

private Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!retryAndFollowUpInterceptor.isForWebSocket()) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(
        retryAndFollowUpInterceptor.isForWebSocket()));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

總體而言,RealCall是我們真正發起網絡請求的核心,利用其可以實現同步請求,異步請求,取消請求,以及查看請求狀態。

二 Dispatcher

Dispatcher的中文意思是調度器,顧名思義這是用於對各種Call的調度,實際也是如此,在Dispathcer裡面就是管理記錄著所有的RealCall。在Dispatcher中,利用Synchronized來對幾乎所有的方法進行修飾,從而保證線程安全性。因為這裡面大多數的方法主要是進行一些隊列數據的存儲和變量值的修改,並沒有什麼耗時的操作,所以用Synchronized來修飾,簡單實用。先來看下裡面包含的實例變量。

  //線程池中同時存在的最大請求數量
  private int maxRequests = 64;
  //對同一個host的最大請求數量
  private int maxRequestsPerHost = 5;
  //空閑線程,當同步隊列+異步隊列==0的時候調用ideaCallback.run()方法
  private Runnable idleCallback;

  //線程池,懶加載
  private ExecutorService executorService;

  //尚未執行的異步請求隊列,
  private final Deque readyAsyncCalls = new ArrayDeque<>();

  //執行中的異步請求隊列,包括已經調用cancel()但是還未finished的
  private final Deque runningAsyncCalls = new ArrayDeque<>();

  //執行中的同步請求隊列,包括已經調用cancel()但是還未finished的
  private final Deque runningSyncCalls = new ArrayDeque<>();

  //懶加載,通過定義為synchronized方法保證線程安全
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

在這裡面,idleCallback的作用是在Dispatcher中所有線程都執行結束之後執行,這個應該是用於我們進行自定義,暫時沒看到那裡有調用到其setter方法。

對於maxRequests和maxRequestsPerHost的setter和getter就不講瞭,不過在這兩者裡面的setter方法中,當重新設置值之後,都會調用一個promoteCalls()方法,將等待中的請求添加到runningAysncCalls隊列和線程池中。看一下它的源碼:

  private void promoteCalls() {
    //隊列中線程數量超過最大值,不用增加
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    //沒有在等待中的請求
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
 //對等待請求隊列進行遍歷
    for (Iterator i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
  //如果與call的目標主機host相同的請求數小於maxRequestsPerHost的數量,就可以添加到隊列中,並將其添加到請求中執行
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
  //達到最大值,停止添加
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

在promoteCall()中,runningCallsForHost()是用來計算隊列中同一host的請求個數的,這個方法在之後添加異步請求到runningAysncCalls和線程池的方法中還會繼續看到。其中call.host()我們在AsyncCall中已經提到過。源碼如下:

  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

接下來看一下幾個將Call添加到隊列中的方法,這些在RealCall中其實已經看到過一次瞭。

executed(RealCall call)

將同步請求添加到runningSyncCalls中

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

enqueue(AsyncCall call)

添加異步請求,判斷要添加到哪個隊列中

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

finished

當同步/異步網絡請求結束之後,會將該call從對應的隊列中刪除。

  //異步請求結束
 void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }
 //同步請求結束
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

在這裡,兩種請求都調用瞭同一個方法,看一下該方法的源碼:

 private  void finished(Deque calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
 //移除失敗,拋出異常
  if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //當finish的是異步請求時promoteCall為true,調用promoteCalls(),將其他異步請求放入隊列和線程池中
  if (promoteCalls) promoteCalls();
 //計算當前正在運行的Call數量
  runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
 //當同步隊列+異步隊列==0且idleCallback不為空時調用idleCallback
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

在源碼中,除瞭在相應隊列中移除掉該call實例之外,對於異步請求的情況還會往隊列和線程池中添加新的call請求,並且,對網絡中已經沒有call請求在執行的情況進行瞭考慮。

cancelAll

對所有的call請求都調用cancel方法,不多解釋瞭。

  public synchronized void cancelAll() {
    //等待中的異步請求
for (AsyncCall call : readyAsyncCalls) {
      call.get().cancel();
    }
 //執行中的異步請求
    for (AsyncCall call : runningAsyncCalls) {
      call.get().cancel();
    }
 //執行中的同步請求
    for (RealCall call : runningSyncCalls) {
      call.cancel();
    }
  }

主要的方法就是這些瞭。總的來說,Dispatcher實現的是對Call的記錄和管理。

這部分大概就講這麼多瞭,這也是okhttp框架中比較容易懂的部分瞭…

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *