日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁 > 科技  > 軟件

一文帶你徹底掌握阻塞隊(duì)列!

來源: 責(zé)編: 時(shí)間:2023-12-15 09:49:51 235觀看
導(dǎo)讀一、摘要在之前的文章中,我們介紹了生產(chǎn)者和消費(fèi)者模型的最基本實(shí)現(xiàn)思路,相信大家對它已經(jīng)有一個(gè)初步的認(rèn)識(shí)。在 Java 的并發(fā)包里面還有一個(gè)非常重要的接口:BlockingQueue。BlockingQueue是一個(gè)阻塞隊(duì)列,更為準(zhǔn)確的解釋是

一、摘要

在之前的文章中,我們介紹了生產(chǎn)者和消費(fèi)者模型的最基本實(shí)現(xiàn)思路,相信大家對它已經(jīng)有一個(gè)初步的認(rèn)識(shí)。XFZ28資訊網(wǎng)——每日最新資訊28at.com

在 Java 的并發(fā)包里面還有一個(gè)非常重要的接口:BlockingQueue。XFZ28資訊網(wǎng)——每日最新資訊28at.com

BlockingQueue是一個(gè)阻塞隊(duì)列,更為準(zhǔn)確的解釋是:BlockingQueue是一個(gè)基于阻塞機(jī)制實(shí)現(xiàn)的線程安全的隊(duì)列。通過它也可以實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,并且效率更高、安全可靠,相比之前介紹的生產(chǎn)者和消費(fèi)者模型,它可以同時(shí)實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者并行運(yùn)行。XFZ28資訊網(wǎng)——每日最新資訊28at.com

XFZ28資訊網(wǎng)——每日最新資訊28at.com

那什么是阻塞隊(duì)列呢?XFZ28資訊網(wǎng)——每日最新資訊28at.com

簡單的說,就是當(dāng)參數(shù)在入隊(duì)和出隊(duì)時(shí),通過加鎖的方式來避免線程并發(fā)操作時(shí)導(dǎo)致的數(shù)據(jù)異常問題。XFZ28資訊網(wǎng)——每日最新資訊28at.com

在 Java 中,能對線程并發(fā)執(zhí)行進(jìn)行加鎖的方式主要有synchronized和ReentrantLock,其中BlockingQueue采用的是ReentrantLock方式實(shí)現(xiàn)。XFZ28資訊網(wǎng)——每日最新資訊28at.com

與此對應(yīng)的還有非阻塞機(jī)制的隊(duì)列,主要是采用 CAS 方式來控制并發(fā)操作,例如:ConcurrentLinkedQueue,這個(gè)我們在后面的文章再進(jìn)行分享介紹。XFZ28資訊網(wǎng)——每日最新資訊28at.com

今天我們主要介紹BlockingQueue相關(guān)的知識(shí)和用法,廢話不多說了,進(jìn)入正題!XFZ28資訊網(wǎng)——每日最新資訊28at.com

二、BlockingQueue 方法介紹

打開BlockingQueue的源碼,你會(huì)發(fā)現(xiàn)它繼承自Queue,正如上文提到的,它本質(zhì)是一個(gè)隊(duì)列接口。XFZ28資訊網(wǎng)——每日最新資訊28at.com

public interface BlockingQueue<E> extends Queue<E> { //...省略}

關(guān)于隊(duì)列,我們在之前的集合系列文章中對此有過深入的介紹,本篇就再次簡單的介紹一下。XFZ28資訊網(wǎng)——每日最新資訊28at.com

隊(duì)列其實(shí)是一個(gè)數(shù)據(jù)結(jié)構(gòu),元素遵循先進(jìn)先出的原則,所有新元素的插入,也被稱為入隊(duì)操作,會(huì)插入到隊(duì)列的尾部;元素的移除,也被稱為出隊(duì)操作,會(huì)從隊(duì)列的頭部開始移除,從而保證先進(jìn)先出的原則。XFZ28資訊網(wǎng)——每日最新資訊28at.com

在Queue接口中,總共有 6 個(gè)方法,可以分為 3 類,分別是:插入、移除、查詢,內(nèi)容如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

方法描述add(e)插入元素,如果插入失敗,就拋異常offer(e)插入元素,如果插入成功,就返回 true;反之 falseremove()移除元素,如果移除失敗,就拋異常poll()移除元素,如果移除成功,返回 true;反之 falseelement()獲取隊(duì)首元素,如果獲取結(jié)果為空,就拋異常peek()獲取隊(duì)首元素,如果獲取結(jié)果為空,返回空對象XFZ28資訊網(wǎng)——每日最新資訊28at.com

因?yàn)锽lockingQueue是Queue的子接口,了解Queue接口里面的方法,有助于我們對BlockingQueue的理解。XFZ28資訊網(wǎng)——每日最新資訊28at.com

