android網絡業務的封裝與調度 – Android移動開發技術文章_手機開發 Android移動開發教學課程

 

手機客戶端程序由於網絡寬帶的約束,尤其在GPRS網絡環境下,大數據量的網絡交互很大程度上降低應用的響應,影響用戶體驗。比如,如果做一個手機網盤客戶端,在後臺上傳文件時(大數據量的交互),獲取文件列表(命令類的交互)這個過程就顯得太別慢。而我們的要求是希望這些命令類操作能盡快得到響應。

 

通常,在手機客戶端,我們設計一個網絡操作的管理器,來統一管理這些需要聯網的操作。

具體做法是把網絡操作封裝成一個Command(或者說是Task),管理器實現特定的調度規則來調度運行這些Task。

這樣做的好處至少有三:

一. 用Command封裝瞭網絡操作,使得這些操作與上傳的業務分離,解除瞭強耦合。

二. 可以根據網絡情況來確定來采用不同的調度規則,提高用戶體驗。

三. 重用,這些Task和TaskManager的代碼在別的手機應用上基本上能照搬過去。

四. 擴展,當應用需要擴展新的業務時,隻有擴展一個新的Command(或者說是Task),接受調度即可,易於擴展。

 

例子:

還是以上文提到的微盤為例,可以概括我們對管理器的設計要求有:

在Wifi網絡環境下:

一:各種網絡操作可以並行運行。

在GPRS網絡環境下:

二:支持優先級搶占調度,命令類操作的優先級比數據傳輸類的優先級高,當命令類的Task(獲取文件列表)提交後,打斷數據傳輸的Task(如上傳,下載),等命令類的任務運行完畢,再接著運行數據類任務(斷點上傳,下載)。

二:同一個優先級的任務可以並行運行,如多個命令一起在網絡上傳輸。

 

實現思路:

TaskManager

1. TaskManager開辟一個後臺線程進行調度工作。

2. 由於要支持多個優先級的搶占調度,我們需要兩個隊列來維護運行中的Task和等待中的Task。

3. 由於Task的調度是基於優先級的,我們可以使用優先級隊列,運行隊列采用PriorityQueue,等待隊列使用PriorityBlockingQueue,當沒有網絡業務需要運行時,調度線程阻塞掛起,避免空轉。

4. TaskManager設計為單一實例(單一模式)。

5. 每個Task被調度運行時,該Task被從等待隊列移動運行隊列,當Task執行完畢時,從運行隊列刪除,喚醒調度線程進行新的調度。

下面是簡單的設計代碼

www.aiwalls.com

 

public final class TaskEngine implements Runnable{

 

private PriorityQueue<Task> runningQueue;//運行的task隊列

 

private PriorityBlockingQueue<Task> readyQueue;//就緒的task隊列,準備接受調度的task列表

 

private final AtomicLong taskIdProducer = new AtomicLong(1);//Task Id生成器

 

private Object sheduleLock = new Object();//同步鎖

 

private static TaskEngine instance;

 

public long addTask(BusinessObject bo){

  Task task = new Task(bo);

  long newTaskId = taskIdProducer.incrementAndGet();

  task.setTaskId(newTaskId);

  if(this.isWifiNetWork()){ //WIFI網絡

  synchronized(sheduleLock){

  runningQueue.add(task);

  }

  new Thread(task).start();

  }else{ //GPRS網絡

  if(readyQueue.offer(task)){ //task入就緒隊列

  final ReentrantLock lock = this.lock;

  lock.lock();

  try{

  needSchedule.signal(); //喚醒調度線程重新調度

  }finally{

  lock.unlock();}

  }

}

return newTaskId;

}

 

public final void run(){//task調度邏輯

….

….

}

 

//掛起調度線程 當不需要調度時

private void waitUntilNeedSchedule() throws InterruptedException

{

…..

}

}

 

Task:

 

1. 對要執行的網絡操作的封裝。

 

2. Task執行完畢時,發Task結束信號,喚醒調度線程進行新的調度

 

