日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁 > 科技  > 軟件

源碼解密協(xié)程隊列和線程隊列的實現(xiàn)原理

來源: 責(zé)編: 時間:2023-12-05 09:24:26 288觀看
導(dǎo)讀本次來聊一聊 Python 的隊列,首先隊列是一種特殊的線性表,具有先進先出(FIFO)的特性,這意味著元素的入隊順序和出隊順序是一致的。隊列通常用于存儲需要按順序處理的數(shù)據(jù),例如任務(wù)調(diào)度。當(dāng)然隊列最常見的一個應(yīng)用場景就是解

本次來聊一聊 Python 的隊列,首先隊列是一種特殊的線性表,具有先進先出(FIFO)的特性,這意味著元素的入隊順序和出隊順序是一致的。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

隊列通常用于存儲需要按順序處理的數(shù)據(jù),例如任務(wù)調(diào)度。當(dāng)然隊列最常見的一個應(yīng)用場景就是解耦,一個線程不停地生產(chǎn)數(shù)據(jù),放到隊列里,另一個線程從隊列中取數(shù)據(jù)進行消費。1tl28資訊網(wǎng)——每日最新資訊28at.com

而 Python 也提供了隊列,分別是協(xié)程隊列和線程隊列。1tl28資訊網(wǎng)——每日最新資訊28at.com

import asyncioimport queue# 協(xié)程隊列coroutine_queue = asyncio.Queue()# 線程隊列threading_queue = queue.Queue()

如果你的程序基于 asyncio,那么應(yīng)該使用協(xié)程隊列,如果你的程序采用了多線程,那么應(yīng)該使用線程隊列。1tl28資訊網(wǎng)——每日最新資訊28at.com

下面我們來看一看這兩種隊列的 API,以及底層實現(xiàn)原理。1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

協(xié)程隊列1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

協(xié)程隊列的具體實現(xiàn)由 asyncio 提供,以下是它的一些用法。1tl28資訊網(wǎng)——每日最新資訊28at.com

import asyncioasync def main():    # 創(chuàng)建隊列時可以指定能夠存儲的最大元素個數(shù)    # 不指定則沒有容量限制    queue = asyncio.Queue(maxsize=20)    # 返回容量    print(queue.maxsize)    """    20    """    # 添加元素,如果隊列滿了會阻塞,直到有剩余空間    await queue.put(111)    # 添加元素,如果隊列滿了會拋異常    # 因為不需要阻塞等待,所以 put_nowait 不是協(xié)程函數(shù)    queue.put_nowait(222)    # 隊列是否已滿    print(queue.full())    """    False    """    # 返回隊列內(nèi)部的元素個數(shù)    print(queue.qsize())    """    2    """    # 從隊列中獲取元素,如果隊列為空,會阻塞,直到隊列中有可用元素    print(await queue.get())    """    111    """    # 從隊列中獲取元素,如果隊列為空,會拋異常    # 因為不需要阻塞等待,所以 put_nowait 不是協(xié)程函數(shù)    print(queue.get_nowait())    """    222    """    # 隊列是否為空    print(queue.empty())    """    True    """asyncio.run(main())

所以協(xié)程隊列的 API 很簡單,我們再羅列一下:1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

然后,協(xié)程隊列還有兩個 API,需要單獨說明,分別是 task_done() 和 join()。1tl28資訊網(wǎng)——每日最新資訊28at.com

首先在協(xié)程隊列內(nèi)部有一個 _unfinished_tasks 屬性,初始值為 0,每當(dāng)往隊列添加一個元素時,該屬性的值就會自增 1。但是從隊列取出元素時,該屬性不會自動減 1,需要手動調(diào)用 task_done() 方法。1tl28資訊網(wǎng)——每日最新資訊28at.com

所以 _unfinished_tasks 記錄了隊列中有多少個任務(wù)數(shù)據(jù)需要處理,每來一個自動加 1,但取走一個不會自動減 1,而是需要 task_done 來實現(xiàn)。1tl28資訊網(wǎng)——每日最新資訊28at.com