除此之外,BlockingQueue還單獨(dú)擴(kuò)展了一些特有的方法,內(nèi)容如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

方法描述put(e)插入元素,如果沒有插入成功,線程會(huì)一直阻塞,直到隊(duì)列中有空間再繼續(xù)offer(e, time, unit)插入元素,如果在指定的時(shí)間內(nèi)沒有插入成功,就返回 false;反之 truetake()移除元素,如果沒有移除成功,線程會(huì)一直阻塞,直到隊(duì)列中新的數(shù)據(jù)被加入poll(time, unit)移除元素,如果在指定的時(shí)間內(nèi)沒有移除成功,就返回 false;反之 truedrainTo(Collection c, int maxElements)一次性取走隊(duì)列中的數(shù)據(jù)到 c 中,可以指定取的個(gè)數(shù)。該方法可以提升獲取數(shù)據(jù)效率,不需要多次分批加鎖或釋放鎖XFZ28資訊網(wǎng)——每日最新資訊28at.com

分析源碼,你會(huì)發(fā)現(xiàn)相比普通的Queue子類,BlockingQueue子類主要有以下幾個(gè)明顯的不同點(diǎn):XFZ28資訊網(wǎng)——每日最新資訊28at.com

  • 1.元素插入和移除時(shí)線程安全:主要是通過在入隊(duì)和出隊(duì)時(shí)進(jìn)行加鎖,保證了隊(duì)列線程安全,加鎖邏輯采用ReentrantLock實(shí)現(xiàn)
  • 2.支持阻塞的入隊(duì)和出隊(duì)方法:當(dāng)隊(duì)列滿時(shí),會(huì)阻塞入隊(duì)的線程,直到隊(duì)列不滿;當(dāng)隊(duì)列為空時(shí),會(huì)阻塞出隊(duì)的線程,直到隊(duì)列中有元素;同時(shí)支持超時(shí)機(jī)制,防止線程一直阻塞

三、BlockingQueue 用法詳解

打開源碼,BlockingQueue接口的實(shí)現(xiàn)類非常多,我們重點(diǎn)講解一下其中的 5 個(gè)非常重要的實(shí)現(xiàn)類,分別如下表所示。XFZ28資訊網(wǎng)——每日最新資訊28at.com

實(shí)現(xiàn)類功能ArrayBlockingQueue基于數(shù)組的阻塞隊(duì)列,使用數(shù)組存儲(chǔ)數(shù)據(jù),需要指定長度,所以是一個(gè)有界隊(duì)列LinkedBlockingQueue基于鏈表的阻塞隊(duì)列,使用鏈表存儲(chǔ)數(shù)據(jù),默認(rèn)是一個(gè)無界隊(duì)列;也可以通過構(gòu)造方法中的capacity設(shè)置最大元素?cái)?shù)量,所以也可以作為有界隊(duì)列SynchronousQueue一種沒有緩沖的隊(duì)列XFZ28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者產(chǎn)生的數(shù)據(jù)直接會(huì)被消費(fèi)者獲取并且立刻消費(fèi)PriorityBlockingQueue基于優(yōu)先級(jí)別的阻塞隊(duì)列,底層基于數(shù)組實(shí)現(xiàn),是一個(gè)無界隊(duì)列DelayQueue延遲隊(duì)列,其中的元素只有到了其指定的延遲時(shí)間,才能夠從隊(duì)列中出隊(duì)XFZ28資訊網(wǎng)——每日最新資訊28at.com

下面我們對以上實(shí)現(xiàn)類的用法,進(jìn)行一一介紹。XFZ28資訊網(wǎng)——每日最新資訊28at.com

3.1、ArrayBlockingQueue