3. Task要實現Comparable接口,才能讓TaskManager的兩個隊列自動其包含的Task排序。

 

下面是簡單的設計代碼:

 

public class Task implements Runnable,Comparable<Task>{

 

private long taskId;

 

private BusinessObject bo;//封裝網絡操作的業務對象的抽象父類,

//它封裝瞭具體的Command,保證瞭業務擴展中Task的接口不變

 

@Override

public void run() {

  this.onTaskStart();

  this.bo.execute();

  this.onTaskEnd();

}

 

private voidonTaskStart()

{…}

 

public int getPriority()

{…}

 

public void setPriority(intpriority)

{…}

 

@Override

public int compareTo(Task object1) {

return this.getPriority()>object1.getPriority()?-1:1;

}

} 

 

 

 

小註意事項:

 

Android對PriorityQueue的實現和Jdk中的實現有點不一樣。

 

(PriorityQueue.java在android中的代碼路徑是usr\dalvik\libcore\luni\src\main\java\java\util)

 

PriorityQueue.remove(Object o) ;//從PriorityQueue中刪除一個元素。

 

對於完成這個刪除操作android和jdk都是分兩個過程實現,一,找出待刪除元素的索引index,二,刪除index所在元素。

 

在JDK中,是通過調用元素的equals方法來找到待刪除元素的索引,

 

private int indexOf(Object o) {

if (o != null) {

for (int i = 0; i < size; i++)

if (o.equals(queue[i]))

return i;

}

return -1;

}

 

在android中,是間接調用元素的compareTo方法判斷結果是否為0來找到待刪除元素的索引,

 

int targetIndex;

for (targetIndex = 0; targetIndex < size; targetIndex++) {

if (0 == this.compare((E) o, elements[targetIndex])) {

break;

}

}

 

private int compare(E o1, E o2) {

if (null != comparator) {

return comparator.compare(o1, o2);

}

return ((Comparable<? super E>) o1).compareTo(o2);

}

 

所以為瞭Task能在執行完畢時從PriorityQueue找到這個Task並刪除之,需要在compareTo方法裡在優先級相等時

 

返回0。

 

@Override

public int compareTo(Task object1) {

if(this.getPriority()==object1.getPriority())

return 0;

return this.getPriority()>object1.getPriority()?-1:1;

}

 

當是這樣運行PriorityQueue.remove(Object o) 邏輯上隻能刪除PriorityQueue裡第一個優先級與被刪除的元素

 

優先級相等的元素(可能是待刪除的元素也可能不是),有誤刪的可能,需要做如下修改:

 

@Override

public int compareTo(Task object1) {

if(this.getPriority()==object1.getPriority() && this.equals(object1))

return 0;

return this.getPriority()>object1.getPriority()?-1:1;

}

 

這樣才能正確執行remove(Object o),刪除指定的對象o。

 

 

 

個人覺得android這樣設計使得remove(Object o)復雜化瞭,不然JDK中那麼簡潔。語義上也不是那麼好懂瞭,

 

因為按通常理解,equals是判斷兩個對象是否相等,compareTo可能是對象的某個屬性的比較(類別數據庫中的order by),

 

而現在執行PriorityQueue.remove(Object o),這個對象o明明在容器PriorityQueue中,卻刪除不瞭,除非去翻看android中PriorityQueue的實現代碼,然後重寫compareTo這個方法。這樣的API使用時比較容易出錯,應該不符號良好的API設計規范

 

吧。

 

完整的代碼實現如下:

 

