本文是Java並發包探秘的第一篇,旨在介紹一下Java並發容器中用的一些思路和技巧,幫助大傢更好的理解Java並發容器,讓我們更好的使用並發容器打造更高效的程序。本人能力有限,錯誤難免。希望及時指出。
Java並發包中有很多精心設計的高並發容器。有ConcurrentHashMap、ConcurrentSkipListMap 、ConcurrentLinkedQueue等。ConcurrentLinkedQueue就是其中設計最為優雅的高並發容器。它被設計成瞭無鎖的、無界的、非阻塞式的單向鏈表結構。現在就讓我們來一步一步揭開他們神秘的面紗。
正文開始:
一說到鏈表結構,我們首先就會想到的就是組成鏈表結構的原件,那就是節點。或者有的人稱之為元素。ConcurrentLinkedQueue(為瞭敘述方便後面用CLQ指代)中稱之為Node.
我們先來看看CLQ中Node的結構:
Java代碼
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
private static final
AtomicReferenceFieldUpdater<Node, Node>
nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");
private static final
AtomicReferenceFieldUpdater<Node, Object>
itemUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "item");
Node(E x) { item = x; }
Node(E x, Node<E> n) { item = x; next = n; }
E getItem() {
return item;
}
boolean casItem(E cmp, E val) {
return itemUpdater.compareAndSet(this, cmp, val);
}
void setItem(E val) {
itemUpdater.set(this, val);
}
Node<E> getNext() {
return next;
}
boolean casNext(Node<E> cmp, Node<E> val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
void setNext(Node<E> val) {
nextUpdater.set(this, val);
}
}
1.CLQ中的Node定義成瞭私有的靜態類說明該節點描述隻適用在CLQ中。
2.其中用到瞭AtomicReferenceFieldUpdater原子屬性引用原子更新器。該類是抽象的,但該類的內部已經給出瞭包訪問控制級別的一個實現AtomicReferenceFieldUpdaterImpl,原理是利用反射將一個被聲明成volatile 的屬性通過Java native interface (JNI)調用,使用cpu指令級的命令將一個變量進行更新,該操作是原子的。atomic 包中還有很多類似的更新器分別針對不同的類型進行原子級別的比較更新原子操作。這裡用到瞭sun.misc.Unsafe 的compareAndSwap操作(簡稱CAS)它有三種不同的本地命令分別針對Int、Long、Object進行原子更新操作。
3.我們可以看出CLQ中的Node結構是一個單向的鏈表結構,因為每個Node隻有一個向後的next和一個item用來裝內容。CLQ將通過casNext和casItem方法來原子更新Node鏈的結構。setNext 和setItem則是直接放入
我們再來看CLQ的鏈結構
Java代碼
private static final
AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
(ConcurrentLinkedQueue.class, Node.class, "tail");
private static final
AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
headUpdater =
AtomicReferenceFieldUpdater.newUpdater
(ConcurrentLinkedQueue.class, Node.class, "head");
private boolean casTail(Node<E> cmp, Node<E> val) {
return tailUpdater.compareAndSet(this, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return headUpdater.compareAndSet(this, cmp, val);
}
/**
* Pointer to header node, initialized to a dummy node. The first
* actual node is at head.getNext().
*/
private transient volatile Node<E> head = new Node<E>(null, null);
/** Pointer to last node on list **/
private transient volatile Node<E> tail = head;
1.實際上經過對Node的分析。CLQ中的頭尾指針的更新原理其實也是一樣的。都是通過cpu原子操作命令進行的更新。
2.這樣我們就有瞭在高並發下原子更新的基礎支持,但是除瞭原子更新的支持是不夠的。原因很簡單,這是因為當多個線程同時使用原子更新操作來更新一個鏈表結構的時候隻有一個成功其它的都會失敗。失敗的操作如何再讓它成功才是問題的關鍵。CLQ優雅的解決瞭這一問題。
我們再來看看CLQ的放入元素操作:
Java代碼
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e, null);
for (;;) { //1
Node<E> t = tail; //2
Node<E> s = t.getNext(); //3
if (t == tail) { //4
if (s == null) { //5
if (t.casNext(s, n)) { //6
casTail(t, n); //7
return true; //8
}
} else {
casTail(t, s); //9
}
}
}
}
在有鎖得情況下我們隻要讓獲得鎖得線程更新,其它線程等待即可解決並發更新的問題,但是在上述的單向鏈表結構中有更好的無鎖解決方法。
1.代碼1 啊! 死循環,對,就是利用反復輪詢的重復一段邏輯操作。
2.代碼2 代碼3 先用兩個臨時變量指向CLQ的尾和尾的下一個節點。這樣有什麼好處?直接用tail和tail.getNext不行嗎?我們說瞭。這是一個無鎖得方法。可能有多個線程同時執行到代碼3處,因為臨時變量是每線程的而tail是公共的。這樣成功執行到代碼3的線程都有自己當時的臨時CLQ隊列結構引用。為後面的判斷做好準備。
3.開始判斷 代碼4 證明 在代碼2 和代碼4之間沒有被其它線程修改過,因為有可能已經被修改瞭。那麼這時進入新的輪詢。
4.代碼5 在看代碼5之前我們先要明確一個概念就是把一個Node放入一個CLQ隊列有兩步操作。第一步是tail的next指向新的節點。第二步是tail指向新的節點。代碼5 先判斷是不是有線程已經在完成加入一個節點的第一步,如果是就幫助它完成第二步,再次進入循環。如果沒有線程已經完成第一步。那就自己來完成插入節點的第一步,當然就是調用casNext比較更新的原子操作。上文已經講過。再來完成插入元素的第二步,以上邏輯由代碼6、代碼7完成。註意看代碼8 恒為真? 為什麼?自己調用casTail如果成功返回真毫無疑問。如果失敗為什麼也返回真?答案很簡單,這是因為如果失敗說明一定有其它線程進入瞭代碼9 幫自己完成瞭插入一個節點的第二步操作。所以自己操作肯定是失敗的。所以也返回真。
從上面的代碼分析可以看出打造一個無鎖得並發容器處處都要十分小心。這也是CLQ的高明之處。
我們再來看看刪除一個元素的代碼:
Java代碼
public E poll() {
for (;;) {
Node<E> h = head; //1
Node<E> t = tail; //2
Node<E> first = h.getNext(); //3
if (h == head) { //4
if (h == t) { //5
if (first == null) //6
return null; //7
else
casTail(t, first); //8
} else if (casHead(h, first)) { //9
E item = first.getItem(); //10
if (item != null) { //11
first.setItem(null); //12
return item; //13
}
// else skip over deleted item, continue loop,
}
}
}
}
1.同樣是輪詢,當h!=head的時候繼續循環。因為在代碼1和代碼4之間已經有其它線程刪除瞭頭元素。從而造成h != head.
2.代碼5 是否是空的CLQ。
3.如果是空的CLQ判斷頭得下一節點是否是null.因為隻有時空的才說明沒有元素。否則有可能其它線程正在插入元素造成first!=null,這時就幫助其它線程完成為指針更新操作。再繼續輪詢。
4.如果是非空的CLQ用casHead來原子更新頭節點。因為刪除一個CLQ的元素是從頭開始刪除的。如果失敗說明有其它線程在刪除元素。繼續輪詢。
5.代碼10 如果第一個元素的內容為空說明有線程已經執行到代碼12瞭。所以又開始輪詢。
6.隻有成功執行到代碼13才正真是由當前線程完成瞭刪除一個元素操作。CLQ的peek()操作和poll操作隻差代碼12的操作,即一個刪除元素,一個不刪除元素。
在CLQ中迭代器的方法也很精妙:
Java代碼
private E advance() {
lastRet = nextNode;
E x = nextItem;
Node<E> p = (nextNode == null)? first() : nextNode.getNext();
for (;;) {
if (p == null) {
nextNode = null;
nextItem = null;
return x;
}
E item = p.getItem();
if (item != null) {
nextNode = p;
nextItem = item;
return x;
} else // skip over nulls
p = p.getNext();
}
}
由於CLQ單向鏈表的特殊性,元素的變化隻可能頭處刪除,在尾處添加。所以使用CLQ的迭代器時元素可能比實際的要多。原因很簡單,當你在迭代的時候元素可能已經刪除,當然這是你迭代的線程是不可見的。而刪除是可見的。
ConcurrentLinkeQueue的其它操作大同小異。都是在不斷的輪詢中步步判斷其它線程的影響,一步一步推進自己的操作邏輯。從而最終完成操作的。