ArrayBlockingQueue是一個(gè)基于數(shù)組的阻塞隊(duì)列,初始化的時(shí)候必須指定隊(duì)列大小,源碼實(shí)現(xiàn)比較簡單,采用的是ReentrantLock和Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,部分核心源碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class ArrayBlockingQueue<E> extends AbstractQueue<E>        implements BlockingQueue<E>, java.io.Serializable { /** 使用數(shù)組存儲(chǔ)隊(duì)列中的元素 */ final Object[] items; /** 使用獨(dú)占鎖ReetrantLock */ final ReentrantLock lock; /** 等待出隊(duì)的條件 */ private final Condition notEmpty; /** 等待入隊(duì)的條件 */ private final Condition notFull; /** 初始化時(shí),需要指定隊(duì)列大小 */ public ArrayBlockingQueue(int capacity) {        this(capacity, false);    }    /** 初始化時(shí),也指出指定是否為公平鎖, */    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }    /**入隊(duì)操作*/    public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }    /**出隊(duì)操作*/    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)                notEmpty.await();            return dequeue();        } finally {            lock.unlock();        }    }}

ArrayBlockingQueue采用ReentrantLock進(jìn)行加鎖,只有一個(gè)ReentrantLock對象,這意味著生產(chǎn)者和消費(fèi)者無法并行運(yùn)行。XFZ28資訊網(wǎng)——每日最新資訊28at.com

我們看一個(gè)簡單的示例代碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class Container {    /**     * 初始化阻塞隊(duì)列     */    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);    /**     * 添加數(shù)據(jù)到阻塞隊(duì)列     * @param value     */    public void add(Integer value) {        try {            queue.put(value);            System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    /**     * 從阻塞隊(duì)列獲取數(shù)據(jù)     */    public void get() {        try {            Integer value = queue.take();            System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + value);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}
/** * 生產(chǎn)者 */public class Producer extends Thread {    private Container container;    public Producer(Container container) {        this.container = container;    }    @Override    public void run() {        for (int i = 0; i < 6; i++) {            container.add(i);        }    }}
/** * 消費(fèi)者 */public class Consumer extends Thread {    private Container container;    public Consumer(Container container) {        this.container = container;    }    @Override    public void run() {        for (int i = 0; i < 6; i++) {            container.get();        }    }}
/** * 測試類 */public class MyThreadTest {    public static void main(String[] args) {        Container container = new Container();        Producer producer = new Producer(container);        Consumer consumer = new Consumer(container);        producer.start();        consumer.start();    }}

運(yùn)行結(jié)果如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者:Thread-0,add:0生產(chǎn)者:Thread-0,add:1生產(chǎn)者:Thread-0,add:2生產(chǎn)者:Thread-0,add:3生產(chǎn)者:Thread-0,add:4生產(chǎn)者:Thread-0,add:5消費(fèi)者:Thread-1,value:0消費(fèi)者:Thread-1,value:1消費(fèi)者:Thread-1,value:2消費(fèi)者:Thread-1,value:3消費(fèi)者:Thread-1,value:4消費(fèi)者:Thread-1,value:5

可以很清晰的看到,生產(chǎn)者線程執(zhí)行完畢之后,消費(fèi)者線程才開始消費(fèi)。XFZ28資訊網(wǎng)——每日最新資訊28at.com

3.2、LinkedBlockingQueue

LinkedBlockingQueue是一個(gè)基于鏈表的阻塞隊(duì)列,初始化的時(shí)候無須指定隊(duì)列大小,默認(rèn)隊(duì)列長度為Integer.MAX_VALUE,也就是 int 型最大值。XFZ28資訊網(wǎng)——每日最新資訊28at.com

同樣的,采用的是ReentrantLock和Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,不同的是它使用了兩個(gè)lock,這意味著生產(chǎn)者和消費(fèi)者可以并行運(yùn)行,程序執(zhí)行效率進(jìn)一步得到提升。XFZ28資訊網(wǎng)——每日最新資訊28at.com

部分核心源碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class LinkedBlockingQueue<E> extends AbstractQueue<E>        implements BlockingQueue<E>, java.io.Serializable {    /** 使用出隊(duì)獨(dú)占鎖ReetrantLock */    private final ReentrantLock takeLock = new ReentrantLock();    /** 等待出隊(duì)的條件 */    private final Condition notEmpty = takeLock.newCondition();    /** 使用入隊(duì)獨(dú)占鎖ReetrantLock */    private final ReentrantLock putLock = new ReentrantLock();    /** 等待入隊(duì)的條件 */    private final Condition notFull = putLock.newCondition();    /**入隊(duì)操作*/    public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        int c = -1;        Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {                notFull.await();            }            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        } finally {            putLock.unlock();        }        if (c == 0)            signalNotEmpty();    }    /**出隊(duì)操作*/    public E take() throws InterruptedException {        E x;        int c = -1;        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();        try {            while (count.get() == 0) {                notEmpty.await();            }            x = dequeue();            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();        } finally {            takeLock.unlock();        }        if (c == capacity)            signalNotFull();        return x;    }}