View Code

  1 * @類名:TaskEngine

  2  * @創建:baiyingjun (devXiaobai@gmail.com)

  3  * @創建日期:2011-7-7

  4  * @說明:task調度引擎

  5  ***************************************************/

  6 public final class TaskEngine implements Runnable{

  7

  8     private static final String TAG=Log.makeTag(TaskEngine.class);

  9    

 10     private PriorityQueue<Task> runningQueue;//運行的task隊列

 11    

 12     private PriorityBlockingQueue<Task> readyQueue;//就緒的task隊列,準備接受調度的task列表

 13    

 14     private final AtomicLong taskIdProducer = new AtomicLong(1);

 15    

 16     private Object sheduleLock = new Object();//調度鎖

 17    

 18     private static TaskEngine instance;

 19    

 20     private final ReentrantLock lock = new ReentrantLock(true);

 21    

 22     private final Condition needSchedule = lock.newCondition();

 23    

 24     private Task currentTask;//準備接受調度的task

 25    

 26     private Context mAppContext;

 27    

 28     /**

 29      * add BusinessObject to taskEngine

 30 */

 31     public long addTask(BusinessObject bo) throws NetworkNotConnectException{

 32         Task task = new Task(bo);

 33         long newTaskId = taskIdProducer.incrementAndGet();

 34         task.setTaskId(newTaskId);

 35         if(Log.DBG){

 36             Log.d(TAG, "Add task with task id "+newTaskId+", priority "+task.getPriority());

 37         }

 38        

 39         if(this.isWifiNetWork()){ //WIFI網絡

 40             synchronized(sheduleLock){

 41                 runningQueue.add(task);

 42             }

 43             new Thread(task).start();

 44         }else{ //GPRS網絡

 45             if(readyQueue.offer(task)){ //task入就緒隊列

 46                 if(Log.DBG)

 47                     Log.d(TAG, "add task " +task.bo.methodName+" "+task.taskId+" to ready queue");

 48                 final ReentrantLock lock = this.lock;

 49                 lock.lock();

 50                 try{

 51                     needSchedule.signal(); //喚醒調度線程重新調度

 52                 }finally{

 53                     lock.unlock();

 54                 }

 55             }

 56             //schedule();

 57         }

 58         return newTaskId;

 59     }

 60    

 61     private TaskEngine(Context context){

 62         mAppContext = context;

 63         runningQueue = new PriorityQueue<Task>();

 64         readyQueue = new PriorityBlockingQueue<Task>();

 65         new Thread(this).start();

 66         Log.i(TAG, "shedule thread working");

 67     }

 68    

 69     public synchronized static TaskEngine getInstance(Context context){

 70         Context appContext = context.getApplicationContext();

 71         if(instance==null || instance.mAppContext!=appContext){

 72             instance=new TaskEngine(appContext);

 73         }

 74         return instance;

 75     }

 76    

 77     protected boolean isWifiNetWork() throws NetworkNotConnectException{

 78         return NetworkManager.isWIFINetWork(mAppContext);

 79     }

 80    

 81     /**

 82      * task調度邏輯

 83 */

 84     public final void run(){

 85         Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

 86         while(true){

 87             try {

 88                 if(this.isWifiNetWork()){

 89                     Task task = this.readyQueue.take();

 90                     if(task !=null){

 91                         synchronized(sheduleLock){

 92                             runningQueue.add(task);

 93                         }

 94                         new Thread(task).start();

 95                     }

 96                 }

 97                 else{//非wifi網絡

 98 //空就緒隊列,空運行隊列,等待直到有任務到來

 99                     if(this.readyQueue.size()==0 && runningQueue.size()==0){

100                         currentTask=readyQueue.take();

101                         synchronized(sheduleLock){

102                             runningQueue.add(currentTask);

103                         }

104                         new Thread(currentTask).start();

105                     }

106                     //搶占式調度(就緒隊列非空,運行隊列優先級比就緒隊列優先級低)

107                     else if(readyQueue.size()>0 &&

108                             this.readyQueue.element().getPriority()>=this.getMaxPriority()){

109                         currentTask = readyQueue.take();

110                         if(currentTask.getPriority()>this.getMaxPriority()){//暫停低優先級的任務運行

111                             synchronized(sheduleLock){

112                                 for(int i=0;i<runningQueue.size();i++){

113                                     Task toStopTask =runningQueue.remove();

114                                     //因為任務調度,將低優先級的任務暫時給凍結起來

115                                     toStopTask.setState(Task.STATE_FROST);

116                                     readyQueue.add(toStopTask);

117                                 }

118                             }

119                         }

120                         //運行被調度的任務

121                         runningQueue.add(currentTask);

122                         new Thread(currentTask).start();

123                     }else {//等高優先級的任務運行完畢

124                         waitUntilNeedSchedule();

125                     }

126                 }

127            

128             }catch (InterruptedException e) {

129                 Log.e(TAG, "Schedule error "+e.getMessage());

130             } catch (NetworkNotConnectException e) {

131                 // TODO Auto-generated catch block

132                 e.printStackTrace();

133             }

134         }

135     }

136

137     /*

138      * 等待,直到就緒隊列裡的最高優先級比當前運行優先級高,或者就緒隊列為空時等待運行隊列運行完畢

139 */

140     private void waitUntilNeedSchedule() throws InterruptedException{

141          final ReentrantLock lock = this.lock;

142          lock.lockInterruptibly();

143             try {

144                 try{

145                     while ((readyQueue.size()>0

146                             && readyQueue.element().getPriority()<getMaxPriority())//等高優先級的任務運行完畢

147                             || (readyQueue.size()==0 && runningQueue.size()>0) ){//或者等運行隊列運行完畢

148                         if(Log.DBG)

149                             Log.d(TAG, "waiting sheduling……..");

150                         needSchedule.await();

151                     }

152                 }

153                 catch (InterruptedException ie) {

154                     needSchedule.signal(); // propagate to non-interrupted thread

155                     throw ie;

156                 }

157             } finally {

158                 lock.unlock();

159             }

160     }

161    

162     /**

163      * Hand the specified task ,such as pause,delete and so on

164 */

165     public boolean handTask(long taskId,int handType) {

166         Log.i(TAG, "set task`s state with taskId "+taskId);

167         synchronized(this.sheduleLock){

168             //如果在運行隊列裡,取消該任務

169             Iterator<Task> runningItor= this.runningQueue.iterator();

170             while(runningItor.hasNext()){

171                 Task task = runningItor.next();

172                 boolean b = task.equals(this);

173                 if(task.getTaskId()==taskId){

174                     runningQueue.remove(task);

175                     task.setState(handType);

176                     Log.i(TAG, "set runningQueue taskId = "+taskId + " state " + handType);

177                     return true;

178                 }

179             }

180             //如果在就緒隊列裡,刪除

181             Iterator<Task> readyItor= this.readyQueue.iterator();

182             while(readyItor.hasNext()){

183                 Task task = readyItor.next();

184                 if(task.getTaskId()==taskId){

185 //                    readyQueue.remove(task);

186                     task.setState(handType);

187                     Log.i(TAG, "set readyQueue taskId = "+taskId + " state " + handType);

188                     return true;

189                 }

190             }

191             return false;

192         }

193     }

194     /***

195      * 獲取運行隊列任務的最高優先級

196 */

197     private int getMaxPriority(){

198         if(this.runningQueue==null || this.runningQueue.size()==0)

199             return -1;

200         else{

201             return this.runningQueue.element().getPriority();

202         }

203     }

204        

205     /***************************************************

206      * @類名:Task

207      * @創建:baiyingjun (devXiaobai@gmail.com)

208      * @創建日期:2011-7-7

209      * @說明:業務對象的包裝成可運行實體

210      ***************************************************/

211     public class Task implements Runnable,Comparable<Task>{

212

213         //運行

214         public static final int STATE_INIT = -1;

215         //運行

216         public static final int STATE_RUN = 0;

217         //停止

218         public static final int STATE_STOP = 1;

219         //暫停

220         public static final int STATE_PAUSE = 2;

221         //取消

222         public static final int STATE_CANCLE = 3;

223         //凍結,如果在GPRS下,因為線程調度的時候低優先級的被放readyqueue裡的時候,要把這個任務暫時給“凍結”起來

224         public static final int STATE_FROST = 4;

225        

226         private long taskId;

227        

228         private BusinessObject bo;

229        

230         public Task(){

231            

232         }

233        

234         public Task(BusinessObject bo){

235             this.bo=bo;

236         }

237        

238         @Override

239         public void run() {

240             this.onTaskStart();

241             this.bo.execute();

242             this.onTaskEnd();

243         }

244

245         private void onTaskStart(){

246             this.bo.setmState(STATE_RUN);

247         }

248        

249         public long getTaskId() {

250             return taskId;

251         }

252

253         public void setTaskId(long taskId) {

254             this.taskId = taskId;

255         }

256

257         public int getPriority() {

258             return this.bo.getPriority();

259         }

260

261         public void setPriority(int priority) {

262             this.bo.setPriority(priority);

263         }

264

265         /*

266          * compare task priority

267 */

268         @Override

269         public int compareTo(Task object1) {

270             if(this.getPriority()==object1.getPriority()&& this.equals(object1))

271                 return 0;

272             return this.getPriority()>object1.getPriority()?-1:1;

273         }

274        

275         public void setState(int state){//設置當前運行的task的state

276             this.bo.setmState(state);

277             Log.d(TAG, "Set task "+this.bo.methodName+" "+this.taskId + " state " + state);

278         }

279        

280         private void onTaskEnd(){//運行完畢後從taskengine運行隊列裡刪除

281             if(Log.DBG){

282                 Log.d(TAG, "task "+this.bo.methodName+" "+taskId+" End");

283             }

284             if(this.bo.getmState() == STATE_FROST)//因為調度停止瞭該業務

285                 return;

286            

287             final ReentrantLock lock = TaskEngine.this.lock;

288             lock.lock();

289             try{

290                 boolean removed = runningQueue.remove(this); //remove from running queue

291                 assert removed;

292                 if(Log.DBG)

293                     Log.d(TAG, this.bo.methodName+" "+this.taskId+" remove from runningQueue");

294                 needSchedule.signal(); //喚醒調度線程重新調度

295             }finally{

296                 lock.unlock();

297             }

298         }

299

300         @Override

301         public boolean equals(Object o) {

302             // TODO Auto-generated method stub

303             if(this==o){

304                 return true;

305             }

306             if(o instanceof Task){

307                 return taskId==((Task)o).taskId;

308             }

309             return false;

310         }

311     }

312    

313 }完

 

