前面文章我們講解了ArrayBlockingQueue和LinkedBlockingQueue源碼,這篇文章開始講解SynchronousQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數(shù)組實(shí)現(xiàn)的,而LinkedBlockingQueue是基于鏈表實(shí)現(xiàn),而SynchronousQueue是基于什么數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的,看不來。
無論是ArrayBlockingQueue還是LinkedBlockingQueue都是起到緩沖隊(duì)列的作用,當(dāng)消費(fèi)者的消費(fèi)速度跟不上時(shí),任務(wù)就在隊(duì)列中堆積,需要等待消費(fèi)者慢慢消費(fèi)。
如果我們想要自己的任務(wù)快速執(zhí)行,不要積壓在隊(duì)列中,該怎么辦? 今天的主角SynchronousQueue就派上用場了。
SynchronousQueue被稱為同步隊(duì)列,當(dāng)生產(chǎn)者往隊(duì)列中放元素的時(shí)候,必須等待消費(fèi)者把這個(gè)元素取走,否則一直阻塞。消費(fèi)者取元素的時(shí)候,同理也必須等待生產(chǎn)者放隊(duì)列中放元素。
由于SynchronousQueue實(shí)現(xiàn)了BlockingQueue接口,而BlockingQueue接口中定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來滿足不同的場景。
操作 | 拋出異常 | 返回特定值 | 一直阻塞 | 阻塞指定時(shí)間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
取數(shù)據(jù)(同時(shí)刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
SynchronousQueue也會(huì)有針對(duì)這幾組放數(shù)據(jù)和取數(shù)據(jù)方法的具體實(shí)現(xiàn)。
Java線程池中的帶緩存的線程池就是基于SynchronousQueue實(shí)現(xiàn)的:
// 創(chuàng)建帶緩存的線程池ExecutorService executorService = Executors.newCachedThreadPool();
對(duì)應(yīng)的源碼實(shí)現(xiàn):
// 底層使用SynchronousQueue隊(duì)列處理任務(wù)public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
先看一下SynchronousQueue類里面有哪些屬性:
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 轉(zhuǎn)接器(棧和隊(duì)列的父類) */ abstract static class Transferer<E> { /** * 轉(zhuǎn)移(put和take都用這一個(gè)方法) * * @param e 元素 * @param timed 是否超時(shí) * @param nanos 納秒 */ abstract E transfer(E e, boolean timed, long nanos); } /** * 棧實(shí)現(xiàn)類 */ static final class TransferStack<E> extends Transferer<E> { } /** * 隊(duì)列實(shí)現(xiàn)類 */ static final class TransferQueue<E> extends Transferer<E> { }}
SynchronousQueue底層是基于Transferer抽象類實(shí)現(xiàn)的,放數(shù)據(jù)和取數(shù)據(jù)的邏輯都耦合在transfer()方法中。而Transferer抽象類又有兩個(gè)實(shí)現(xiàn)類,分別是基于棧結(jié)構(gòu)實(shí)現(xiàn)和基于隊(duì)列實(shí)現(xiàn)。
SynchronousQueue常用的初始化方法有兩個(gè):
/** * 無參構(gòu)造方法 */BlockingQueue<Integer> blockingQueue1 = new SynchronousQueue<>();/** * 有參構(gòu)造方法,指定是否使用公平鎖(默認(rèn)使用非公平鎖) */BlockingQueue<Integer> blockingQueue2 = new SynchronousQueue<>(true);
再看一下對(duì)應(yīng)的源碼實(shí)現(xiàn):
/** * 無參構(gòu)造方法 */public SynchronousQueue() { this(false);}/** * 有參構(gòu)造方法,指定是否使用公平鎖 */public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
可以看出SynchronousQueue的無參構(gòu)造方法默認(rèn)使用的非公平策略,有參構(gòu)造方法可以指定使用公平策略。操作策略:
/** * 棧實(shí)現(xiàn) */static final class TransferStack<E> extends Transferer<E> { /** * 頭節(jié)點(diǎn)(也是棧頂節(jié)點(diǎn)) */ volatile SNode head; /** * 棧節(jié)點(diǎn)類 */ static final class SNode { /** * 當(dāng)前操作的線程 */ volatile Thread waiter; /** * 節(jié)點(diǎn)值(取數(shù)據(jù)的時(shí)候,該字段為null) */ Object item; /** * 節(jié)點(diǎn)模式(也叫操作類型) */ int mode; /** * 后繼節(jié)點(diǎn) */ volatile SNode next; /** * 匹配到的節(jié)點(diǎn) */ volatile SNode match; }}
節(jié)點(diǎn)模式有以下三種:
類型值 | 類型描述 | 作用 |
0 | REQUEST | 表示取數(shù)據(jù) |
1 | DATA | 表示放數(shù)據(jù) |
2 | FULFILLING | 表示正在執(zhí)行中(比如取數(shù)據(jù)的線程正在匹配放數(shù)據(jù)的線程) |
圖片
transfer()方法中,把放數(shù)據(jù)和取數(shù)據(jù)的邏輯耦合在一塊了,邏輯有點(diǎn)繞,不過核心邏輯就四點(diǎn),把握住就能豁然開朗。其實(shí)就是從棧頂壓入,從棧頂彈出。
詳細(xì)流程如下:
/** * 轉(zhuǎn)移(put和take都用這一個(gè)方法) * * @param e 元素(取數(shù)據(jù)的時(shí)候,元素為null) * @param timed 是否超時(shí) * @param nanos 納秒 */E transfer(E e, boolean timed, long nanos) { SNode s = null; // 1. e為null,表示要取數(shù)據(jù),否則是放數(shù)據(jù) int mode = (e == null) ? REQUEST : DATA; for (; ; ) { SNode h = head; // 2. 如果本次操作跟棧頂節(jié)點(diǎn)模式相同(都是取數(shù)據(jù),或者都是放數(shù)據(jù)),就把本次操作包裝成SNode,壓入棧頂 if (h == null || h.mode == mode) { if (timed && nanos <= 0) { if (h != null && h.isCancelled()) { casHead(h, h.next); } else { return null; } // 3. 把本次操作包裝成SNode,壓入棧頂,并掛起當(dāng)前線程 } else if (casHead(h, s = snode(s, e, h, mode))) { // 4. 掛起當(dāng)前線程 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { clean(s); return null; } // 5. 當(dāng)前線程被喚醒后,如果棧頂有了新節(jié)點(diǎn),就刪除當(dāng)前節(jié)點(diǎn) if ((h = head) != null && h.next == s) { casHead(h, s.next); } return (E) ((mode == REQUEST) ? m.item : s.item); } // 6. 如果棧頂節(jié)點(diǎn)類型跟本次操作不同,并且模式不是FULFILLING類型 } else if (!isFulfilling(h.mode)) { if (h.isCancelled()) { casHead(h, h.next); } // 7. 把本次操作包裝成SNode(類型是FULFILLING),壓入棧頂 else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // 8. 使用死循環(huán),直到匹配到對(duì)應(yīng)的節(jié)點(diǎn) for (; ; ) { // 9. 遍歷下個(gè)節(jié)點(diǎn) SNode m = s.next; // 10. 如果節(jié)點(diǎn)是null,表示遍歷到末尾,設(shè)置棧頂節(jié)點(diǎn)是null,結(jié)束。 if (m == null) { casHead(s, null); s = null; break; } SNode mn = m.next; // 11. 如果棧頂?shù)暮罄^節(jié)點(diǎn)跟棧頂節(jié)點(diǎn)匹配成功,就刪除這兩個(gè)節(jié)點(diǎn),結(jié)束。 if (m.tryMatch(s)) { casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else { // 12. 如果沒有匹配成功,就刪除棧頂?shù)暮罄^節(jié)點(diǎn),繼續(xù)匹配 s.casNext(m, mn); } } } } else { // 13. 如果棧頂節(jié)點(diǎn)類型跟本次操作不同,并且是FULFILLING類型, // 就再執(zhí)行一遍上面第8步for循環(huán)中的邏輯(很少概率出現(xiàn)) SNode m = h.next; if (m == null) { casHead(h, null); } else { SNode mn = m.next; if (m.tryMatch(h)) { casHead(h, mn); } else { h.casNext(m, mn); } } } }}
不用關(guān)心細(xì)枝末節(jié),把握住代碼核心邏輯即可。 再看一下第4步,掛起線程的代碼邏輯: 核心邏輯就兩條:
/** * 等待執(zhí)行 * * @param s 節(jié)點(diǎn) * @param timed 是否超時(shí) * @param nanos 超時(shí)時(shí)間 */SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 1. 計(jì)算超時(shí)時(shí)間 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 2. 計(jì)算自旋次數(shù) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); // 3. 如果已經(jīng)匹配到其他節(jié)點(diǎn),直接返回 SNode m = s.match; if (m != null) return m; if (timed) { // 4. 超時(shí)時(shí)間遞減 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 5. 自旋次數(shù)減一 if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) s.waiter = w; // 6. 開始掛起當(dāng)前線程 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
再看一下匹配節(jié)點(diǎn)的tryMatch()方法邏輯: 作用就是喚醒棧頂節(jié)點(diǎn),并當(dāng)前節(jié)點(diǎn)傳遞給棧頂節(jié)點(diǎn)。
/** * 匹配節(jié)點(diǎn) * * @param s 當(dāng)前節(jié)點(diǎn) */boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { waiter = null; // 1. 喚醒棧頂節(jié)點(diǎn) LockSupport.unpark(w); } return true; } // 2. 把當(dāng)前節(jié)點(diǎn)傳遞給棧頂節(jié)點(diǎn) return match == s;}
/** * 隊(duì)列實(shí)現(xiàn) */static final class TransferQueue<E> extends Transferer<E> { /** * 頭節(jié)點(diǎn) */ transient volatile QNode head; /** * 尾節(jié)點(diǎn) */ transient volatile QNode tail; /** * 隊(duì)列節(jié)點(diǎn)類 */ static final class QNode { /** * 當(dāng)前操作的線程 */ volatile Thread waiter; /** * 節(jié)點(diǎn)值 */ volatile Object item; /** * 后繼節(jié)點(diǎn) */ volatile QNode next; /** * 當(dāng)前節(jié)點(diǎn)是否為數(shù)據(jù)節(jié)點(diǎn) */ final boolean isData; }}
可以看出TransferQueue隊(duì)列是使用帶有頭尾節(jié)點(diǎn)的單鏈表實(shí)現(xiàn)的。 還有一點(diǎn)需要提一下,TransferQueue默認(rèn)構(gòu)造方法,會(huì)初始化頭尾節(jié)點(diǎn),默認(rèn)是空節(jié)點(diǎn)。
/** * TransferQueue默認(rèn)的構(gòu)造方法 */TransferQueue() { QNode h = new QNode(null, false); head = h; tail = h;}
隊(duì)列使用的公平策略,體現(xiàn)在,每次操作的時(shí)候,都是從隊(duì)尾壓入,從隊(duì)頭彈出。 詳細(xì)流程如下:
/** * 轉(zhuǎn)移(put和take都用這一個(gè)方法) * * @param e 元素(取數(shù)據(jù)的時(shí)候,元素為null) * @param timed 是否超時(shí) * @param nanos 超時(shí)時(shí)間 */E transfer(E e, boolean timed, long nanos) { QNode s = null; // 1. e不為null,表示要放數(shù)據(jù),否則是取數(shù)據(jù) boolean isData = (e != null); for (; ; ) { QNode t = tail; QNode h = head; if (t == null || h == null) { continue; } // 2. 如果本次操作跟隊(duì)尾節(jié)點(diǎn)模式相同(都是取數(shù)據(jù),或者都是放數(shù)據(jù)),就把本次操作包裝成QNode,壓入隊(duì)尾 if (h == t || t.isData == isData) { QNode tn = t.next; if (t != tail) { continue; } if (tn != null) { advanceTail(t, tn); continue; } if (timed && nanos <= 0) { return null; } // 3. 把本次操作包裝成QNode,壓入隊(duì)尾 if (s == null) { s = new QNode(e, isData); } if (!t.casNext(null, s)) { continue; } advanceTail(t, s); // 4. 掛起當(dāng)前線程 Object x = awaitFulfill(s, e, timed, nanos); // 5. 當(dāng)前線程被喚醒后,返回返回傳遞過來的節(jié)點(diǎn)值 if (x == s) { clean(t, s); return null; } if (!s.isOffList()) { advanceHead(t, s); if (x != null) { s.item = s; } s.waiter = null; } return (x != null) ? (E) x : e; } else { // 6. 如果本次操作跟隊(duì)尾節(jié)點(diǎn)模式不同,就從隊(duì)頭結(jié)點(diǎn)開始遍歷,找到模式相匹配的節(jié)點(diǎn) QNode m = h.next; if (t != tail || m == null || h != head) { continue; } Object x = m.item; // 7. 把當(dāng)前節(jié)點(diǎn)值e傳遞給匹配到的節(jié)點(diǎn)m if (isData == (x != null) || x == m || !m.casItem(x, e)) { advanceHead(h, m); continue; } // 8. 彈出隊(duì)頭節(jié)點(diǎn),并喚醒節(jié)點(diǎn)m advanceHead(h, m); LockSupport.unpark(m.waiter); return (x != null) ? (E) x : e; } }}
看完了底層源碼,再看一下上層包裝好的工具方法。
放數(shù)據(jù)的方法有四個(gè):
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時(shí)間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
先看一下offer()方法源碼,其他放數(shù)據(jù)方法邏輯也是大同小異,底層都是調(diào)用的transfer()方法實(shí)現(xiàn)。 如果沒有匹配到合適的節(jié)點(diǎn),offer()方法會(huì)直接返回false,表示插入失敗。
/** * offer方法入口 * * @param e 元素 * @return 是否插入成功 */public boolean offer(E e) { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 調(diào)用底層transfer方法 return transferer.transfer(e, true, 0) != null;}
再看一下另外三個(gè)添加元素方法源碼:
如果沒有匹配到合適的節(jié)點(diǎn),add()方法會(huì)拋出異常,底層基于offer()實(shí)現(xiàn)。
/** * add方法入口 * * @param e 元素 * @return 是否添加成功 */public boolean add(E e) { if (offer(e)) { return true; } else { throw new IllegalStateException("Queue full"); }}
如果沒有匹配到合適的節(jié)點(diǎn),put()方法會(huì)一直阻塞,直到有其他線程取走數(shù)據(jù),才能添加成功。
/** * put方法入口 * * @param e 元素 */public void put(E e) throws InterruptedException { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 調(diào)用底層transfer方法 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); }}
再看一下offer(e, time, unit)方法源碼,如果沒有匹配到合適的節(jié)點(diǎn), offer(e, time, unit)方法會(huì)阻塞一段時(shí)間,然后返回false。
/** * offer方法入口 * * @param e 元素 * @param timeout 超時(shí)時(shí)間 * @param unit 時(shí)間單位 * @return 是否添加成功 */public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 調(diào)用底層transfer方法 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) { return true; } if (!Thread.interrupted()) { return false; } throw new InterruptedException();}
彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個(gè):
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時(shí)間 |
取數(shù)據(jù)(同時(shí)刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
看一下poll()方法源碼,其他方取數(shù)據(jù)法邏輯大同小異,底層都是調(diào)用的transfer方法實(shí)現(xiàn)。 poll()方法在彈出元素的時(shí)候,如果沒有匹配到合適的節(jié)點(diǎn),直接返回null,表示彈出失敗。
/** * poll方法入口 */public E poll() { // 調(diào)用底層transfer方法 return transferer.transfer(null, true, 0);}
再看一下remove()方法源碼,如果沒有匹配到合適的節(jié)點(diǎn),remove()會(huì)拋出異常。
/** * remove方法入口 */public E remove() { // 1. 直接調(diào)用poll方法 E x = poll(); // 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常 if (x != null) { return x; } else { throw new NoSuchElementException(); }}
再看一下take()方法源碼,如果沒有匹配到合適的節(jié)點(diǎn),take()方法就一直阻塞,直到被喚醒。
/** * take方法入口 */public E take() throws InterruptedException { // 調(diào)用底層transfer方法 E e = transferer.transfer(null, false, 0); if (e != null) { return e; } Thread.interrupted(); throw new InterruptedException();}
再看一下poll(time, unit)方法源碼,如果沒有匹配到合適的節(jié)點(diǎn), poll(time, unit)方法會(huì)阻塞指定時(shí)間,然后然后null。
/** * poll方法入口 * * @param timeout 超時(shí)時(shí)間 * @param unit 時(shí)間單位 * @return 元素 */public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 調(diào)用底層transfer方法 E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) { return e; } throw new InterruptedException();}
再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時(shí)間 |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
先看一下peek()方法源碼,直接返回null,SynchronousQueue不支持這種操作。
/** * peek方法入口 */public E peek() { return null;}
再看一下element()方法源碼,底層調(diào)用的也是peek()方法,也是不支持這種操作。
/** * element方法入口 */public E element() { // 1. 調(diào)用peek方法查詢數(shù)據(jù) E x = peek(); // 2. 如果查到數(shù)據(jù),直接返回 if (x != null) { return x; } else { // 3. 如果沒找到,則拋出異常 throw new NoSuchElementException(); }}
這篇文章講解了SynchronousQueue阻塞隊(duì)列的核心源碼,了解到SynchronousQueue隊(duì)列具有以下特點(diǎn):
本文鏈接:http://www.www897cc.com/showinfo-26-72426-0.html沒研究過SynchronousQueue源碼,就別寫精通線程池
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com
上一篇: Spring Boot + Nacos 實(shí)現(xiàn)了一個(gè)動(dòng)態(tài)化線程池,非常實(shí)用!