把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成LinkedBlockingQueue,調(diào)整如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

/** * 初始化阻塞隊(duì)列 */private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

再次運(yùn)行結(jié)果如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者:Thread-0,add:0消費(fèi)者:Thread-1,value:0生產(chǎn)者:Thread-0,add:1消費(fèi)者:Thread-1,value:1生產(chǎn)者:Thread-0,add:2消費(fèi)者:Thread-1,value:2生產(chǎn)者:Thread-0,add:3生產(chǎn)者:Thread-0,add:4生產(chǎn)者:Thread-0,add:5消費(fèi)者:Thread-1,value:3消費(fèi)者:Thread-1,value:4消費(fèi)者:Thread-1,value:5

可以很清晰的看到,生產(chǎn)者線程和消費(fèi)者線程,交替并行執(zhí)行。XFZ28資訊網(wǎng)——每日最新資訊28at.com

3.3、SynchronousQueue

SynchronousQueue是一個(gè)沒有緩沖的隊(duì)列,生產(chǎn)者產(chǎn)生的數(shù)據(jù)直接會(huì)被消費(fèi)者獲取并且立刻消費(fèi),相當(dāng)于傳統(tǒng)的一個(gè)請求對應(yīng)一個(gè)應(yīng)答模式。XFZ28資訊網(wǎng)——每日最新資訊28at.com

相比ArrayBlockingQueue和LinkedBlockingQueue,SynchronousQueue實(shí)現(xiàn)機(jī)制也不同,它主要采用隊(duì)列和棧來實(shí)現(xiàn)數(shù)據(jù)的傳遞,中間不存儲(chǔ)任何數(shù)據(jù),生產(chǎn)的數(shù)據(jù)必須得消費(fèi)者處理,線程阻塞方式采用 JDK 提供的LockSupport park/unpark函數(shù)來完成,也支持公平和非公平兩種模式。XFZ28資訊網(wǎng)——每日最新資訊28at.com

  • 當(dāng)采用公平模式時(shí):使用一個(gè) FIFO 隊(duì)列來管理多余的生產(chǎn)者和消費(fèi)者
  • 當(dāng)采用非公平模式時(shí):使用一個(gè) LIFO 棧來管理多余的生產(chǎn)者和消費(fèi)者,這也是SynchronousQueue默認(rèn)的模式

部分核心源碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class SynchronousQueue<E> extends AbstractQueue<E>    implements BlockingQueue<E>, java.io.Serializable {    /**不同的策略實(shí)現(xiàn)*/    private transient volatile Transferer<E> transferer; /**默認(rèn)非公平模式*/    public SynchronousQueue() {        this(false);    }    /**可以選策略,也可以采用公平模式*/    public SynchronousQueue(boolean fair) {        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();    } /**入隊(duì)操作*/    public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        if (transferer.transfer(e, false, 0) == null) {            Thread.interrupted();            throw new InterruptedException();        }    }    /**出隊(duì)操作*/    public E take() throws InterruptedException {        E e = transferer.transfer(null, false, 0);        if (e != null)            return e;        Thread.interrupted();        throw new InterruptedException();    }}

