Java並發控制:ReentrantLock Condition的使用

生產者-消費者(producer-consumer)問題,也稱作有界緩沖區(bounded-buffer)問題,兩個進程共享一個公共的固定大小的緩沖區。

其中一個是生產者,用於將消息放入緩沖區;另外一個是消費者,用於從緩沖區中取出消息。
問題出現在當緩沖區已經滿瞭,而此時生產者還想向其中放入一個新的數據項的情形,其解決方法是讓生產者此時進行休眠,等待消費者從緩沖區中取走瞭一個或者多個數據後再去喚醒它。
同樣地,當緩沖區已經空瞭,而消費者還想去取消息,此時也可以讓消費者進行休眠,等待生產者放入一個或者多個數據時再喚醒它。

Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供 wait 和 signal 方法。
其中,Lock 替代瞭 synchronized 方法和語句的使用,Condition 替代瞭 Object 監視器方法的使用。

在 Condition 中,用 await() 替換 wait(),用 signal() 替換 notify(),用signalAll() 替換 notifyAll(),傳統線程的通信方式,Condition 都可以實現,這裡註意,Condition 是被綁定到 Lock 上的,要創建一個 Lock 的 Condition 必須用 newCondition() 方法。

例1

class BoundedBuffer {
   final Lock lock = new ReentrantLock();//鎖對象
   final Condition notFull  = lock.newCondition();//寫線程條件 
   final Condition notEmpty = lock.newCondition();//讀線程條件 

   final Object[] items = new Object[100];//緩存隊列
   int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)//如果隊列滿瞭 
         notFull.await();//阻塞寫線程
       items[putptr] = x;//賦值 
       if (++putptr == items.length) putptr = 0;//如果寫索引寫到隊列的最後一個位置瞭,那麼置為0
       ++count;//個數++
       notEmpty.signal();//喚醒讀線程
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)//如果隊列為空
         notEmpty.await();//阻塞讀線程
       Object x = items[takeptr];//取值 
       if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到隊列的最後一個位置瞭,那麼置為0
       --count;//個數--
       notFull.signal();//喚醒寫線程
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }

這是一個處於多線程工作環境下的緩存區,緩存區提供瞭兩個方法,put 和 take,put 是存數據,take 是取數據,內部有個緩存隊列。

這個緩存區類實現的功能:有多個線程往裡面存數據和從裡面取數據,其緩存隊列(先進先出後進後出)能緩存的最大數值是100,多個線程間是互斥的,當緩存隊列中存儲的值達到100時,將寫線程阻塞,並喚醒讀線程,當緩存隊列中存儲的值為0時,將讀線程阻塞,並喚醒寫線程,這也是 ArrayBlockingQueue 的內部實現。

下面分析一下代碼的執行過程:

一個寫線程執行,調用 put 方法;

判斷 count 是否為100,顯然沒有100;

繼續執行,存入值;

判斷當前寫入的索引位置++後,是否和100相等,相等將寫入索引值變為0,並將count+1;

僅喚醒讀線程阻塞隊列中的一個;

一個讀線程執行,調用take方法;

……

僅喚醒寫線程阻塞隊列中的一個。

這就是多個 Condition 的強大之處,假設緩存隊列中已經存滿,那麼阻塞的肯定是寫線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫線程,那麼假設隻有一個Condition會有什麼效果呢,緩存隊列中已經存滿,這個Lock不知道喚醒的是讀線程還是寫線程瞭,如果喚醒的是讀線程,皆大歡喜,如果喚醒的是寫線程,那麼線程剛被喚醒,又被阻塞瞭,這時又去喚醒,這樣就浪費瞭很多時間。

