本次我們來聊一聊 Socket,以及它如何與 asyncio 搭配使用。
Socket 是對 TCP/IP 協議的一個封裝,可以讓我們更方便地使用 TCP/IP 協議,而不用關注背后的原理。并且我們經常使用的 Web 框架,本質上也是一個 Socket。
所以 Socket 是操作系統對 TCP/IP 網絡協議棧的封裝,并提供了一系列的接口,我們通過這些接口可以實現網絡通信,而不用關注網絡協議的具體細節。
圖片
按照現有的網絡模型,Socket 并不屬于其中的任何一層,但我們可以簡單地將 Socket 理解為傳輸層之上的抽象層,負責連接應用層和傳輸層。Socket 提供了大量的 API,基于這些 API 我們可以非常方便地使用網絡協議棧,在不同主機間進行網絡通信。
Linux 一切皆文件,Socket 也不例外,它被稱為套接字文件,在使用上和普通文件是類似的。
Socket 是什么我們已經知道了,下面來看看如何使用 Socket 進行編程。
圖片
整個過程如下:
我們使用來編寫代碼演示一下這個過程,首先是服務端:
import socket# socket.socket() 會返回一個「主動套接字」server = socket.socket( # 表示使用 IPv4,如果是 socket.AF_INET6 # 則表示使用 IPv6 socket.AF_INET, # 表示建立 TCP 連接,如果是 socket.SOCK_DGRAM # 則表示建立 UDP 連接 socket.SOCK_STREAM)# 當然這兩個參數也可以不傳,因為默認就是它# 設置套接字屬性,這里讓端口釋放后立刻就能再次使用server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)# 將「主動套接字」綁定在某個 IP 和端口上server.bind(("localhost", 12345))# 監聽,此時「主動套接字」會變成「監聽套接字」server.listen(5)# 調用 accept,等待客戶端連接,此時會阻塞在這里# 如果客戶端連接到來,那么會返回「已連接套接字」,也就是這里的 conn# 至于 addr 則是一個元組,保存了客戶端連接的信息(IP 和端口)conn, addr = server.accept()# 下面我們通過「已連接套接字」conn 和客戶端進行消息的收發# 收消息使用 recv、發消息使用 send,和 read、write 本質是一樣的while True: msg = conn.recv(1024) # 當客戶端斷開連接時,msg 會收到一個空字節串 if not msg: print("客戶端已經斷開連接") conn.close() break print("客戶端發來消息:", msg.decode("utf-8")) # 然后我們加點內容之后,再給客戶端發過去 conn.send("服務端收到, 你發的消息是: ".encode("utf-8") + msg)
接下來編寫客戶端:
import socket# 返回主動套接字client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 連接服務端client.connect(("localhost", 12345))while True: # 發送消息 data = input("請輸入內容: ") if data.strip().lower() in ("q", "quit", "exit"): client.close() print("Bye~~~") break client.send(data.encode("utf-8")) print(client.recv(1024).decode("utf-8"))
啟動服務端和客戶端進行測試:
圖片
還是比較簡單的,當然我們這里的服務端每次只能和一個客戶端通信,如果想服務多個客戶端的話,那么需要為已連接套接字單獨開一個線程和客戶端進行通信,然后主線程繼續調用 accept 方法等待下一個客戶端。
下面來編寫一下多線程的版本,這里只需要編寫服務端即可,客戶端代碼不變。
import socketimport threadingserver = socket.socket()server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)server.bind(("localhost", 12345))server.listen(5)def handle_message(conn, addr): while True: msg = conn.recv(1024) if not msg: print(f"客戶端(ip: {addr[0]}, port: {addr[1]}) 已經斷開連接") conn.close() break print(f"客戶端(ip: {addr[0]}, port: {addr[1]}) 發來消息:", msg.decode("utf-8")) conn.send("服務端收到, 你發的消息是: ".encode("utf-8") + msg)while True: conn, addr = server.accept() threading.Thread( target=handle_message, args=(conn, addr) ).start()
代碼很簡單,就是把已連接套接字和客戶端的通信邏輯寫在了單獨的函數中,每來一個客戶端,服務端都會啟動一個新的線程去執行該函數,然后繼續監聽,等待下一個客戶端連接到來。
然后客戶端代碼不變,我們啟動三個客戶端去和服務端通信,看看結果如何。
圖片
結果一切正常,當然我們這里的代碼比較簡單,就是普通的消息收發。你也可以實現一個更復雜的功能,比如文件下載器,把服務端當成網盤,支持客戶端上傳和下載文件,并不難。
先回顧一下 socket 模型:
圖片
但是注意:我們說在 listen() 這一步,會將主動套接字轉化為監聽套接字,但此時的監聽套接字的類型是阻塞的。阻塞類型的監聽套接字在調用 accept() 方法時,如果沒有客戶端來連接的話,就會一直處于阻塞狀態,那么此時主線程就沒法干其它事情了。
所以要設置為非阻塞,而非阻塞的監聽套接字在調用 accept() 時,如果沒有客戶端來連接,那么主線程不會傻傻地等待,而是會直接返回,然后去做其它的事情。
類似的,我們在創建已連接套接字的時候默認也是阻塞的,阻塞類型的已連接套接字在調用 send() 和 recv() 的時候也會處于阻塞狀態。比如當客戶端一直不發數據的時候,已連接套接字就會一直阻塞在 recv() 這一步。如果是非阻塞類型的已連接套接字,那么當調用 recv() 但卻收不到數據時,也不用處于阻塞狀態,同樣可以直接返回去做其它事情。
import socketserver = socket.socket()server.bind(("localhost", 12345))# 調用 setblocking 方法,傳入 False# 表示將監聽套接字和已連接套接字的類型設置為非阻塞server.setblocking(False)server.listen(5)while True: try: # 非阻塞的監聽套接字調用 accept() 時 # 如果發現沒有客戶端連接,則會立刻拋出 BlockingIOError # 因此這里寫了個死循環 conn, addr = server.accept() except BlockingIOError: pass else: breakwhile True: try: # 同理,非阻塞的已連接套接字在調用 recv() 時 # 如果發現客戶端沒有發數據,那么同樣會報錯 msg = conn.recv(1024) except BlockingIOError: pass else: print(msg.decode("utf-8")) conn.send(b"data from server")
很明顯,雖然上面的代碼在運行的時候正常,但存在兩個問題:
1)雖然 accept() 不阻塞了,在沒有客戶端連接時主線程可以去做其它事情,但如果后續有客戶端連接,主線程要如何得知呢?因此必須要有一種機制,能夠繼續在監聽套接字上等待后續連接請求,并在請求到來時通知主線程。我們上面的做法是寫了一個死循環,但很明顯這是沒有意義的,這種做法還不如使用阻塞的套接字。
2)send() / recv() 不阻塞了,相當于 I/O 讀寫流程不再是阻塞的,讀寫方法都會瞬間完成并返回,也就是說它會采用能讀多少就讀多少、能寫多少就寫多少的策略來執行 I/O 操作,這顯然更符合我們對性能的追求。
圖片
顯然對于非阻塞套接字而言,會面臨一個問題,那就是當我們執行讀取操作時,有可能只讀了一部分數據,剩余的數據客戶端還沒發過來,那么這些數據何時可讀呢?同理寫數據也是這種情況,當緩沖區滿了,而我們的數據還沒有寫完,那么剩下的數據又何時可寫呢?因此同樣要有一種機制,能夠在主線程做別的事情的時候繼續監聽已連接套接字,并且在有數據可讀寫的時候通知主線程。
這樣才能保證主線程既不會像基本 IO 模型一樣,一直在阻塞點等待,也不會無法處理實際到達的客戶端連接請求和可讀寫的數據,而上面所提到的機制便是 I/O 多路復用。
早期的所有框架都是非阻塞 + 回調 + 基于 IO 多路復用的事件循環,這種模式的性能也非常高,Redis 和 Nginx 都是基于這種方式實現了高并發。只是這種編碼方式非常痛苦,它將好端端的自上而下的邏輯分割的四分五裂,而且也不好維護,它使得開發人員在編寫業務邏輯的同時,還要關注并發細節。
因此使用多路復用 + 回調的方式編寫異步化代碼,雖然并發量能上去,但是對開發者很不友好;而使用同步的方式編寫同步代碼,雖然很容易理解,可并發量卻又上不去。那么問題來了,有沒有一種辦法,能夠讓我們在享受異步化帶來的高并發的同時,又能以同步的方式去編寫代碼呢?也就是我們能不能以同步的方式去編寫異步化的代碼呢?
答案是可以的,使用「協程」便可以辦到。協程在這種模式的基礎之上又批了一層外衣,兼顧了開發效率與運行效率。
asyncio 的事件循環提供了處理套接字的一些方法,我們主要會用到三個:
這些方法類似于前面使用的套接字方法,但不同之處在于,它們需要接收非阻塞套接字作為參數,然后返回協程。我們可以等待協程,直到有數據可供操作。
先來看一下 sock_accept(),它類似于 server.accept()。
conn,add = await loop.sock_accept(sock)
然后 sock_recv 和 sock_sendall 的調用方式與 sock_accept 類似,它們接收一個套接字,然后返回協程對象。通過 await 表達式,sock_recv 將會阻塞,直到套接字有可以處理的字節;sock_sendall 接收一個套接字和要發送的數據,同樣會陷入阻塞,直到要發送給套接字的所有數據都發送完畢,成功時返回 None。
data = await loop.sock_recv(sock)await loop.sock_sendall(sock, data)
下面我們就基于 asyncio 設計一個回顯服務器。
import asyncioimport socketasync def echo(conn: socket.socket): loop = asyncio.get_running_loop() # 無限循環等待來自客戶端連接的數據 try: while data := await loop.sock_recv(conn, 1024): # 收到數據之后再將其發送給客戶端 # 為了區分,我們發送的時候在結尾加一個 b"~" await loop.sock_sendall(conn, data + b"~") except Exception as e: print(f"服務出錯: {e}") finally: conn.close()async def listen_for_conn(server: socket.socket): loop = asyncio.get_running_loop() while True: conn, addr = await loop.sock_accept(server) conn.setblocking(False) print(f"收到客戶端 {addr} 的連接") # 每次連接時,都創建一個任務來監聽客戶端的數據 asyncio.create_task(echo(conn))async def main(): server = socket.socket() server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) server.setblocking(False) server.bind(("localhost", 12345)) server.listen() await listen_for_conn(server)asyncio.run(main())
運行這個應用程序可以同時服務多個客戶端,它里面同樣使用了 IO 多路復用,只不過事件循環將它封裝起來了,我們不需要直接面對。所以這種編程模式就簡單多了。
如果使用阻塞套接字創建應用程序,那么阻塞套接字將在等待數據時停止整個線程。這阻止了我們實現并發,因為一次只能從一個客戶端獲取數據。
使用非阻塞套接字構建應用程序,這些套接字總是會立即返回,而結果有兩種:要么已經準備好了數據,要么因為沒有數據而出現異常。
使用 asyncio 的事件循環方法來構建具有非阻塞套接字的應用程序,這些方法接收一個套接字并返回一個協程,然后可在 await 表達式中使用它。這將暫停父協程,直到套接字帶有數據。事件循環就是基于 IO 多路復用做的一個封裝,而 IO 多路復用能夠實現的前提之一就是:套接字必須是非阻塞的。
本文鏈接:http://www.www897cc.com/showinfo-26-64097-0.html如何在 Asyncio 中使用 Socket
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
下一篇: Gorm 框架原理&源碼解析