同樣的,把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成SynchronousQueue,代碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class Container {    /**     * 初始化阻塞隊(duì)列     */    private final BlockingQueue<Integer> queue = new SynchronousQueue<>();    /**     * 添加數(shù)據(jù)到阻塞隊(duì)列     * @param value     */    public void add(Integer value) {        try {            queue.put(value);            Thread.sleep(100);            System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    /**     * 從阻塞隊(duì)列獲取數(shù)據(jù)     */    public void get() {        try {            Integer value = queue.take();            Thread.sleep(200);            System.out.println("消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + value);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

再次運(yùn)行結(jié)果如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者:Thread-0,add:0消費(fèi)者:Thread-1,value:0生產(chǎn)者:Thread-0,add:1消費(fèi)者:Thread-1,value:1生產(chǎn)者:Thread-0,add:2消費(fèi)者:Thread-1,value:2生產(chǎn)者:Thread-0,add:3消費(fèi)者:Thread-1,value:3生產(chǎn)者:Thread-0,add:4消費(fèi)者:Thread-1,value:4生產(chǎn)者:Thread-0,add:5消費(fèi)者:Thread-1,value:5

可以很清晰的看到,生產(chǎn)者線程和消費(fèi)者線程,交替串行執(zhí)行,生產(chǎn)者每投遞一條數(shù)據(jù),消費(fèi)者處理一條數(shù)據(jù)。XFZ28資訊網(wǎng)——每日最新資訊28at.com

3.4、PriorityBlockingQueue

PriorityBlockingQueue是一個(gè)基于優(yōu)先級(jí)別的阻塞隊(duì)列,底層基于數(shù)組實(shí)現(xiàn),可以認(rèn)為是一個(gè)無界隊(duì)列。XFZ28資訊網(wǎng)——每日最新資訊28at.com

PriorityBlockingQueue與ArrayBlockingQueue的實(shí)現(xiàn)邏輯,基本相似,也是采用ReentrantLock來實(shí)現(xiàn)加鎖的操作。XFZ28資訊網(wǎng)——每日最新資訊28at.com

最大不同點(diǎn)在于:XFZ28資訊網(wǎng)——每日最新資訊28at.com

  • 1.PriorityBlockingQueue內(nèi)部基于數(shù)組實(shí)現(xiàn)的最小二叉堆算法,可以對隊(duì)列中的元素進(jìn)行排序,插入隊(duì)列的元素需要實(shí)現(xiàn)Comparator或者Comparable接口,以便對元素進(jìn)行排序
  • 2.其次,隊(duì)列的長度是可擴(kuò)展的,不需要顯式指定長度,上限為Integer.MAX_VALUE - 8

部分核心源碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class PriorityBlockingQueue<E> extends AbstractQueue<E>    implements BlockingQueue<E>, java.io.Serializable {  /**隊(duì)列元素*/    private transient Object[] queue;    /**比較器*/    private transient Comparator<? super E> comparator;    /**采用ReentrantLock進(jìn)行加鎖*/    private final ReentrantLock lock;    /**條件等待與通知*/    private final Condition notEmpty;    /**入隊(duì)操作*/    public boolean offer(E e) {        if (e == null)            throw new NullPointerException();        final ReentrantLock lock = this.lock;        lock.lock();        int n, cap;        Object[] array;        while ((n = size) >= (cap = (array = queue).length))            tryGrow(array, cap);        try {            Comparator<? super E> cmp = comparator;            if (cmp == null)                siftUpComparable(n, e, array);            else                siftUpUsingComparator(n, e, array, cmp);            size = n + 1;            notEmpty.signal();        } finally {            lock.unlock();        }        return true;    }    /**出隊(duì)操作*/    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        E result;        try {            while ( (result = dequeue()) == null)                notEmpty.await();        } finally {            lock.unlock();        }        return result;    }}

同樣的,把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成PriorityBlockingQueue,調(diào)整如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

/** * 初始化阻塞隊(duì)列 */private final BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

生產(chǎn)者插入數(shù)據(jù)的內(nèi)容,我們改下插入順序。XFZ28資訊網(wǎng)——每日最新資訊28at.com

/** * 生產(chǎn)者 */public class Producer extends Thread {    private Container container;    public Producer(Container container) {        this.container = container;    }    @Override    public void run() {        container.add(5);        container.add(3);        container.add(1);        container.add(2);        container.add(0);        container.add(4);    }}

最后運(yùn)行結(jié)果如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者:Thread-0,add:5生產(chǎn)者:Thread-0,add:3生產(chǎn)者:Thread-0,add:1生產(chǎn)者:Thread-0,add:2生產(chǎn)者:Thread-0,add:0生產(chǎn)者:Thread-0,add:4消費(fèi)者:Thread-1,value:0消費(fèi)者:Thread-1,value:1消費(fèi)者:Thread-1,value:2消費(fèi)者:Thread-1,value:3消費(fèi)者:Thread-1,value:4消費(fèi)者:Thread-1,value:5

從日志上可以很明顯看出,對于整數(shù),默認(rèn)情況下,按照升序排序,消費(fèi)者默認(rèn)從 0 開始處理。XFZ28資訊網(wǎng)——每日最新資訊28at.com

3.5、DelayQueue

DelayQueue是一個(gè)線程安全的延遲隊(duì)列,存入隊(duì)列的元素不會(huì)立刻被消費(fèi),只有到了其指定的延遲時(shí)間,才能夠從隊(duì)列中出隊(duì)。XFZ28資訊網(wǎng)——每日最新資訊28at.com

底層采用的是PriorityQueue來存儲(chǔ)元素,DelayQueue的特點(diǎn)在于:插入隊(duì)列中的數(shù)據(jù)可以按照自定義的delay時(shí)間進(jìn)行排序,快到期的元素會(huì)排列在前面,只有delay時(shí)間小于 0 的元素才能夠被取出。XFZ28資訊網(wǎng)——每日最新資訊28at.com

部分核心源碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>    implements BlockingQueue<E> {    /**采用ReentrantLock進(jìn)行加鎖*/    private final transient ReentrantLock lock = new ReentrantLock();    /**采用PriorityQueue進(jìn)行存儲(chǔ)數(shù)據(jù)*/    private final PriorityQueue<E> q = new PriorityQueue<E>(); /**條件等待與通知*/    private final Condition available = lock.newCondition();    /**入隊(duì)操作*/    public boolean offer(E e) {        final ReentrantLock lock = this.lock;        lock.lock();        try {            q.offer(e);            if (q.peek() == e) {                leader = null;                available.signal();            }            return true;        } finally {            lock.unlock();        }    }    /**出隊(duì)操作*/    public E poll() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            E first = q.peek();            if (first == null || first.getDelay(NANOSECONDS) > 0)                return null;            else                return q.poll();        } finally {            lock.unlock();        }    }}

同樣的,把最上面的樣例Container中的阻塞隊(duì)列實(shí)現(xiàn)類換成DelayQueue,代碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class Container {    /**     * 初始化阻塞隊(duì)列     */    private final BlockingQueue<DelayedUser> queue = new DelayQueue<DelayedUser>();    /**     * 添加數(shù)據(jù)到阻塞隊(duì)列     * @param value     */    public void add(DelayedUser value) {        try {            queue.put(value);            System.out.println("生產(chǎn)者:"+ Thread.currentThread().getName()+",add:" + value);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    /**     * 從阻塞隊(duì)列獲取數(shù)據(jù)     */    public void get() {        try {            DelayedUser value = queue.take();            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());            System.out.println(time + " 消費(fèi)者:"+ Thread.currentThread().getName()+",value:" + value);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

DelayQueue隊(duì)列中的元素需要顯式實(shí)現(xiàn)Delayed接口,定義一個(gè)DelayedUser類,代碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public class DelayedUser implements Delayed {    /**     * 當(dāng)前時(shí)間戳     */    private long start;    /**     * 延遲時(shí)間(單位:毫秒)     */    private long delayedTime;    /**     * 名稱     */    private String name;    public DelayedUser(long delayedTime, String name) {        this.start = System.currentTimeMillis();        this.delayedTime = delayedTime;        this.name = name;    }    @Override    public long getDelay(TimeUnit unit) {        // 獲取當(dāng)前延遲的時(shí)間        long diffTime = (start + delayedTime) - System.currentTimeMillis();        return unit.convert(diffTime,TimeUnit.MILLISECONDS);    }    @Override    public int compareTo(Delayed o) {        // 判斷當(dāng)前對象的延遲時(shí)間是否大于目標(biāo)對象的延遲時(shí)間        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));    }    @Override    public String toString() {        return "DelayedUser{" +                "delayedTime=" + delayedTime +                ", name='" + name + '/'' +                '}';    }}

生產(chǎn)者插入數(shù)據(jù)的內(nèi)容,做如下調(diào)整。XFZ28資訊網(wǎng)——每日最新資訊28at.com

/** * 生產(chǎn)者 */public class Producer extends Thread {    private Container container;    public Producer(Container container) {        this.container = container;    }    @Override    public void run() {        for (int i = 0; i < 6; i++) {            container.add(new DelayedUser(1000 * i, "張三" +  i));        }    }}

最后運(yùn)行結(jié)果如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=0, name='張三0'}生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=1000, name='張三1'}生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=2000, name='張三2'}生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=3000, name='張三3'}生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=4000, name='張三4'}生產(chǎn)者:Thread-0,add:DelayedUser{delayedTime=5000, name='張三5'}2023-11-03 14:55:33 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=0, name='張三0'}2023-11-03 14:55:34 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=1000, name='張三1'}2023-11-03 14:55:35 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=2000, name='張三2'}2023-11-03 14:55:36 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=3000, name='張三3'}2023-11-03 14:55:37 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=4000, name='張三4'}2023-11-03 14:55:38 消費(fèi)者:Thread-1,value:DelayedUser{delayedTime=5000, name='張三5'}