然后 join() 的作用是,當(dāng) _unfinished_tasks 不為 0 的時候,await queue.join() 會阻塞,直到為 0。1tl28資訊網(wǎng)——每日最新資訊28at.com

import asyncioasync def consumer(queue, n):    print(f"consumer{n} 開始消費")    await asyncio.sleep(3)    await queue.get()    # 獲取數(shù)據(jù)后,調(diào)用 task_done    queue.task_done()    print(f"consumer{n} 消費完畢")async def main():    queue = asyncio.Queue()    await queue.put(123)    await queue.put(456)    await queue.put(789)    # 隊列里面有三個數(shù)據(jù),開啟三個消費者去消費    await asyncio.gather(        consumer(queue, 1),        consumer(queue, 2),        consumer(queue, 3),    )    # 這里會陷入阻塞,直到 _unfinished_tasks 變?yōu)?0    await queue.join()    print("main 解除阻塞")asyncio.run(main())"""consumer1 開始消費consumer2 開始消費consumer3 開始消費consumer1 消費完畢consumer2 消費完畢consumer3 消費完畢main 解除阻塞"""

還是比較簡單的,然后我們來看一下協(xié)程隊列的具體實現(xiàn)細節(jié)。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

首先協(xié)程隊列內(nèi)部有一個 _queue 屬性,它是一個雙端隊列,負責(zé)保存具體的元素。因為要保證兩端的操作都是高效的,所以采用雙端隊列實現(xiàn)。1tl28資訊網(wǎng)——每日最新資訊28at.com

然后是 _getters 和 _putters 兩個屬性,它們是做什么的呢?在隊列滿了的時候,協(xié)程往隊列添加元素時會陷入阻塞,等到隊列有剩余空間時會解除阻塞。同理,在隊列為空時,協(xié)程從隊列獲取元素時會陷入阻塞,等到隊列有可用元素時會解除阻塞。1tl28資訊網(wǎng)——每日最新資訊28at.com

那么這個阻塞等待,以及自動喚醒并解除阻塞是怎么實現(xiàn)的呢?在介紹鎖和信號量的時候,我們分析過整個實現(xiàn)過程,協(xié)程隊列與之類似。1tl28資訊網(wǎng)——每日最新資訊28at.com

假設(shè)協(xié)程從隊列獲取元素,但是隊列為空,于是會創(chuàng)建一個 Future 對象,并保存起來,當(dāng)前保存的地方就是 _getters,它也是雙端隊列。然后 await future,此時就會陷入阻塞,當(dāng)其它協(xié)程往隊列中添加元素時,會將 _getters 里面的 future 彈出,設(shè)置結(jié)果集。因此 await future 的協(xié)程就會解除阻塞,因為隊列有可用元素了。1tl28資訊網(wǎng)——每日最新資訊28at.com

同理,協(xié)程往隊列添加元素也是如此,如果隊列滿了,同樣創(chuàng)建一個 Future 對象,并保存起來,當(dāng)前保存的地方就是 _putters。然后 await future,陷入阻塞,當(dāng)其它協(xié)程從隊列中取出元素,會將 _putters 里面的 future 彈出,設(shè)置結(jié)果集。因此 await future 的協(xié)程就會解除阻塞,因為隊列有可用空間了。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

三個內(nèi)部調(diào)用的方法,_get 方法負責(zé)從隊列的頭部彈出元素,_put 方法負責(zé)從隊列的尾部追加元素,比較簡單。然后是 _wakeup_next 方法,它負責(zé)喚醒阻塞的協(xié)程。參數(shù) waiters 要么是 _getters,要么是 _putters,從里面彈出一個 future,設(shè)置結(jié)果集,讓對應(yīng)的協(xié)程解除阻塞。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

  • qsize() 負責(zé)返回隊列的元素個數(shù);
  • maxsize 負責(zé)返回隊列的容量;
  • empty() 負責(zé)判斷隊列是否為空;
  • full() 負責(zé)判斷隊列是否已滿,如果容量小于等于 0,那么表示容量無限,隊列永遠不會滿。否則判斷元素個數(shù)是否大于等于容量;

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