例2

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    static class NumberWrapper {
        public int value = 1;
    }

    public static void main(String[] args)  {
        //初始化可重入鎖
        final Lock lock = new ReentrantLock();

        //第一個條件當螢幕上輸出到3
        final Condition reachThreeCondition = lock.newCondition();
        //第二個條件當螢幕上輸出到6
        final Condition reachSixCondition = lock.newCondition();

        //NumberWrapper隻是為瞭封裝一個數字,一邊可以將數字對象共享,並可以設置為final
        //註意這裡不要用Integer, Integer 是不可變對象
        final NumberWrapper num = new NumberWrapper();
        //初始化A線程
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                //需要先獲得鎖
                lock.lock();
                try {
                    System.out.println("threadA start write");
                    //A線程先輸出前3個數
                    while (num.value <= 3) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //輸出到3時要signal,告訴B線程可以開始瞭
                    reachThreeCondition.signal();
                } finally {
                    lock.unlock();
                }
                lock.lock();
                try {
                    //等待輸出6的條件
                    reachSixCondition.await();
                    System.out.println("threadA start rewrite");
                    //輸出剩餘數字
                    while (num.value <= 9) {
                        System.out.println(num.value);
                        num.value++;
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }

        });


        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();

                    while (num.value <= 3) {
                        //等待3輸出完畢的信號
                        reachThreeCondition.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                try {
                    lock.lock();
                    //已經收到信號,開始輸出4,5,6
                    System.out.println("threadB start write");
                    while (num.value <= 6) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //4,5,6輸出完畢,告訴A線程6輸出完瞭
                    reachSixCondition.signal();
                } finally {
                    lock.unlock();
                }
            }

        });

        //啟動兩個線程
        threadA.start();
        threadB.start();
    }
}

結果如下:

threadA start write
2
threadB start write
5
threadA start rewrite
8

基本思路就是首先要A線程先寫1,2,3,這時候B線程應該等待 reachThredCondition 信號,而當A線程寫完3之後就通過signal告訴B線程“我寫到3瞭,該你瞭”,

這時候A線程要等 reachSixCondition 信號,同時B線程得到通知,開始寫4,5,6,寫完4,5,6之後B線程通知A線程 reachSixCondition 條件成立瞭,這時候A線程就開始寫剩下的7,8,9瞭。

例3