可以很清晰的看到,延遲時(shí)間最低的排在最前面。XFZ28資訊網(wǎng)——每日最新資訊28at.com

四、小結(jié)

最后我們來總結(jié)一下BlockingQueue阻塞隊(duì)列接口,它提供了很多非常豐富的生產(chǎn)者和消費(fèi)者模型的編程實(shí)現(xiàn),同時(shí)兼顧了線程安全和執(zhí)行效率的特點(diǎn)。XFZ28資訊網(wǎng)——每日最新資訊28at.com

開發(fā)者可以通過BlockingQueue阻塞隊(duì)列接口,簡單的代碼編程即可實(shí)現(xiàn)多線程中數(shù)據(jù)高效安全傳輸?shù)哪康?,確切的說,它幫助開發(fā)者減輕了不少的編程難度。XFZ28資訊網(wǎng)——每日最新資訊28at.com

在實(shí)際的業(yè)務(wù)開發(fā)中,其中LinkedBlockingQueue使用的是最廣泛的,因?yàn)樗膱?zhí)行效率最高,在使用的時(shí)候,需要平衡好隊(duì)列長度,防止過大導(dǎo)致內(nèi)存溢出。XFZ28資訊網(wǎng)——每日最新資訊28at.com

舉個(gè)最簡單的例子,比如某個(gè)功能上線之后,需要做下壓力測試,總共需要請求 10000 次,采用 100 個(gè)線程去執(zhí)行,測試服務(wù)是否能正常工作。如何實(shí)現(xiàn)呢?XFZ28資訊網(wǎng)——每日最新資訊28at.com