然后看看 put_nowait 和 get_nowait,首先是 put_nowait,往隊列添加元素。1tl28資訊網(wǎng)——每日最新資訊28at.com

如果添加時發(fā)現(xiàn)隊列已滿,那么拋出異常。如果未滿,則調(diào)用 _put 方法往 _queue 里面添加元素,因為元素的實際存儲是由 self._queue 這個雙端隊列負責(zé)的。1tl28資訊網(wǎng)——每日最新資訊28at.com

添加完畢后,將 _unfinished_task 加 1。最后從 _getters 里面彈出 future,設(shè)置結(jié)果集,讓因獲取不到元素而陷入阻塞的協(xié)程解除阻塞(同時會將添加的元素取走)。1tl28資訊網(wǎng)——每日最新資訊28at.com

get_nowait 的邏輯也很簡單,如果隊列為空,直接拋異常。如果不為空,則調(diào)用 _get 方法從隊列中彈出元素。最后從 _putters 里面彈出 future,設(shè)置結(jié)果集,讓因隊列已滿、無法添加元素而陷入阻塞的協(xié)程解除阻塞(同時會將元素添加進隊列)。1tl28資訊網(wǎng)——每日最新資訊28at.com

再來看看 put 方法的實現(xiàn)細節(jié):1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

結(jié)果和我們之前分析的一樣,只是源碼內(nèi)部多做了一些異常檢測。再來看看 get 方法,它的實現(xiàn)細節(jié)和 put 是類似的。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

比較簡單,還是沒什么難度的,最后再來看看 task_done 和 join 兩個方法。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

協(xié)程隊列里面使用了 asyncio.Event,它表示事件,如果事件對象沒有調(diào)用 set 方法設(shè)置標志位,那么調(diào)用 wait 方法時會陷入阻塞。當(dāng)事件對象調(diào)用 set 方法時,wait 會解除阻塞。1tl28資訊網(wǎng)——每日最新資訊28at.com

所以協(xié)程隊列的 join 方法的邏輯就是,當(dāng) _unfinished_tasks 大于 0 時,調(diào)用事件對象的 wait 方法陷入阻塞。1tl28資訊網(wǎng)——每日最新資訊28at.com

而 task_done 方法的作用就是將 _unfinished_tasks 減 1,當(dāng)它的值屬性為 0 時,調(diào)用事件對象的 set 方法,讓 join 解除阻塞。1tl28資訊網(wǎng)——每日最新資訊28at.com

以上就是整個協(xié)程隊列的實現(xiàn)細節(jié),具體的元素存儲是由 collections.deque 來承載的。并在隊列已滿或者為空時,通過 Future 對象來實現(xiàn)阻塞等待和自動喚醒。1tl28資訊網(wǎng)——每日最新資訊28at.com

另外除了先進先出隊列之外,還有先進后出隊列,一般稱為 LIFO 隊列,它的效果類似于棧。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

這個沒什么好說的,因為是先進后出,所以添加和彈出都在同一端,直接使用列表實現(xiàn)即可。并且由于 LifoQueue 繼承 Queue,所以它的 API 和普通的協(xié)程隊列是一樣的。1tl28資訊網(wǎng)——每日最新資訊28at.com

除了先進先出隊列,還有一個優(yōu)先隊列。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

它的 API 和普通的協(xié)程隊列也是一致的,只不過優(yōu)先隊列在添加元素時,需要指定一個優(yōu)先級:(優(yōu)先級, 元素),優(yōu)先級的值越低,表示優(yōu)先級越高。然后在內(nèi)部,會按照優(yōu)先級的高低,維護一個小根堆,堆頂元素便是優(yōu)先級最高的元素。1tl28資訊網(wǎng)——每日最新資訊28at.com

這幾個隊列具體使用哪一種,則取決于具體的業(yè)務(wù)場景。1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

線程隊列