Java官方提供的例子:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    public static void main(String[] args)  {
        final BoundedBuffer boundedBuffer = new BoundedBuffer();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("t1 run");
                for (int i=0;i<20;i++) {
                    try {
                        System.out.println("putting..");
                        boundedBuffer.put(Integer.valueOf(i));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }) ;

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i=0;i<20;i++) {
                    try {
                        Object val = boundedBuffer.take();
                        System.out.println(val);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }) ;

        t1.start();
        t2.start();
    }

    /**
     * BoundedBuffer 是一個定長100的集合,當集合中沒有元素時,take方法需要等待,直到有元素時才返回元素
     * 當其中的元素數達到最大值時,要等待直到元素被take之後才執行put的操作
     * @author yukaizhao
     *
     */
    static class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items = new Object[100];
        int putptr, takeptr, count;

        public void put(Object x) throws InterruptedException {
            System .out.println("put wait lock");
            lock.lock();
            System.out.println("put get lock");
            try {
                while (count == items.length) {
                    System.out.println("buffer full, please wait");
                    notFull.await();
                }

                items[putptr] = x;
                if (++putptr == items.length)
                    putptr = 0;
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            System.out.println("take wait lock");
            lock.lock();
            System.out.println("take get lock");
            try {
                while (count == 0) {
                    System.out.println("no elements, please wait");
                    notEmpty.await();
                }
                Object x = items[takeptr];
                if (++takeptr == items.length)
                    takeptr = 0;
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }
}

結果如下:


t1 run
putting..
put wait lock
take wait lock
put get lock
putting..
put wait lock
take get lock
0
take wait lock
take get lock
no elements, please wait
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
1
take wait lock
put wait lock
take get lock
2
put get lock
take wait lock
take get lock
3
take wait lock
putting..
put wait lock
take get lock
4
take wait lock
put get lock
putting..
put wait lock
take get lock
5
take wait lock
put get lock
putting..
put wait lock
take get lock
6
take wait lock
put get lock
putting..
put wait lock
take get lock
7
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
8
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
9
take wait lock
put get lock
putting..
put wait lock
take get lock
put get lock
putting..
put wait lock
10
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
11
take wait lock
take get lock
12
take wait lock
take get lock
13
take wait lock
take get lock
14
take wait lock
take get lock
15
take wait lock
take get lock
no elements, please wait
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
16
take wait lock
take get lock
17
take wait lock
take get lock
18
take wait lock
take get lock
19

這個示例中 BoundedBuffer 是一個固定長度的集合,

這個在其 put 操作時,如果發現長度已經達到最大長度,那麼要等待notFull信號才能繼續 put,如果得到 notFull 信號會像集合中添加元素,並且 put 操作會發出 notEmpty 的信號,

而在其 take 方法中如果發現集合長度為空,那麼會等待 notEmpty 的信號,接受到 notEmpty 信號才能繼續 take,同時如果拿到一個元素,那麼會發出 notFull 的信號。

如果采用 Object 類中的 wait(),notify(),notifyAll() 實現該緩沖區,當向緩沖區寫入數據之後需要喚醒”讀線程”時,不可能通過 notify() 或 notifyAll() 明確的指定喚醒”讀線程”,而隻能通過 notifyAll 喚醒所有線程(但是 notifyAll 無法區分喚醒的線程是讀線程,還是寫線程)。 但是,通過 Condition,就能明確的指定喚醒讀線程。

Condition 原理分析

ConditionObject 是同步器 AbstractQueuedSynchronizer 的內部類,因為 Condition 的操作需要獲取相關聯的鎖,所以作為同步器的內部類也較為合理。每個 Condition 對象都包含著一個隊列,該隊列是 Condition 對象實現等待/通知功能的關鍵。下面將分析Condition 的實現,主要包括:等待隊列、等待和通知。

等待隊列

等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含瞭一個線程引用,該線程就是在 Condition 對象上等待的線程,如果一個線程調用瞭Condition.await()方法,那麼該線程將會構造成節點加入等待隊列並進入等待狀態,在 unlock() 方法後釋放鎖。
一個 Condition 包含一個等待隊列,Condition 擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前線程調用 Condition.await() 方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列,等待隊列的基本結構如下圖所示:

等待

如圖所示,Condition 擁有首尾節點的引用,而新增節點隻需要將原有的尾節點 nextWaiter 指向它,並且更新尾節點即可。上述節點引用更新的過程並沒有使用CAS保證,原因在於調用 await() 方法的線程必定是獲取瞭鎖的線程,也就是說該過程是由鎖來保證線程安全的。在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而並發包中的 Lock(更確切地說是同步器)擁有一個同步隊列和多個等待隊列,其對應關系如下圖所示:

Lock

等待

調用 Condition 的 await() 方法,會使當前線程進入等待隊列,同時線程狀態變為等待狀態,在調用 unlock() 方法後釋放鎖。當從 await() 方法返回時,當前線程一定獲取瞭Condition 相關聯的鎖。如果從隊列(同步隊列和等待隊列)的角度看 await() 方法,當調用 await() 方法時,相當於同步隊列的首節點(獲取瞭鎖的節點)移動到 Condition 的等待隊列中。調用該方法的線程成功獲取瞭鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構造成節點並加入等待隊列中,然後釋放同步狀態,喚醒同步隊列中的後繼節點,然後當前線程會進入等待狀態。當等待隊列中的節點被喚醒,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過其他線程調用 Condition.signal() 方法喚醒,而是對等待線程進行中斷,則會拋出 InterruptedException :

await

通知

調用 Condition 的 signal() 方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中。調用該方法的前置條件是當前線程必須獲取瞭鎖,接著獲取等待隊列的首節點,將其移動到同步隊列。節點從等待隊列移動到同步隊列的過程如下圖所示:

signal

signal() 方法隻是將 Condition 等待隊列頭結點移出隊列,此時該線程節點還是阻塞的,同時將該節點的線程重新包裝加入同步隊列,當調用 unlock() 方法時,會喚醒同步隊列的第二個節點,假如這個新節點是處於第二個位置,那麼它將會被喚醒,否則,繼續阻塞。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *