在之前的文章中,我們介紹了生產者和消費者模型的最基本實現思路,相信大家對它已經有一個初步的認識。
在 Java 的并發包里面還有一個非常重要的接口:BlockingQueue。
BlockingQueue是一個阻塞隊列,更為準確的解釋是:BlockingQueue是一個基于阻塞機制實現的線程安全的隊列。通過它也可以實現生產者和消費者模型,并且效率更高、安全可靠,相比之前介紹的生產者和消費者模型,它可以同時實現生產者和消費者并行運行。
那什么是阻塞隊列呢?
簡單的說,就是當參數在入隊和出隊時,通過加鎖的方式來避免線程并發操作時導致的數據異常問題。
在 Java 中,能對線程并發執行進行加鎖的方式主要有synchronized和ReentrantLock,其中BlockingQueue采用的是ReentrantLock方式實現。
與此對應的還有非阻塞機制的隊列,主要是采用 CAS 方式來控制并發操作,例如:ConcurrentLinkedQueue,這個我們在后面的文章再進行分享介紹。
今天我們主要介紹BlockingQueue相關的知識和用法,廢話不多說了,進入正題!
打開BlockingQueue的源碼,你會發現它繼承自Queue,正如上文提到的,它本質是一個隊列接口。
public interface BlockingQueue<E> extends Queue<E> { //...省略}
關于隊列,我們在之前的集合系列文章中對此有過深入的介紹,本篇就再次簡單的介紹一下。
隊列其實是一個數據結構,元素遵循先進先出的原則,所有新元素的插入,也被稱為入隊操作,會插入到隊列的尾部;元素的移除,也被稱為出隊操作,會從隊列的頭部開始移除,從而保證先進先出的原則。
在Queue接口中,總共有 6 個方法,可以分為 3 類,分別是:插入、移除、查詢,內容如下:
方法描述add(e)插入元素,如果插入失敗,就拋異常offer(e)插入元素,如果插入成功,就返回 true;反之 falseremove()移除元素,如果移除失敗,就拋異常poll()移除元素,如果移除成功,返回 true;反之 falseelement()獲取隊首元素,如果獲取結果為空,就拋異常peek()獲取隊首元素,如果獲取結果為空,返回空對象
因為BlockingQueue是Queue的子接口,了解Queue接口里面的方法,有助于我們對BlockingQueue的理解。
除此之外,BlockingQueue還單獨擴展了一些特有的方法,內容如下:
方法描述put(e)插入元素,如果沒有插入成功,線程會一直阻塞,直到隊列中有空間再繼續offer(e, time, unit)插入元素,如果在指定的時間內沒有插入成功,就返回 false;反之 truetake()移除元素,如果沒有移除成功,線程會一直阻塞,直到隊列中新的數據被加入poll(time, unit)移除元素,如果在指定的時間內沒有移除成功,就返回 false;反之 truedrainTo(Collection c, int maxElements)一次性取走隊列中的數據到 c 中,可以指定取的個數。該方法可以提升獲取數據效率,不需要多次分批加鎖或釋放鎖
分析源碼,你會發現相比普通的Queue子類,BlockingQueue子類主要有以下幾個明顯的不同點:
打開源碼,BlockingQueue接口的實現類非常多,我們重點講解一下其中的 5 個非常重要的實現類,分別如下表所示。
實現類功能ArrayBlockingQueue基于數組的阻塞隊列,使用數組存儲數據,需要指定長度,所以是一個有界隊列LinkedBlockingQueue基于鏈表的阻塞隊列,使用鏈表存儲數據,默認是一個無界隊列;也可以通過構造方法中的capacity設置最大元素數量,所以也可以作為有界隊列SynchronousQueue一種沒有緩沖的隊列
生產者產生的數據直接會被消費者獲取并且立刻消費PriorityBlockingQueue基于優先級別的阻塞隊列,底層基于數組實現,是一個無界隊列DelayQueue延遲隊列,其中的元素只有到了其指定的延遲時間,才能夠從隊列中出隊
下面我們對以上實現類的用法,進行一一介紹。
ArrayBlockingQueue是一個基于數組的阻塞隊列,初始化的時候必須指定隊列大小,源碼實現比較簡單,采用的是ReentrantLock和Condition實現生產者和消費者模型,部分核心源碼如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 使用數組存儲隊列中的元素 */ final Object[] items; /** 使用獨占鎖ReetrantLock */ final ReentrantLock lock; /** 等待出隊的條件 */ private final Condition notEmpty; /** 等待入隊的條件 */ private final Condition notFull; /** 初始化時,需要指定隊列大小 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** 初始化時,也指出指定是否為公平鎖, */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /**入隊操作*/ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } /**出隊操作*/ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }}
ArrayBlockingQueue采用ReentrantLock進行加鎖,只有一個ReentrantLock對象,這意味著生產者和消費者無法并行運行。
我們看一個簡單的示例代碼如下:
public class Container { /** * 初始化阻塞隊列 */ private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); /** * 添加數據到阻塞隊列 * @param value */ public void add(Integer value) { try { queue.put(value); System.out.println("生產者:"+ Thread.currentThread().getName()+",add:" + value); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 從阻塞隊列獲取數據 */ public void get() { try { Integer value = queue.take(); System.out.println("消費者:"+ Thread.currentThread().getName()+",value:" + value); } catch (InterruptedException e) { e.printStackTrace(); } }}
/** * 生產者 */public class Producer extends Thread { private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { for (int i = 0; i < 6; i++) { container.add(i); } }}
/** * 消費者 */public class Consumer extends Thread { private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { for (int i = 0; i < 6; i++) { container.get(); } }}
/** * 測試類 */public class MyThreadTest { public static void main(String[] args) { Container container = new Container(); Producer producer = new Producer(container); Consumer consumer = new Consumer(container); producer.start(); consumer.start(); }}
運行結果如下:
生產者:Thread-0,add:0生產者:Thread-0,add:1生產者:Thread-0,add:2生產者:Thread-0,add:3生產者:Thread-0,add:4生產者:Thread-0,add:5消費者:Thread-1,value:0消費者:Thread-1,value:1消費者:Thread-1,value:2消費者:Thread-1,value:3消費者:Thread-1,value:4消費者:Thread-1,value:5
可以很清晰的看到,生產者線程執行完畢之后,消費者線程才開始消費。
LinkedBlockingQueue是一個基于鏈表的阻塞隊列,初始化的時候無須指定隊列大小,默認隊列長度為Integer.MAX_VALUE,也就是 int 型最大值。
同樣的,采用的是ReentrantLock和Condition實現生產者和消費者模型,不同的是它使用了兩個lock,這意味著生產者和消費者可以并行運行,程序執行效率進一步得到提升。
部分核心源碼如下:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 使用出隊獨占鎖ReetrantLock */ private final ReentrantLock takeLock = new ReentrantLock(); /** 等待出隊的條件 */ private final Condition notEmpty = takeLock.newCondition(); /** 使用入隊獨占鎖ReetrantLock */ private final ReentrantLock putLock = new ReentrantLock(); /** 等待入隊的條件 */ private final Condition notFull = putLock.newCondition(); /**入隊操作*/ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } /**出隊操作*/ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }}
把最上面的樣例Container中的阻塞隊列實現類換成LinkedBlockingQueue,調整如下:
/** * 初始化阻塞隊列 */private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
再次運行結果如下:
生產者:Thread-0,add:0消費者:Thread-1,value:0生產者:Thread-0,add:1消費者:Thread-1,value:1生產者:Thread-0,add:2消費者:Thread-1,value:2生產者:Thread-0,add:3生產者:Thread-0,add:4生產者:Thread-0,add:5消費者:Thread-1,value:3消費者:Thread-1,value:4消費者:Thread-1,value:5
可以很清晰的看到,生產者線程和消費者線程,交替并行執行。
SynchronousQueue是一個沒有緩沖的隊列,生產者產生的數據直接會被消費者獲取并且立刻消費,相當于傳統的一個請求對應一個應答模式。
相比ArrayBlockingQueue和LinkedBlockingQueue,SynchronousQueue實現機制也不同,它主要采用隊列和棧來實現數據的傳遞,中間不存儲任何數據,生產的數據必須得消費者處理,線程阻塞方式采用 JDK 提供的LockSupport park/unpark函數來完成,也支持公平和非公平兩種模式。
部分核心源碼如下:
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /**不同的策略實現*/ private transient volatile Transferer<E> transferer; /**默認非公平模式*/ public SynchronousQueue() { this(false); } /**可以選策略,也可以采用公平模式*/ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } /**入隊操作*/ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } /**出隊操作*/ public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }}
同樣的,把最上面的樣例Container中的阻塞隊列實現類換成SynchronousQueue,代碼如下:
public class Container { /** * 初始化阻塞隊列 */ private final BlockingQueue<Integer> queue = new SynchronousQueue<>(); /** * 添加數據到阻塞隊列 * @param value */ public void add(Integer value) { try { queue.put(value); Thread.sleep(100); System.out.println("生產者:"+ Thread.currentThread().getName()+",add:" + value); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 從阻塞隊列獲取數據 */ public void get() { try { Integer value = queue.take(); Thread.sleep(200); System.out.println("消費者:"+ Thread.currentThread().getName()+",value:" + value); } catch (InterruptedException e) { e.printStackTrace(); } }}
再次運行結果如下:
生產者:Thread-0,add:0消費者:Thread-1,value:0生產者:Thread-0,add:1消費者:Thread-1,value:1生產者:Thread-0,add:2消費者:Thread-1,value:2生產者:Thread-0,add:3消費者:Thread-1,value:3生產者:Thread-0,add:4消費者:Thread-1,value:4生產者:Thread-0,add:5消費者:Thread-1,value:5
可以很清晰的看到,生產者線程和消費者線程,交替串行執行,生產者每投遞一條數據,消費者處理一條數據。
PriorityBlockingQueue是一個基于優先級別的阻塞隊列,底層基于數組實現,可以認為是一個無界隊列。
PriorityBlockingQueue與ArrayBlockingQueue的實現邏輯,基本相似,也是采用ReentrantLock來實現加鎖的操作。
最大不同點在于:
部分核心源碼如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /**隊列元素*/ private transient Object[] queue; /**比較器*/ private transient Comparator<? super E> comparator; /**采用ReentrantLock進行加鎖*/ private final ReentrantLock lock; /**條件等待與通知*/ private final Condition notEmpty; /**入隊操作*/ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } /**出隊操作*/ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }}
同樣的,把最上面的樣例Container中的阻塞隊列實現類換成PriorityBlockingQueue,調整如下:
/** * 初始化阻塞隊列 */private final BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
生產者插入數據的內容,我們改下插入順序。
/** * 生產者 */public class Producer extends Thread { private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { container.add(5); container.add(3); container.add(1); container.add(2); container.add(0); container.add(4); }}
最后運行結果如下:
生產者:Thread-0,add:5生產者:Thread-0,add:3生產者:Thread-0,add:1生產者:Thread-0,add:2生產者:Thread-0,add:0生產者:Thread-0,add:4消費者:Thread-1,value:0消費者:Thread-1,value:1消費者:Thread-1,value:2消費者:Thread-1,value:3消費者:Thread-1,value:4消費者:Thread-1,value:5
從日志上可以很明顯看出,對于整數,默認情況下,按照升序排序,消費者默認從 0 開始處理。
DelayQueue是一個線程安全的延遲隊列,存入隊列的元素不會立刻被消費,只有到了其指定的延遲時間,才能夠從隊列中出隊。
底層采用的是PriorityQueue來存儲元素,DelayQueue的特點在于:插入隊列中的數據可以按照自定義的delay時間進行排序,快到期的元素會排列在前面,只有delay時間小于 0 的元素才能夠被取出。
部分核心源碼如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /**采用ReentrantLock進行加鎖*/ private final transient ReentrantLock lock = new ReentrantLock(); /**采用PriorityQueue進行存儲數據*/ private final PriorityQueue<E> q = new PriorityQueue<E>(); /**條件等待與通知*/ private final Condition available = lock.newCondition(); /**入隊操作*/ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } } /**出隊操作*/ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }}
同樣的,把最上面的樣例Container中的阻塞隊列實現類換成DelayQueue,代碼如下:
public class Container { /** * 初始化阻塞隊列 */ private final BlockingQueue<DelayedUser> queue = new DelayQueue<DelayedUser>(); /** * 添加數據到阻塞隊列 * @param value */ public void add(DelayedUser value) { try { queue.put(value); System.out.println("生產者:"+ Thread.currentThread().getName()+",add:" + value); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 從阻塞隊列獲取數據 */ public void get() { try { DelayedUser value = queue.take(); String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); System.out.println(time + " 消費者:"+ Thread.currentThread().getName()+",value:" + value); } catch (InterruptedException e) { e.printStackTrace(); } }}
DelayQueue隊列中的元素需要顯式實現Delayed接口,定義一個DelayedUser類,代碼如下:
public class DelayedUser implements Delayed { /** * 當前時間戳 */ private long start; /** * 延遲時間(單位:毫秒) */ private long delayedTime; /** * 名稱 */ private String name; public DelayedUser(long delayedTime, String name) { this.start = System.currentTimeMillis(); this.delayedTime = delayedTime; this.name = name; } @Override public long getDelay(TimeUnit unit) { // 獲取當前延遲的時間 long diffTime = (start + delayedTime) - System.currentTimeMillis(); return unit.convert(diffTime,TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { // 判斷當前對象的延遲時間是否大于目標對象的延遲時間 return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return "DelayedUser{" + "delayedTime=" + delayedTime + ", name='" + name + '/'' + '}'; }}
生產者插入數據的內容,做如下調整。
/** * 生產者 */public class Producer extends Thread { private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { for (int i = 0; i < 6; i++) { container.add(new DelayedUser(1000 * i, "張三" + i)); } }}
最后運行結果如下:
生產者:Thread-0,add:DelayedUser{delayedTime=0, name='張三0'}生產者:Thread-0,add:DelayedUser{delayedTime=1000, name='張三1'}生產者:Thread-0,add:DelayedUser{delayedTime=2000, name='張三2'}生產者:Thread-0,add:DelayedUser{delayedTime=3000, name='張三3'}生產者:Thread-0,add:DelayedUser{delayedTime=4000, name='張三4'}生產者:Thread-0,add:DelayedUser{delayedTime=5000, name='張三5'}2023-11-03 14:55:33 消費者:Thread-1,value:DelayedUser{delayedTime=0, name='張三0'}2023-11-03 14:55:34 消費者:Thread-1,value:DelayedUser{delayedTime=1000, name='張三1'}2023-11-03 14:55:35 消費者:Thread-1,value:DelayedUser{delayedTime=2000, name='張三2'}2023-11-03 14:55:36 消費者:Thread-1,value:DelayedUser{delayedTime=3000, name='張三3'}2023-11-03 14:55:37 消費者:Thread-1,value:DelayedUser{delayedTime=4000, name='張三4'}2023-11-03 14:55:38 消費者:Thread-1,value:DelayedUser{delayedTime=5000, name='張三5'}
可以很清晰的看到,延遲時間最低的排在最前面。
最后我們來總結一下BlockingQueue阻塞隊列接口,它提供了很多非常豐富的生產者和消費者模型的編程實現,同時兼顧了線程安全和執行效率的特點。
開發者可以通過BlockingQueue阻塞隊列接口,簡單的代碼編程即可實現多線程中數據高效安全傳輸的目的,確切的說,它幫助開發者減輕了不少的編程難度。
在實際的業務開發中,其中LinkedBlockingQueue使用的是最廣泛的,因為它的執行效率最高,在使用的時候,需要平衡好隊列長度,防止過大導致內存溢出。
舉個最簡單的例子,比如某個功能上線之后,需要做下壓力測試,總共需要請求 10000 次,采用 100 個線程去執行,測試服務是否能正常工作。如何實現呢?
可能有的同學想到,每個線程執行 100 次請求,啟動 100 個線程去執行,可以是可以,就是有點笨拙。
其實還有另一個辦法,就是將 10000 個請求對象,存入到阻塞隊列中,然后采用 100 個線程去消費執行,這種編程模型會更佳靈活。
具體示例代碼如下:
public static void main(String[] args) throws InterruptedException { // 將每個用戶訪問百度服務的請求任務,存入阻塞隊列中 // 也可以也采用多線程寫入 BlockingQueue<String> queue = new LinkedBlockingQueue<>(); for (int i = 0; i < 10000; i++) { queue.put("https://www.baidu.com?paramKey=" + i); } // 模擬100個線程,執行10000次請求訪問百度 final int threadNum = 100; for (int i = 0; i < threadNum; i++) { final int threadCount = i + 1; new Thread(new Runnable() { @Override public void run() { System.out.println("thread " + threadCount + " start"); boolean over = false; while (!over) { String url = queue.poll(); if(Objects.nonNull(url)) { // 發起請求 String result =HttpUtils.getUrl(url); System.out.println("thread " + threadCount + " run result:" + result); }else { // 任務結束 over = true; System.out.println("thread " + threadCount + " final"); } } } }).start(); }}
本文主要圍繞BlockingQueue阻塞隊列接口,從方法介紹到用法詳解,做了一次知識總結,如果有描述不對的地方,歡迎留言指出!
本文鏈接:http://www.www897cc.com/showinfo-26-46337-0.html一文帶你徹底掌握阻塞隊列!
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com