1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

說完了協(xié)程隊列,再來看看線程隊列,它們的 API 是類似的,但實現(xiàn)細節(jié)則不同。因為操作系統(tǒng)感知不到協(xié)程,所以協(xié)程隊列的阻塞等待是基于 Future 實現(xiàn)的,而線程隊列的阻塞等待是基于條件變量(和互斥鎖)實現(xiàn)的。1tl28資訊網(wǎng)——每日最新資訊28at.com

還是先來看看線程隊列的一些 API,和協(xié)程隊列是類似的。1tl28資訊網(wǎng)——每日最新資訊28at.com

from queue import Queue# 可以指定一個 maxsize 參數(shù),表示隊列的容量# 默認為 0,表示隊列的容量無限queue = Queue(maxsize=20)# 查看容量print(queue.maxsize)"""20"""# 查看隊列的元素個數(shù)print(queue.qsize())"""0"""# 判斷隊列是否已滿print(queue.full())"""False"""# 判斷隊列是否為空print(queue.empty())"""True"""# 往隊列中添加元素# block 參數(shù)表示是否阻塞,默認為 True,當(dāng)隊列已滿時,線程會阻塞# timeout 表示超時時間,默認為 None,表示會無限等待# 當(dāng)然也可以給 timeout 傳一個具體的值# 如果在規(guī)定時間內(nèi),沒有將元素放入隊列,那么拋異常queue.put(123, block=True, timeout=None)# 也是往隊列中添加元素,但是當(dāng)隊列已滿時,會直接拋異常# put_nowait(item) 本質(zhì)上就是 put(item, block=False)queue.put_nowait(456)# 從隊列中取出元素# 同樣可以傳遞 block 和 timeout 參數(shù)# block 默認為 True,當(dāng)隊列為空時會陷入阻塞# timeout 默認為 None,表示會無限等待print(queue.get(block=True, timeout=None))"""123"""# 也是從隊列中取出元素,但是當(dāng)隊列為空時,會直接拋異常# get_nowait() 本質(zhì)上就是 get(block=False)print(queue.get_nowait())"""456"""# task_done(),將 unfinished_tasks 屬性的值減 1print(queue.unfinished_tasks)  """2"""queue.task_done()queue.task_done()print(queue.unfinished_tasks)"""0"""# join(),當(dāng) unfinished_tasks 不為 0 時,陷入阻塞queue.join()

API 和協(xié)程隊列是相似的,我們羅列一下:1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

線程隊列的具體使用我們已經(jīng)知道了,下面來看看它的具體實現(xiàn)。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

線程隊列的內(nèi)部依舊使用雙端隊列進行元素存儲,并且還使用了一個互斥鎖和三個條件變量。1tl28資訊網(wǎng)——每日最新資訊28at.com

為了保證數(shù)據(jù)的一致性和線程安全,當(dāng)隊列在多線程環(huán)境中被修改(比如添加或刪除元素)時,需要使用互斥鎖。任何需要修改隊列的操作都必須在獲取到互斥鎖之后進行,以防止多個線程同時對隊列進行修改,否則會導(dǎo)致數(shù)據(jù)不一致或其它錯誤。同時,一旦對隊列的修改完成,必須立即釋放互斥鎖,以便其它線程可以訪問隊列。1tl28資訊網(wǎng)——每日最新資訊28at.com

然后是 not_empty 條件變量,當(dāng)一個新元素被添加到隊列時,應(yīng)該向 not_empty發(fā)送一個信號。這個動作會通知那些想從隊列中獲取元素,但因隊列為空而陷入阻塞的線程,現(xiàn)在隊列中已經(jīng)有了新的元素,它們可以繼續(xù)執(zhí)行獲取元素的操作。1tl28資訊網(wǎng)——每日最新資訊28at.com

接下來是 not_full 條件變量,當(dāng)從隊列中取走一個元素時,應(yīng)該向 not_full 發(fā)送一個信號。這個動作通知那些想往隊列添加元素,但因隊列已滿而陷入阻塞的線程,現(xiàn)在隊列中已經(jīng)有了可用空間,它們可以繼續(xù)執(zhí)行添加元素的操作。1tl28資訊網(wǎng)——每日最新資訊28at.com

最后是 all_tasks_done 條件變量,當(dāng)處理的任務(wù)全部完成,即計數(shù)器 unfinished_task 為 0 時,應(yīng)該向 all_tasks_done 發(fā)送一個信號。這個動作會通知那些執(zhí)行了 join() 方法而陷入阻塞的線程,它們可以繼續(xù)往下執(zhí)行了。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

因為線程隊列采用了雙端隊列存儲元素,所以雙端隊列的長度就是線程隊列的元素個數(shù)。如果元素個數(shù)為 0,那么隊列就是空;如果容量大于 0,并且小于等于元素個數(shù),那么隊列就滿了。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

前面說了,put_nowait 和 get_nowait 本質(zhì)上就是調(diào)用了 put 和 get,所以我們的重點是 put 和 get 兩個方法。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

以上就是 put 方法的底層實現(xiàn),不難理解。說完了 put,再來看看 get。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

最后是 task_done 和 join 方法,看看它們的內(nèi)部邏輯。1tl28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片1tl28資訊網(wǎng)——每日最新資訊28at.com

調(diào)用 join 方法,當(dāng) unfinished_task 大于 0 時,會陷入阻塞。調(diào)用 task_done 方法,會將未完成任務(wù)數(shù)減 1,如果為 0,那么喚醒阻塞等待的線程。1tl28資訊網(wǎng)——每日最新資訊28at.com

需要注意的是,喚醒調(diào)用的方法不是 notify,而是 notify_all。對于添加元素和獲取元素,每次顯然只能喚醒一個線程,此時調(diào)用 notify。而 unfinished_task 為 0 時,應(yīng)該要喚醒所有等待的線程,因此要調(diào)用 notify_all。1tl28資訊網(wǎng)——每日最新資訊28at.com

最后線程隊列也有相應(yīng)的 PriorityQueue 和 LifoQueue,它們的用法、實現(xiàn)和協(xié)程里面的這兩個隊列是一樣的。1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

小結(jié)

1tl28資訊網(wǎng)——每日最新資訊28at.com

1tl28資訊網(wǎng)——每日最新資訊28at.com

以上便是協(xié)程隊列和線程隊列的具體用法和實現(xiàn)原理,它們本質(zhì)上都是基于雙端隊列實現(xiàn)具體的元素存儲,并且在隊列已滿和隊列為空時,可以阻塞等待。1tl28資訊網(wǎng)——每日最新資訊28at.com

只不過協(xié)程隊列是通過 Future 對象實現(xiàn)的,而線程隊列是通過條件變量實現(xiàn)的。1tl28資訊網(wǎng)——每日最新資訊28at.com

當(dāng)然,除了協(xié)程隊列和線程隊列,還有進程隊列,但進程隊列要復(fù)雜的多。因此關(guān)于進程隊列的實現(xiàn)細節(jié),我們以后專門花篇幅去介紹。1tl28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-38110-0.html源碼解密協(xié)程隊列和線程隊列的實現(xiàn)原理

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 被人說 Lambda 代碼像...,那是沒用下面這三個方法

下一篇: 一次 K8s 升級,竟然導(dǎo)致滴滴故障 12 小時?

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 龙陵县| 阿城市| 蒙自县| 班戈县| 吴忠市| 巴青县| 文登市| 隆安县| 潍坊市| 古浪县| 顺昌县| 麻栗坡县| 青浦区| 昌江| 克什克腾旗| 富平县| 芷江| 岳西县| 嘉黎县| 肃宁县| 东台市| 湖州市| 长岭县| 海城市| 嘉兴市| 永年县| 民乐县| 龙山县| 扶沟县| 福建省| 山西省| 合作市| 城口县| 邢台市| 金沙县| 绍兴县| 昭平县| 静海县| 防城港市| 承德市| 余干县|