Java線程(十三):BlockingQueue-線程的阻塞隊列 – JAVA編程語言程序開發技術文章

 BlockingQueue作為線程容器,可以為線程同步提供有力的保障,其主要用到的方法包括:
[java]
add(E o); //將指定的元素添加到此隊列中(如果立即可行),在成功時返回 true,其他情況則拋出 IllegalStateException。 
drainTo(Collection<? super E> c);  //移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。 
drainTo(Collection<? super E> c,int maxElements);//最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中 
offer(E o);  //如果可能的話,將指定元素插入此隊列中。 
offer(E o, long timeout, TimeUnit unit);  //將指定的元素插入此隊列中,如果沒有可用空間,將等待指定的等待時間(如果有必要)。 
poll(long timeout, TimeUnit unit);  //檢索並移除此隊列的頭部,如果此隊列中沒有任何元素,則等待指定等待的時間(如果有必要)。 
put(E o);    //將指定元素添加到此隊列中,如果沒有可用空間,將一直等待(如果有必要)。 
remainingCapacity();  //返回在無阻塞的理想情況下(不存在內存或資源約束)此隊列能接受的元素數量;如果沒有內部限制,則返回 Integer.MAX_VALUE。 
take();  //檢索並移除此隊列的頭部,如果此隊列不存在任何元素,則一直等待。 

       上述方法中主要用到的是put()和take()方法,也隻有這兩個方法具有阻塞等待功能,另外BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
       BlockingQueue 可以定義為限定容量的隊列,它有一個 remainingCapacity容量值,超出此容量,便無法無阻塞地 put 額外的元素。也可以定義為沒有容量限制的隊列,沒有容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩餘容量。
       BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖定或其他形式的並發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在隻添加瞭 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。它實質上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。
      BlockingQueue 主要用於實現生產者-使用者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,隻能有計劃地偶爾使用,比如在取消排隊信息時。阻塞隊列與Semaphore有很大相似性,但也有很多不同,阻塞隊列一般是一方存數據,另一方釋放數據,而Semaphore通常是同一方獲取和釋放信號。下面通過一個例子加以說明:
[java] 
public class BlockingQueueTest { 
    public static void main(String[] args) { 
        final BlockingQueue queue = new ArrayBlockingQueue(3); 
        for(int i=0;i<2;i++){ 
            new Thread(){ 
                public void run(){ 
                    while(true){ 
                        try { 
                        Thread.sleep((long)(Math.random()*1000)); 
                        System.out.println(Thread.currentThread().getName() + "準備放數據!");                             
                            queue.put(1); 
System.out.println(Thread.currentThread().getName() + "已經放瞭數據," +   "隊列目前有" + queue.size() + "個數據"); 
                        } catch (InterruptedException e) { 
                            e.printStackTrace(); 
                        } 
                    } 
                } 
                 
            }.start(); 
        } 
        new Thread(){ 
            public void run(){ 
                while(true){ 
                    try { 
                    //將此處的睡眠時間分別改為100和1000,觀察運行結果 
                    Thread.sleep(1000); 
                    System.out.println(Thread.currentThread().getName() + "準備取數據!"); 
                    queue.take(); 
                    System.out.println(Thread.currentThread().getName() + "已經取走數據," +  
                                "隊列目前有" + queue.size() + "個數據");         
                    } catch (InterruptedException e) { 
                        e.printStackTrace(); 
                    } 
                } 
            } 
        }.start();           
    } 

       上例中定義瞭一個最多可以存放3個數據的BlockingQueue,並創建瞭兩個用於put()的線程,一個用於take()的線程,這邊能夠更容易使阻塞隊列形成滿隊列,當隊列中的有3個數據的時候,兩個put()線程就等待,隻有當take()線程取走一個數據時才可以繼續往隊列中添加數據。運行結果如下(隻去部分結果):
[java] 
Thread-1準備放數據! 
Thread-1已經放瞭數據,隊列目前有1個數據 
Thread-1準備放數據! 
Thread-1已經放瞭數據,隊列目前有2個數據 
Thread-0準備放數據! 
Thread-0已經放瞭數據,隊列目前有3個數據 
Thread-0準備放數據! 
Thread-2準備取數據! 
Thread-2已經取走數據,隊列目前有2個數據 

      既然阻塞隊列可以實現線程之間的等待,那麼我們就可以通過兩個具有1個空間的阻塞隊列可以實現線程的同步,關鍵代碼如下:
[java] 
BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); 
 BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); 
 public  void sub(int i){ 
                try { 
                    queue1.put(1); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
                for(int j=1;j<=10;j++){ 
                    System.out.println("sub thread sequece of " + j + ",loop of " + i); 
                } 
                try { 
                    queue2.take(); 
                } catch (InterruptedException e) { 
                    // TODO Auto-generated catch block 
                    e.printStackTrace(); 
                } 
          } 
          public  void main(int i){ 
                try { 
                    queue2.put(1); 
                } catch (InterruptedException e1) { 
                    // TODO Auto-generated catch block 
                    e1.printStackTrace(); 
                } 
                for(int j=1;j<=100;j++){ 
                    System.out.println("main thread sequece of " + j + ",loop of " + i); 
                } 
                try { 
                    queue1.take(); www.aiwalls.com
                } catch (InterruptedException e) { 
                    // TODO Auto-generated catch block 
                    e.printStackTrace(); 
                } 
          } 

        上例中定義瞭兩個方法,一個sub()和一個main(),兩個方法要實現同步,由於定義的兩個阻塞隊列都是容量為1,所以隻要有一個queue1.put(1);那麼sub()方法就必須等待,隻有當main()方法中queue1.take();以後sub()方法才可以繼續進行,main()方法也類似。

發佈留言