作者 | 陸小風
來源 | 碼農的荒島求生
今天來聊一聊協程的作用。
假設磁碟上有10個檔案,你需要讀取的記憶體,那麼你該怎麼用程式碼實現呢?
在接著往下看之前,先自己想一想這個問題,看看自己能想出幾種方法,各自有什麼樣的優缺點。
想清楚了嗎(還在看嗎),想清楚了我們繼續往下看。
最簡單的方法——序列
這可能是大多數同學都能想到的最簡單方法,那就是一個一個的讀取,讀完一個接著讀下一個。
用程式碼表示是這樣的:
for file in files:
result = file.read()
process(result)
是不是非常簡單,我們假設每個檔案讀取需要1分鐘,那麼10個檔案總共需要10分鐘才能讀取完成。
這種方法有什麼問題呢?
實際上這種方法只有一個問題,那就是慢。
除此之外,其它都是優點:
程式碼簡單,容易理解
可維護性好,這程式碼交給誰都能維護的了(論程式設計師的核心競爭力在哪裡)
那麼慢的問題該怎麼解決呢?
有的同學可能已經想到了,為啥要一個一個讀取呢?並行讀取不就可以加快速度了嗎。
稍好的方法,並行
那麼,該怎麼並行讀取檔案呢?
顯然,地球人都知道,執行緒就是用來並行的。
我們可以同時開啟10個執行緒,每個執行緒中讀取一個檔案。
用程式碼實現就是這樣的:
def read_and_process(file):
result = file.read()
process(result)
def main():
files = [fileA,fileB,fileC......]
for file in files:
create_thread(read_and_process,
file).run()
# 等待這些執行緒執行完成
怎麼樣,是不是也非常簡單。
那麼這種方法有什麼問題嗎?
在開啟10個執行緒這種問題規模下沒有問題。
現在我們把問題難度加大,假設有10000個檔案,需要處理該怎麼辦呢?
有的同學可能想10個檔案和10000個檔案有什麼區別嗎,直接創建10000個執行緒去讀不可以嗎?
實際上這裡的問題其實是說創建多個執行緒有沒有什麼問題。
我們知道,雖然執行緒號稱「輕量級進程」,雖然是輕量級但當數量足夠可觀時依然會有性能問題。
這裡的問題主要有這樣幾個方面:
創建執行緒需要消耗系統資源,像記憶體等(想一想為什麼?)
排程開銷,尤其是當執行緒數量較多且都比較繁忙時(同樣想一想為什麼?)
創建多個執行緒不一定能加快I/O(如果此時設備處理能力已經飽和)
既然執行緒有這樣那樣的問題,那麼還有沒有更好的方法?
答案是肯定的,並行程式設計不一定只能依賴執行緒這種技術。
這裡的答案就是基於事件驅動程式設計技術。
事件驅動 + 非同步
沒錯,即使在單個執行緒中,使用事件驅動+非同步也可以實現IO並行處理,Node.js就是非常典型的例子。
為什麼單執行緒也可以做到並行呢?
這是基於這樣兩個事實:
相對於CPU的處理速度來說,IO是非常慢的
IO不怎麼需要計算資源
因此,當我們發起IO操作後為什麼要一直等著IO執行完成呢?在IO執行完之前的這段時間處理其它IO難道不香嗎?
這就是為什麼單執行緒也可以並行處理多個IO的本質所在。
回到我們的例子,該怎樣用事件驅動+非同步來改造上述程序呢?
實際上非常簡單。
首先我們需要創建一個event loop,這個非常簡單:
event_loop = EventLoop()
然後,我們需要往event loop中加入原材料,也就是需要監控的event,就像這樣:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 檔案非同步讀取
event_loop.add(file)
注意當執行file.asyn_read這行程式碼時會立即返回,不會阻塞執行緒,當這行程式碼返回時可能檔案還沒有真正開始讀取,這就是所謂的非同步。
file.asyn_read這行程式碼的真正目的僅僅是發起IO,而不是等待IO執行完成。
此後我們將該IO放到event loop中進行監控,也就是event_loop.add(file)這行程式碼的作用。
一切準備就緒,接下來就可以等待event的到來了:
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
我們可以看到,event_loop會一直等待直到有檔案讀取完成(event_loop.wait_one_IO_ready()),這時我們就能得到讀完的檔案了,接下來處理即可。
全部程式碼如下所示:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 檔案非同步讀取
event_loop.add(file)
def main():
files = [fileA,fileB,fileC ...]
event_loop = EventLoop()
for file in files:
add_to_event_loop(event_loop, file)
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
多執行緒 VS 單執行緒 + event loop
接下來我們看下程序執行的效果。
在多執行緒情況下,假設有10個檔案,每個檔案讀取需要1秒,那麼很簡單,並行讀取10個檔案需要1秒。
那麼對於單執行緒+event loop呢?
我們再次看下event loop + 非同步版本的程式碼:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 檔案非同步讀取
event_loop.add(file)
def main():
files = [fileA,fileB,fileC......]
event_loop = EventLoop()
for file in files:
add_to_event_loop(event_loop, file)
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
對於add_to_event_loop,由於檔案非同步讀取,因此該函數可以瞬間執行完成,真正耗時的函數其實就是event loop的等待函數,也就是這樣:
file = event_loop.wait_one_IO_ready()
我們知道,一個檔案的讀取耗時是1秒,因此該函數在1s後才能返回,但是,但是,接下來是重點。
但是雖然該函數wait_one_IO_ready會等待1s,不要忘了,我們利用這兩行程式碼同時發起了10個IO操作請求。
for file in files: add_to_event_loop(event_loop, file)
因此在event_loop.wait_one_IO_ready等待的1s期間,剩下的9個IO也完成了,也就是說event_loop.wait_one_IO_ready函數只是在第一次循環時會等待1s,但是此後的9次循環會直接返回,原因就在於剩下的9個IO也完成了。
因此整個程序的執行耗時也是1秒。
是不是很神奇,我們只用一個執行緒就達到了10個執行緒的效果。
這就是event loop + 非同步的威力所在。
一個好聽的名字:Reactors模式
本質上,我們上述給出的event loop簡單程式碼片段做的事情本質上和生物一樣:
給出刺激,做出反應。
我們這裡的給出event,然後處理event。
這本質上就是所謂的Reactors模式。
現在你應該明白所謂的Reactors模式是怎麼一回事了吧。
所謂的一些看上去複雜的非同步框架其核心不過就是這裡給出的程式碼片段,只是這些框架可以支持更加複雜的多階段任務處理以及各種類型的IO。而我們這裡給出的程式碼片段只能處理檔案讀取這一類IO。
把回調也加進來
如果我們需要處理各種類型的IO上述程式碼片段會有什麼問題嗎?
問題就在於上述程式碼片段就不會這麼簡單了,針對不同類型會有不同的處理方法,因此上述process方法需要判斷IO類型然後有針對性的處理,這會使得程式碼越來越複雜,越來越難以維護。
幸好我們也有應對策略,這就是回調。
我們可以把IO完成後的處理任務封裝到回呼函式中,然後和IO一併註冊到event loop。
就像這樣:
def IO_type_1(event_loop, io):
io.start()
def callback(result):
process_IO_type_1(result)
event_loop.add((io, callback))
這樣,event_loop在檢測到有IO完成後就可以把該IO和關聯的callback處理函數一併檢索出來,直接調用callback函數就可以了。
while event_loop:
io, callback = event_loop.wait_one_IO_ready()
callback(io.result)
看到了吧,這樣event_loop內部就極其簡潔了,even_loop根本就不關心該怎麼處理該IO結果,這是註冊的callback該關心的事情,event_loop需要做的僅僅就是拿到event以及相應的處理函數callback,然後調用該callback函數就可以了。
現在我們可以同單執行緒來併發程式設計了,也使用callback對IO處理進行了抽象,使得程式碼更加容易維護,想想看還有沒有什麼問題?
回呼函式的問題
雖然回呼函式使得event loop內部更加簡潔,但依然有其它問題,讓我們來仔細看看回呼函式:
def start_IO_type_1(event_loop, io):
io.start()
def callback(result):
process_IO_type_1(result)
event_loop.add((io, callback))
從上述程式碼中你能看到什麼問題嗎?
在上述程式碼中,一次IO處理過程被分為了兩部分:
發起IO
IO處理
其中第2部分放到了回呼函式中,這樣的非同步處理天然不容易理解,這和我們熟悉的發起IO,等待IO完成、處理IO結果的同步模組有很大差別。
這裡的給的例子很簡單,所以你可能不以為意,但是當處理的任務非常複雜時,可能會出現回呼函式中嵌套回呼函式,也就是回調地獄,這樣的程式碼維護起來會讓你懷疑為什麼要稱為一名苦逼的碼農。
問題出在哪裡
讓我們再來仔細的看看問題出在了哪裡?
同步程式設計模式下很簡單,但是同步模式下發起IO,執行緒會被阻塞,這樣我們就不得不創建多個執行緒,但是創建過多執行緒又會有性能問題。
這樣為了發起IO後不阻塞當前執行緒我們就不得不採用非同步程式設計+event loop。
在這種模式下,非同步發起IO不會阻塞調用執行緒,我們可以使用單執行緒加非同步程式設計的方法來實現多執行緒效果,但是在這種模式下處理一個IO的流程又不得不被拆分成兩部分,這樣的程式碼違反程式設計師直覺,因此難以維護。
那麼很自然的,有沒有一種方法既能有同步程式設計的簡單理解又會有非同步程式設計的非阻塞呢?
答案是肯定的,這就是協程。
Finally!終於到了協程
利用協程我可以以同步的形式來非同步程式設計。
這是什麼意思呢?
我們之所以採用非同步程式設計是為了發起IO後不阻塞當前執行緒,而是用協程,程式設計師可以自行決定在什麼時刻掛起當前協程,這樣也不會阻塞當前執行緒。
而協程最棒的一點就在於掛起後可以暫存執行狀態,恢復運行後可以在掛起點繼續運行,這樣我們就不再需要像回調那樣將一個IO的處理流程拆分成兩部分了。
因此我們可以在發起非同步IO,這樣不會阻塞當前執行緒,同時在發起非同步IO後掛起當前協程,當IO完成後恢復該協程的運行,這樣我們就可以實現同步的方式來非同步程式設計了。
接下來我們就用協程來改造一下回調版本的IO處理方式:
def start_IO_type_1(io):
io.start() # IO非同步請求
yield # 暫停當前協程
process_IO_type_1(result) # 處理返回結果
此後我們要把該協程放到event loop中監控起來:
def add_to_event_loop(io, event_loop):
coroutine = start_IO_type_1(io)
next(coroutine)
event_loop.add(coroutine)
最後,當IO完成後event loop檢索出相應的協程並恢復其運行:
while event_loop:
coroutine = event_loop.wait_one_IO_ready()
next(coroutine)
現在你應該看出來了吧,上述程式碼中沒有回調,也沒有把處理IO的流程拆成兩部分,整體的程式碼都是以同步的方式來編寫,最棒的是依然能達到非同步的效果。
實際上你會看到,採用協程後我們依然需要基於事件程式設計的event loop,因為本質上協程並沒有改變IO的非同步處理本質,只要IO是非同步處理的那麼我們就必須依賴event loop來監控IO何時完成,只不過我們採用協程消除了對回調的依賴,整體程式設計方式上還是採用程式設計師最熟悉也最容易理解的同步方式。
總結
看上去簡簡單單的IO實際上一點都不簡單吧。
為了高效進行IO操作,我們採用的技術是這樣演進的:
單執行緒序列 + 阻塞式IO(同步)
多執行緒並行 + 阻塞式IO(並行)
單執行緒 + 非阻塞式IO(非同步) + event loop
單執行緒 + 非阻塞式IO(非同步) + event loop + 回調
Reactor模式(更好的單執行緒 + 非阻塞式IO+ event loop + 回調)
單執行緒 + 非阻塞式IO(非同步) + event loop + 協程
最終我們採用協程技術獲取到了非同步程式設計的高效以及同步程式設計的簡單理解,這也是當今高性能伺服器常用的一種技術組合。
希望這篇文章能對你理解高效IO有所幫助。