拾漏補遺:

1.補充最初的設計類圖(可能與代碼不太一致,但能說明問題)

2. BusinessObject的實現代碼:

 

 1 /***************************************************

 2  * @類名:BusinessObject

 3  * @創建:baiyingjun (devXiaobai@gmail.com)

 4  * @創建日期:2011-7-6

 5  * @說明:抽象的業務,擴展的網絡業務要繼承這個類並實現抽象方法execute()

 6  ***************************************************/

 7 public abstract class BusinessObject {

 8    

 9     public static final int LOWEST_PRIORITY=1;//最低優先級

10    

11     public static final int LOW_PRIORITY=2;//低優先級

12    

13     public static final int NORMAL_PRIORITY=3;//正常優先級

14    

15     public static final int HIGH_PRIORITY=4;//高優先級

16    

17     public static final int HIGHEST_PRIORITY=5;//最高優先級

18    

19     protected BusinessListener listnener;//運行業務的回調

20    

21     protected Context mContext;

22    

23     private long taskId;

24    

25     protected Map<String,Object> params;//運行業務需要的參數

26    

27     private int priority;

28    

29     public int getPriority() {

30         return priority;

31     }

32

33     public void setPriority(int priority) {

34         this.priority = priority;

35     }

36

37     //設置回調

38     public void addBusinessListener(BusinessListener listnener){

39         this.listnener=listnener;

40     }

41    

42     public long doBusiness() throws NetworkNotConnectException{

43         taskId= TaskEngine.getInstance(mContext).addTask(this);

44         return taskId;

45     }

46    

47     public abstract void execute();

48 }

作者 白 英軍

發佈留言