可能有的同學(xué)想到,每個(gè)線程執(zhí)行 100 次請求,啟動(dòng) 100 個(gè)線程去執(zhí)行,可以是可以,就是有點(diǎn)笨拙。XFZ28資訊網(wǎng)——每日最新資訊28at.com

其實(shí)還有另一個(gè)辦法,就是將 10000 個(gè)請求對象,存入到阻塞隊(duì)列中,然后采用 100 個(gè)線程去消費(fèi)執(zhí)行,這種編程模型會(huì)更佳靈活。XFZ28資訊網(wǎng)——每日最新資訊28at.com

具體示例代碼如下:XFZ28資訊網(wǎng)——每日最新資訊28at.com

public static void main(String[] args) throws InterruptedException {    // 將每個(gè)用戶訪問百度服務(wù)的請求任務(wù),存入阻塞隊(duì)列中    // 也可以也采用多線程寫入    BlockingQueue<String> queue = new LinkedBlockingQueue<>();    for (int i = 0; i < 10000; i++) {        queue.put("https://www.baidu.com?paramKey=" + i);    }    // 模擬100個(gè)線程,執(zhí)行10000次請求訪問百度    final int threadNum = 100;    for (int i = 0; i < threadNum; i++) {        final int threadCount = i + 1;        new Thread(new Runnable() {            @Override            public void run() {                System.out.println("thread " + threadCount + " start");                boolean over = false;                while (!over) {                    String url = queue.poll();                    if(Objects.nonNull(url)) {                        // 發(fā)起請求                        String result =HttpUtils.getUrl(url);                        System.out.println("thread " + threadCount + " run result:" + result);                    }else {                        // 任務(wù)結(jié)束                        over = true;                        System.out.println("thread " + threadCount + " final");                    }                }            }        }).start();    }}

本文主要圍繞BlockingQueue阻塞隊(duì)列接口,從方法介紹到用法詳解,做了一次知識(shí)總結(jié),如果有描述不對的地方,歡迎留言指出!XFZ28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-46337-0.html一文帶你徹底掌握阻塞隊(duì)列!

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問題請及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: 格力董明珠再評(píng)孟羽童:只想著利用平臺(tái)當(dāng)網(wǎng)紅,在公司內(nèi)產(chǎn)生不好影響

下一篇: 7k Star,一款開源的 Kafka 管理平臺(tái),功能齊全、頁面美觀!

標(biāo)簽:
  • 熱門焦點(diǎn)
  • K60 Pro官方停產(chǎn) 第三方瞬間漲價(jià)

    雖然沒有官方宣布,但Redmi的一些高管也已經(jīng)透露了,Redmi K60 Pro已經(jīng)停產(chǎn)且不會(huì)補(bǔ)貨,這一切都是為了即將到來的K60 Ultra鋪路,屬于廠家的正常操作。但有意思的是該機(jī)在停產(chǎn)之后
  • 石頭自清潔掃拖機(jī)器人G10S評(píng)測:多年黑科技集大成之作 懶人終極福音

    科技圈經(jīng)常能看到一個(gè)詞叫“縫合怪”,用來形容那些把好多功能或者外觀結(jié)合在一起的產(chǎn)品,通常這樣的詞是貶義詞,但如果真的是產(chǎn)品縫合的好、縫合的實(shí)用的話,那它就成了中性詞,今
  • JavaScript 混淆及反混淆代碼工具

    介紹在我們開始學(xué)習(xí)反混淆之前,我們首先要了解一下代碼混淆。如果不了解代碼是如何混淆的,我們可能無法成功對代碼進(jìn)行反混淆,尤其是使用自定義混淆器對其進(jìn)行混淆時(shí)。什么是混
  • 三言兩語說透設(shè)計(jì)模式的藝術(shù)-單例模式

    寫在前面單例模式是一種常用的軟件設(shè)計(jì)模式,它所創(chuàng)建的對象只有一個(gè)實(shí)例,且該實(shí)例易于被外界訪問。單例對象由于只有一個(gè)實(shí)例,所以它可以方便地被系統(tǒng)中的其他對象共享,從而減少
  • 只需五步,使用start.spring.io快速入門Spring編程

    步驟1打開https://start.spring.io/,按照屏幕截圖中的內(nèi)容創(chuàng)建項(xiàng)目,添加 Spring Web 依賴項(xiàng),并單擊“生成”按鈕下載 .zip 文件,為下一步做準(zhǔn)備。請?jiān)谶M(jìn)入步驟2之前進(jìn)行解壓。圖
  • Python異步IO編程的進(jìn)程/線程通信實(shí)現(xiàn)

    這篇文章再講3種方式,同時(shí)講4中進(jìn)程間通信的方式一、 Python 中線程間通信的實(shí)現(xiàn)方式共享變量共享變量是多個(gè)線程可以共同訪問的變量。在Python中,可以使用threading模塊中的L
  • 每天一道面試題-CPU偽共享

    前言:了不起:又到了每天一到面試題的時(shí)候了!學(xué)弟,最近學(xué)習(xí)的怎么樣啊 了不起學(xué)弟:最近學(xué)習(xí)的還不錯(cuò),每天都在學(xué)習(xí),每天都在進(jìn)步! 了不起:那你最近學(xué)習(xí)的什么呢? 了不起學(xué)弟:最近在學(xué)習(xí)C
  • 共享單車的故事講到哪了?

    來源丨??素?cái)經(jīng)與共享充電寶相差不多,共享單車已很久沒有被國內(nèi)熱點(diǎn)新聞關(guān)照到了。除了一再漲價(jià)和用戶直呼用不起了。近日多家媒體再發(fā)報(bào)道稱,成都、天津、鄭州等地多個(gè)共享單
  • DRAM存儲(chǔ)器10月價(jià)格下跌,NAND閃存本月價(jià)格與上月持平

    10月30日,據(jù)韓國媒體消息,自今年年初以來一直在上漲的 DRAM 存儲(chǔ)器的交易價(jià)格僅在本月就下跌了近 10%,此次是全年首次降價(jià),而NAND 閃存本月價(jià)格與上月持平。市
Top 主站蜘蛛池模板: 淮安市| 观塘区| 吴堡县| 军事| 柘荣县| 民勤县| 莫力| 揭阳市| 普格县| 酉阳| 西城区| 磴口县| 于都县| 洛隆县| 新宁县| 佛山市| 梁山县| 乐安县| 文成县| 镶黄旗| 宁安市| 钦州市| 随州市| 阳江市| 新昌县| 宁都县| 明星| 石屏县| 阜阳市| 张家港市| 遵化市| 孝义市| 白河县| 东城区| 思茅市| 彭山县| 惠来县| 浦江县| 光山县| 晋江市| 沧州市|