手機客戶端程序由於網絡寬帶的約束,尤其在GPRS網絡環境下,大數據量的網絡交互很大程度上降低應用的響應,影響用戶體驗。比如,如果做一個手機網盤客戶端,在後臺上傳文件時(大數據量的交互),獲取文件列表(命令類的交互)這個過程就顯得太別慢。而我們的要求是希望這些命令類操作能盡快得到響應。
通常,在手機客戶端,我們設計一個網絡操作的管理器,來統一管理這些需要聯網的操作。
具體做法是把網絡操作封裝成一個Command(或者說是Task),管理器實現特定的調度規則來調度運行這些Task。
這樣做的好處至少有三:
一. 用Command封裝瞭網絡操作,使得這些操作與上傳的業務分離,解除瞭強耦合。
二. 可以根據網絡情況來確定來采用不同的調度規則,提高用戶體驗。
三. 重用,這些Task和TaskManager的代碼在別的手機應用上基本上能照搬過去。
四. 擴展,當應用需要擴展新的業務時,隻有擴展一個新的Command(或者說是Task),接受調度即可,易於擴展。
例子:
還是以上文提到的微盤為例,可以概括我們對管理器的設計要求有:
在Wifi網絡環境下:
一:各種網絡操作可以並行運行。
在GPRS網絡環境下:
二:支持優先級搶占調度,命令類操作的優先級比數據傳輸類的優先級高,當命令類的Task(獲取文件列表)提交後,打斷數據傳輸的Task(如上傳,下載),等命令類的任務運行完畢,再接著運行數據類任務(斷點上傳,下載)。
二:同一個優先級的任務可以並行運行,如多個命令一起在網絡上傳輸。
實現思路:
TaskManager :
1. TaskManager開辟一個後臺線程進行調度工作。
2. 由於要支持多個優先級的搶占調度,我們需要兩個隊列來維護運行中的Task和等待中的Task。
3. 由於Task的調度是基於優先級的,我們可以使用優先級隊列,運行隊列采用PriorityQueue,等待隊列使用PriorityBlockingQueue,當沒有網絡業務需要運行時,調度線程阻塞掛起,避免空轉。
4. TaskManager設計為單一實例(單一模式)。
5. 每個Task被調度運行時,該Task被從等待隊列移動運行隊列,當Task執行完畢時,從運行隊列刪除,喚醒調度線程進行新的調度。
下面是簡單的設計代碼:
www.aiwalls.com
public final class TaskEngine implements Runnable{
private PriorityQueue<Task> runningQueue;//運行的task隊列
private PriorityBlockingQueue<Task> readyQueue;//就緒的task隊列,準備接受調度的task列表
private final AtomicLong taskIdProducer = new AtomicLong(1);//Task Id生成器
private Object sheduleLock = new Object();//同步鎖
private static TaskEngine instance;
public long addTask(BusinessObject bo){
Task task = new Task(bo);
long newTaskId = taskIdProducer.incrementAndGet();
task.setTaskId(newTaskId);
if(this.isWifiNetWork()){ //WIFI網絡
synchronized(sheduleLock){
runningQueue.add(task);
}
new Thread(task).start();
}else{ //GPRS網絡
if(readyQueue.offer(task)){ //task入就緒隊列
final ReentrantLock lock = this.lock;
lock.lock();
try{
needSchedule.signal(); //喚醒調度線程重新調度
}finally{
lock.unlock();}
}
}
return newTaskId;
}
public final void run(){//task調度邏輯
….
….
}
//掛起調度線程 當不需要調度時
private void waitUntilNeedSchedule() throws InterruptedException
{
…..
}
}
Task:
1. 對要執行的網絡操作的封裝。
2. Task執行完畢時,發Task結束信號,喚醒調度線程進行新的調度
3. Task要實現Comparable接口,才能讓TaskManager的兩個隊列自動其包含的Task排序。
下面是簡單的設計代碼:
public class Task implements Runnable,Comparable<Task>{
private long taskId;
private BusinessObject bo;//封裝網絡操作的業務對象的抽象父類,
//它封裝瞭具體的Command,保證瞭業務擴展中Task的接口不變
@Override
public void run() {
this.onTaskStart();
this.bo.execute();
this.onTaskEnd();
}
private voidonTaskStart()
{…}
public int getPriority()
{…}
public void setPriority(intpriority)
{…}
@Override
public int compareTo(Task object1) {
return this.getPriority()>object1.getPriority()?-1:1;
}
}
小註意事項:
Android對PriorityQueue的實現和Jdk中的實現有點不一樣。
(PriorityQueue.java在android中的代碼路徑是usr\dalvik\libcore\luni\src\main\java\java\util)
PriorityQueue.remove(Object o) ;//從PriorityQueue中刪除一個元素。
對於完成這個刪除操作android和jdk都是分兩個過程實現,一,找出待刪除元素的索引index,二,刪除index所在元素。
在JDK中,是通過調用元素的equals方法來找到待刪除元素的索引,
private int indexOf(Object o) {
if (o != null) {
for (int i = 0; i < size; i++)
if (o.equals(queue[i]))
return i;
}
return -1;
}
在android中,是間接調用元素的compareTo方法判斷結果是否為0來找到待刪除元素的索引,
int targetIndex;
for (targetIndex = 0; targetIndex < size; targetIndex++) {
if (0 == this.compare((E) o, elements[targetIndex])) {
break;
}
}
private int compare(E o1, E o2) {
if (null != comparator) {
return comparator.compare(o1, o2);
}
return ((Comparable<? super E>) o1).compareTo(o2);
}
所以為瞭Task能在執行完畢時從PriorityQueue找到這個Task並刪除之,需要在compareTo方法裡在優先級相等時
返回0。
@Override
public int compareTo(Task object1) {
if(this.getPriority()==object1.getPriority())
return 0;
return this.getPriority()>object1.getPriority()?-1:1;
}
當是這樣運行PriorityQueue.remove(Object o) 邏輯上隻能刪除PriorityQueue裡第一個優先級與被刪除的元素
優先級相等的元素(可能是待刪除的元素也可能不是),有誤刪的可能,需要做如下修改:
@Override
public int compareTo(Task object1) {
if(this.getPriority()==object1.getPriority() && this.equals(object1))
return 0;
return this.getPriority()>object1.getPriority()?-1:1;
}
這樣才能正確執行remove(Object o),刪除指定的對象o。
個人覺得android這樣設計使得remove(Object o)復雜化瞭,不然JDK中那麼簡潔。語義上也不是那麼好懂瞭,
因為按通常理解,equals是判斷兩個對象是否相等,compareTo可能是對象的某個屬性的比較(類別數據庫中的order by),
而現在執行PriorityQueue.remove(Object o),這個對象o明明在容器PriorityQueue中,卻刪除不瞭,除非去翻看android中PriorityQueue的實現代碼,然後重寫compareTo這個方法。這樣的API使用時比較容易出錯,應該不符號良好的API設計規范
吧。
完整的代碼實現如下:
View Code
1 * @類名:TaskEngine
2 * @創建:baiyingjun (devXiaobai@gmail.com)
3 * @創建日期:2011-7-7
4 * @說明:task調度引擎
5 ***************************************************/
6 public final class TaskEngine implements Runnable{
7
8 private static final String TAG=Log.makeTag(TaskEngine.class);
9
10 private PriorityQueue<Task> runningQueue;//運行的task隊列
11
12 private PriorityBlockingQueue<Task> readyQueue;//就緒的task隊列,準備接受調度的task列表
13
14 private final AtomicLong taskIdProducer = new AtomicLong(1);
15
16 private Object sheduleLock = new Object();//調度鎖
17
18 private static TaskEngine instance;
19
20 private final ReentrantLock lock = new ReentrantLock(true);
21
22 private final Condition needSchedule = lock.newCondition();
23
24 private Task currentTask;//準備接受調度的task
25
26 private Context mAppContext;
27
28 /**
29 * add BusinessObject to taskEngine
30 */
31 public long addTask(BusinessObject bo) throws NetworkNotConnectException{
32 Task task = new Task(bo);
33 long newTaskId = taskIdProducer.incrementAndGet();
34 task.setTaskId(newTaskId);
35 if(Log.DBG){
36 Log.d(TAG, "Add task with task id "+newTaskId+", priority "+task.getPriority());
37 }
38
39 if(this.isWifiNetWork()){ //WIFI網絡
40 synchronized(sheduleLock){
41 runningQueue.add(task);
42 }
43 new Thread(task).start();
44 }else{ //GPRS網絡
45 if(readyQueue.offer(task)){ //task入就緒隊列
46 if(Log.DBG)
47 Log.d(TAG, "add task " +task.bo.methodName+" "+task.taskId+" to ready queue");
48 final ReentrantLock lock = this.lock;
49 lock.lock();
50 try{
51 needSchedule.signal(); //喚醒調度線程重新調度
52 }finally{
53 lock.unlock();
54 }
55 }
56 //schedule();
57 }
58 return newTaskId;
59 }
60
61 private TaskEngine(Context context){
62 mAppContext = context;
63 runningQueue = new PriorityQueue<Task>();
64 readyQueue = new PriorityBlockingQueue<Task>();
65 new Thread(this).start();
66 Log.i(TAG, "shedule thread working");
67 }
68
69 public synchronized static TaskEngine getInstance(Context context){
70 Context appContext = context.getApplicationContext();
71 if(instance==null || instance.mAppContext!=appContext){
72 instance=new TaskEngine(appContext);
73 }
74 return instance;
75 }
76
77 protected boolean isWifiNetWork() throws NetworkNotConnectException{
78 return NetworkManager.isWIFINetWork(mAppContext);
79 }
80
81 /**
82 * task調度邏輯
83 */
84 public final void run(){
85 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
86 while(true){
87 try {
88 if(this.isWifiNetWork()){
89 Task task = this.readyQueue.take();
90 if(task !=null){
91 synchronized(sheduleLock){
92 runningQueue.add(task);
93 }
94 new Thread(task).start();
95 }
96 }
97 else{//非wifi網絡
98 //空就緒隊列,空運行隊列,等待直到有任務到來
99 if(this.readyQueue.size()==0 && runningQueue.size()==0){
100 currentTask=readyQueue.take();
101 synchronized(sheduleLock){
102 runningQueue.add(currentTask);
103 }
104 new Thread(currentTask).start();
105 }
106 //搶占式調度(就緒隊列非空,運行隊列優先級比就緒隊列優先級低)
107 else if(readyQueue.size()>0 &&
108 this.readyQueue.element().getPriority()>=this.getMaxPriority()){
109 currentTask = readyQueue.take();
110 if(currentTask.getPriority()>this.getMaxPriority()){//暫停低優先級的任務運行
111 synchronized(sheduleLock){
112 for(int i=0;i<runningQueue.size();i++){
113 Task toStopTask =runningQueue.remove();
114 //因為任務調度,將低優先級的任務暫時給凍結起來
115 toStopTask.setState(Task.STATE_FROST);
116 readyQueue.add(toStopTask);
117 }
118 }
119 }
120 //運行被調度的任務
121 runningQueue.add(currentTask);
122 new Thread(currentTask).start();
123 }else {//等高優先級的任務運行完畢
124 waitUntilNeedSchedule();
125 }
126 }
127
128 }catch (InterruptedException e) {
129 Log.e(TAG, "Schedule error "+e.getMessage());
130 } catch (NetworkNotConnectException e) {
131 // TODO Auto-generated catch block
132 e.printStackTrace();
133 }
134 }
135 }
136
137 /*
138 * 等待,直到就緒隊列裡的最高優先級比當前運行優先級高,或者就緒隊列為空時等待運行隊列運行完畢
139 */
140 private void waitUntilNeedSchedule() throws InterruptedException{
141 final ReentrantLock lock = this.lock;
142 lock.lockInterruptibly();
143 try {
144 try{
145 while ((readyQueue.size()>0
146 && readyQueue.element().getPriority()<getMaxPriority())//等高優先級的任務運行完畢
147 || (readyQueue.size()==0 && runningQueue.size()>0) ){//或者等運行隊列運行完畢
148 if(Log.DBG)
149 Log.d(TAG, "waiting sheduling……..");
150 needSchedule.await();
151 }
152 }
153 catch (InterruptedException ie) {
154 needSchedule.signal(); // propagate to non-interrupted thread
155 throw ie;
156 }
157 } finally {
158 lock.unlock();
159 }
160 }
161
162 /**
163 * Hand the specified task ,such as pause,delete and so on
164 */
165 public boolean handTask(long taskId,int handType) {
166 Log.i(TAG, "set task`s state with taskId "+taskId);
167 synchronized(this.sheduleLock){
168 //如果在運行隊列裡,取消該任務
169 Iterator<Task> runningItor= this.runningQueue.iterator();
170 while(runningItor.hasNext()){
171 Task task = runningItor.next();
172 boolean b = task.equals(this);
173 if(task.getTaskId()==taskId){
174 runningQueue.remove(task);
175 task.setState(handType);
176 Log.i(TAG, "set runningQueue taskId = "+taskId + " state " + handType);
177 return true;
178 }
179 }
180 //如果在就緒隊列裡,刪除
181 Iterator<Task> readyItor= this.readyQueue.iterator();
182 while(readyItor.hasNext()){
183 Task task = readyItor.next();
184 if(task.getTaskId()==taskId){
185 // readyQueue.remove(task);
186 task.setState(handType);
187 Log.i(TAG, "set readyQueue taskId = "+taskId + " state " + handType);
188 return true;
189 }
190 }
191 return false;
192 }
193 }
194 /***
195 * 獲取運行隊列任務的最高優先級
196 */
197 private int getMaxPriority(){
198 if(this.runningQueue==null || this.runningQueue.size()==0)
199 return -1;
200 else{
201 return this.runningQueue.element().getPriority();
202 }
203 }
204
205 /***************************************************
206 * @類名:Task
207 * @創建:baiyingjun (devXiaobai@gmail.com)
208 * @創建日期:2011-7-7
209 * @說明:業務對象的包裝成可運行實體
210 ***************************************************/
211 public class Task implements Runnable,Comparable<Task>{
212
213 //運行
214 public static final int STATE_INIT = -1;
215 //運行
216 public static final int STATE_RUN = 0;
217 //停止
218 public static final int STATE_STOP = 1;
219 //暫停
220 public static final int STATE_PAUSE = 2;
221 //取消
222 public static final int STATE_CANCLE = 3;
223 //凍結,如果在GPRS下,因為線程調度的時候低優先級的被放readyqueue裡的時候,要把這個任務暫時給“凍結”起來
224 public static final int STATE_FROST = 4;
225
226 private long taskId;
227
228 private BusinessObject bo;
229
230 public Task(){
231
232 }
233
234 public Task(BusinessObject bo){
235 this.bo=bo;
236 }
237
238 @Override
239 public void run() {
240 this.onTaskStart();
241 this.bo.execute();
242 this.onTaskEnd();
243 }
244
245 private void onTaskStart(){
246 this.bo.setmState(STATE_RUN);
247 }
248
249 public long getTaskId() {
250 return taskId;
251 }
252
253 public void setTaskId(long taskId) {
254 this.taskId = taskId;
255 }
256
257 public int getPriority() {
258 return this.bo.getPriority();
259 }
260
261 public void setPriority(int priority) {
262 this.bo.setPriority(priority);
263 }
264
265 /*
266 * compare task priority
267 */
268 @Override
269 public int compareTo(Task object1) {
270 if(this.getPriority()==object1.getPriority()&& this.equals(object1))
271 return 0;
272 return this.getPriority()>object1.getPriority()?-1:1;
273 }
274
275 public void setState(int state){//設置當前運行的task的state
276 this.bo.setmState(state);
277 Log.d(TAG, "Set task "+this.bo.methodName+" "+this.taskId + " state " + state);
278 }
279
280 private void onTaskEnd(){//運行完畢後從taskengine運行隊列裡刪除
281 if(Log.DBG){
282 Log.d(TAG, "task "+this.bo.methodName+" "+taskId+" End");
283 }
284 if(this.bo.getmState() == STATE_FROST)//因為調度停止瞭該業務
285 return;
286
287 final ReentrantLock lock = TaskEngine.this.lock;
288 lock.lock();
289 try{
290 boolean removed = runningQueue.remove(this); //remove from running queue
291 assert removed;
292 if(Log.DBG)
293 Log.d(TAG, this.bo.methodName+" "+this.taskId+" remove from runningQueue");
294 needSchedule.signal(); //喚醒調度線程重新調度
295 }finally{
296 lock.unlock();
297 }
298 }
299
300 @Override
301 public boolean equals(Object o) {
302 // TODO Auto-generated method stub
303 if(this==o){
304 return true;
305 }
306 if(o instanceof Task){
307 return taskId==((Task)o).taskId;
308 }
309 return false;
310 }
311 }
312
313 }完
拾漏補遺:
1.補充最初的設計類圖(可能與代碼不太一致,但能說明問題)
2. BusinessObject的實現代碼:
1 /***************************************************
2 * @類名:BusinessObject
3 * @創建:baiyingjun (devXiaobai@gmail.com)
4 * @創建日期:2011-7-6
5 * @說明:抽象的業務,擴展的網絡業務要繼承這個類並實現抽象方法execute()
6 ***************************************************/
7 public abstract class BusinessObject {
8
9 public static final int LOWEST_PRIORITY=1;//最低優先級
10
11 public static final int LOW_PRIORITY=2;//低優先級
12
13 public static final int NORMAL_PRIORITY=3;//正常優先級
14
15 public static final int HIGH_PRIORITY=4;//高優先級
16
17 public static final int HIGHEST_PRIORITY=5;//最高優先級
18
19 protected BusinessListener listnener;//運行業務的回調
20
21 protected Context mContext;
22
23 private long taskId;
24
25 protected Map<String,Object> params;//運行業務需要的參數
26
27 private int priority;
28
29 public int getPriority() {
30 return priority;
31 }
32
33 public void setPriority(int priority) {
34 this.priority = priority;
35 }
36
37 //設置回調
38 public void addBusinessListener(BusinessListener listnener){
39 this.listnener=listnener;
40 }
41
42 public long doBusiness() throws NetworkNotConnectException{
43 taskId= TaskEngine.getInstance(mContext).addTask(this);
44 return taskId;
45 }
46
47 public abstract void execute();
48 }
作者 白 英軍