上篇文章我們講解了ArrayBlockingQueue源碼,這篇文章開始講解LinkedBlockingQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,而LinkedBlockingQueue是基于鏈表實現(xiàn)。
那么,LinkedBlockingQueue底層源碼實現(xiàn)是什么樣的?跟ArrayBlockingQueue有何不同?
LinkedBlockingQueue的應(yīng)用場景跟ArrayBlockingQueue有什么不一樣?
看完這篇文章,可以輕松解答這些問題。
由于LinkedBlockingQueue實現(xiàn)了BlockingQueue接口,而BlockingQueue接口中定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來滿足不同的場景。
操作 | 拋出異常 | 返回特定值 | 一直阻塞 | 阻塞指定時間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
取數(shù)據(jù)(同時刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
這四組方法的區(qū)別是:
LinkedBlockingQueue也會有針對這幾組放數(shù)據(jù)和取數(shù)據(jù)方法的具體實現(xiàn)。 Java線程池中的固定大小線程池就是基于LinkedBlockingQueue實現(xiàn)的:
# 創(chuàng)建固定大小的線程池ExecutorService executorService = Executors.newFixedThreadPool(10);
對應(yīng)的源碼實現(xiàn):
# 底層使用LinkedBlockingQueue隊列存儲任務(wù)public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
先看一下LinkedBlockingQueue類里面有哪些屬性:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 容量大小 */ private final int capacity; /** * 元素個數(shù) */ private final AtomicInteger count = new AtomicInteger(); /** * 頭節(jié)點 */ transient Node<E> head; /** * 尾節(jié)點 */ private transient Node<E> last; /** * 取數(shù)據(jù)的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 取數(shù)據(jù)的條件(隊列非空) */ private final Condition notEmpty = takeLock.newCondition(); /** * 放數(shù)據(jù)的鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 放數(shù)據(jù)的條件(隊列非滿) */ private final Condition notFull = putLock.newCondition(); /** * 鏈表節(jié)點類 */ static class Node<E> { /** * 節(jié)點元素 */ E item; /** * 后繼節(jié)點 */ Node<E> next; Node(E x) { item = x; } }}
圖片
可以看出LinkedBlockingQueue底層是基于鏈表實現(xiàn)的,定義了頭節(jié)點head和尾節(jié)點last,由鏈表節(jié)點類Node可以看出是個單鏈表。 發(fā)現(xiàn)個問題,ArrayBlockingQueue中只使用了一把鎖,入隊出隊操作共用這把鎖。而LinkedBlockingQueue則使用了兩把鎖,分別是出隊鎖takeLock和入隊鎖putLock,為什么要這么設(shè)計呢?
LinkedBlockingQueue把兩把鎖分開,性能更好,為什么ArrayBlockingQueue不這樣設(shè)計呢?
原因是ArrayBlockingQueue是基于數(shù)組實現(xiàn)的,所有數(shù)據(jù)都存儲在同一個數(shù)組對象里面,對同一個對象沒辦法使用兩把鎖,會有數(shù)據(jù)可見性的問題。而LinkedBlockingQueue底層是基于鏈表實現(xiàn)的,從頭節(jié)點刪除,尾節(jié)點插入,頭尾節(jié)點分別是兩個對象,可以分別使用兩把鎖,提升操作性能。
另外也定義了兩個條件notEmpty和notFull,當條件滿足的時候才允許放數(shù)據(jù)或者取數(shù)據(jù),下面會詳細講。
LinkedBlockingQueue常用的初始化方法有兩個:
/** * 無參構(gòu)造方法 */BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingQueue<>();/** * 指定容量大小的構(gòu)造方法 */BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingQueue<>(10);
再看一下對應(yīng)的源碼實現(xiàn):
/** * 無參構(gòu)造方法 */public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}/** * 指定容量大小的構(gòu)造方法 */public LinkedBlockingQueue(int capacity) { if (capacity <= 0) { throw new IllegalArgumentException(); } // 設(shè)置容量大小,初始化頭尾結(jié)點 this.capacity = capacity; last = head = new Node<E>(null);}
可以看出LinkedBlockingQueue的無參構(gòu)造方法使用的鏈表容量是Integer的最大值,存儲大量數(shù)據(jù)的時候,會有內(nèi)存溢出的風險,建議使用有參構(gòu)造方法,指定容量大小。
有參構(gòu)造方法還會初始化頭尾節(jié)點,節(jié)點值為null。
LinkedBlockingQueue初始化的時候,不支持指定是否使用公平鎖,只能使用非公平鎖,而ArrayBlockingQueue是支持指定的。
放數(shù)據(jù)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
先看一下offer()方法源碼,其他放數(shù)據(jù)方法邏輯也是大同小異,都是在鏈表尾部插入。 offer()方法在隊列滿的時候,會直接返回false,表示插入失敗。
/** * offer方法入口 * * @param e 元素 * @return 是否插入成功 */public boolean offer(E e) { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 如果隊列已滿,則直接返回false,表示插入失敗 final AtomicInteger count = this.count; if (count.get() == capacity) { return false; } int c = -1; Node<E> node = new Node<E>(e); // 3. 獲取put鎖,并加鎖 final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 4. 加鎖后,再次判斷隊列是否已滿,如果未滿,則入隊 if (count.get() < capacity) { enqueue(node); // 5. 隊列個數(shù)加一 c = count.getAndIncrement(); // 6. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補償,不加也行) if (c + 1 < capacity) { notFull.signal(); } } } finally { // 7. 釋放鎖 putLock.unlock(); } // 8. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程 if (c == 0) { signalNotEmpty(); } // 9. 返回是否插入成功 return c >= 0;}/** * 入隊 * * @param node 節(jié)點 */private void enqueue(LinkedBlockingQueue.Node<E> node) { // 直接追加到鏈表末尾 last = last.next = node;}/** * 喚醒因為隊列為空而等待取數(shù)據(jù)的線程 */private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); }}
offer()方法邏輯也很簡單,追加元素到鏈表末尾,如果是第一次添加元素,就喚醒因為隊列為空而等待取數(shù)據(jù)的線程。
再看一下另外三個添加元素方法源碼:
add()方法在數(shù)組滿的時候,會拋出異常,底層基于offer()實現(xiàn)。
/** * add方法入口 * * @param e 元素 * @return 是否添加成功 */public boolean add(E e) { if (offer(e)) { return true; } else { throw new IllegalStateException("Queue full"); }}
put()方法在數(shù)組滿的時候,會一直阻塞,直到有其他線程取走數(shù)據(jù),空出位置,才能添加成功。
/** * put方法入口 * * @param e 元素 */public void put(E e) throws InterruptedException { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } int c = -1; Node<E> node = new Node<E>(e); // 2. 加可中斷的鎖,防止一直阻塞 final ReentrantLock putLock = this.putLock; putLock.lockInterruptibly(); final AtomicInteger count = this.count; try { // 3. 如果隊列已滿,就一直阻塞,直到被喚醒 while (count.get() == capacity) { notFull.await(); } // 4. 如果隊列未滿,則直接入隊 enqueue(node); c = count.getAndIncrement(); // 5. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補償,不加也行) if (c + 1 < capacity) { notFull.signal(); } } finally { // 6. 釋放鎖 putLock.unlock(); } // 7. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程 if (c == 0) { signalNotEmpty(); }}
再看一下offer(e, time, unit)方法源碼,在數(shù)組滿的時候, offer(e, time, unit)方法會阻塞一段時間。
/** * offer方法入口 * * @param e 元素 * @param timeout 超時時間 * @param unit 時間單位 * @return 是否添加成功 */public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 把超時時間轉(zhuǎn)換為納秒 long nanos = unit.toNanos(timeout); int c = -1; final AtomicInteger count = this.count; // 2. 加可中斷的鎖,防止一直阻塞 final ReentrantLock putLock = this.putLock; putLock.lockInterruptibly(); try { // 4. 循環(huán)判斷隊列是否已滿 while (count.get() == capacity) { if (nanos <= 0) { // 6. 如果隊列已滿,且超時時間已過,則返回false return false; } // 5. 如果隊列已滿,則等待指定時間 nanos = notFull.awaitNanos(nanos); } // 7. 如果隊列未滿,則入隊 enqueue(new Node<E>(e)); // 8. 如果隊列未滿,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程(用來補償,不加也行) c = count.getAndIncrement(); if (c + 1 < capacity) { notFull.signal(); } } finally { // 9. 釋放鎖 putLock.unlock(); } // 10. c等于0,表示插入前,隊列為空,是第一次插入,需要喚醒因為隊列為空而等待取數(shù)據(jù)的線程 if (c == 0) { signalNotEmpty(); } return true;}
彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數(shù)據(jù)(同時刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
看一下poll()方法源碼,其他方取數(shù)據(jù)法邏輯大同小異,都是從鏈表頭部彈出元素。 poll()方法在彈出元素的時候,如果隊列為空,直接返回null,表示彈出失敗。
/** * poll方法入口 */public E poll() { // 如果隊列為空,則返回null final AtomicInteger count = this.count; if (count.get() == 0) { return null; } E x = null; int c = -1; // 2. 加鎖 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 3. 如果隊列不為空,則取出隊頭元素 if (count.get() > 0) { x = dequeue(); // 4. 元素個數(shù)減一 c = count.getAndDecrement(); // 5. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程 if (c > 1) { notEmpty.signal(); } } } finally { // 6. 釋放鎖 takeLock.unlock(); } // 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程 if (c == capacity) { signalNotFull(); } return x;}/** * 取出隊頭元素 */private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x;}/** * 喚醒因為隊列已滿而等待放數(shù)據(jù)的線程 */private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); }}
再看一下remove()方法源碼,如果隊列為空,remove()會拋出異常。
/** * remove方法入口 */public E remove() { // 1. 直接調(diào)用poll方法 E x = poll(); // 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常 if (x != null) { return x; } else { throw new NoSuchElementException(); }}
再看一下take()方法源碼,如果隊列為空,take()方法就一直阻塞,直到被喚醒。
/** * take方法入口 */public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // 1. 加可中斷的鎖,防止一直阻塞 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 2. 如果隊列為空,就一直阻塞,直到被喚醒 while (count.get() == 0) { notEmpty.await(); } // 3. 如果隊列不為空,則取出隊頭元素 x = dequeue(); // 4. 隊列元素個數(shù)減一 c = count.getAndDecrement(); // 5. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程 if (c > 1) { notEmpty.signal(); } } finally { // 6. 釋放鎖 takeLock.unlock(); } // 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程 if (c == capacity) { signalNotFull(); } return x;}
再看一下poll(time, unit)方法源碼,在隊列滿的時候, poll(time, unit)方法會阻塞指定時間,然后然后null。
/** * poll方法入口 * * @param timeout 超時時間 * @param unit 時間單位 * @return 元素 */public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; // 1. 把超時時間轉(zhuǎn)換成納秒 long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; // 2. 加可中斷的鎖,防止一直阻塞 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 3. 循環(huán)判斷隊列是否為空 while (count.get() == 0) { if (nanos <= 0) { // 5. 如果隊列為空,且超時時間已過,則返回null return null; } // 4. 阻塞到到指定時間 nanos = notEmpty.awaitNanos(nanos); } // 6. 如果隊列不為空,則取出隊頭元素 x = dequeue(); // 7. 隊列元素個數(shù)減一 c = count.getAndDecrement(); // 8. 如果隊列不為空,則喚醒因為隊列為空而等待取數(shù)據(jù)的線程 if (c > 1) { notEmpty.signal(); } } finally { // 9. 釋放鎖 takeLock.unlock(); } // 7. 如果取數(shù)據(jù)之前,隊列已滿,取數(shù)據(jù)之后隊列肯定不滿了,則喚醒因為隊列已滿而等待放數(shù)據(jù)的線程 if (c == capacity) { signalNotFull(); } return x;}
再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
先看一下peek()方法源碼,如果數(shù)組為空,直接返回null。
/** * peek方法入口 */public E peek() { // 1. 如果隊列為空,則返回null if (count.get() == 0) { return null; } // 2. 加鎖 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 3. 取出隊頭元素 Node<E> first = head.next; if (first == null) { return null; } else { return first.item; } } finally { // 4. 釋放鎖 takeLock.unlock(); }}
再看一下element()方法源碼,如果隊列為空,則拋出異常。
/** * element方法入口 */public E element() { // 1. 調(diào)用peek方法查詢數(shù)據(jù) E x = peek(); // 2. 如果查到數(shù)據(jù),直接返回 if (x != null) { return x; } else { // 3. 如果沒找到,則拋出異常 throw new NoSuchElementException(); }}
這篇文章講解了LinkedBlockingQueue阻塞隊列的核心源碼,了解到LinkedBlockingQueue隊列具有以下特點:
那么ArrayBlockingQueue與LinkedBlockingQueue區(qū)別是什么?相同點:
不同點:
今天一起分析了LinkedBlockingQueue隊列的源碼,可以看到LinkedBlockingQueue的源碼非常簡單,沒有什么神秘復雜的東西,下篇文章再一起接著分析其他的阻塞隊列源碼。
本文鏈接:http://www.www897cc.com/showinfo-26-70476-0.html深入理解Java線程池,剖析LinkedBlockingQueue源碼實現(xiàn)
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
下一篇: 一文搞懂設(shè)計模式—策略模式