Go 中的 channel 解析— Go 中的併發性
什麼是 channel ?
channel
是一個通訊物件,goroutine 可以使用它來相互通訊。 從技術上講,channel 是一個用於資料傳輸的管道,可以將資料
傳入或從中讀取
。 因此,一個 Goroutine 可以將資料傳送到一個 channel ,而其他 Goroutine 可以從同一個 channel 讀取該資料。
宣告一個 channel
Go 提供
chan
關鍵字來建立一個 channel。channel 只能用於傳輸
一種資料型別
的資料。不允許從該 channel 傳輸其他資料型別。
https://
play。golang。org/p/iWOFL
fcgfF-
上面的程式聲明瞭一個 channel
c
,它可以傳輸 int 型別的資料。上面的程式輸出為 ``,因為 channel 的零值為
nil
( 空 ) 但是
nil
( 空 ) channel 是不能被使用的。你不能將資料傳遞給一個
nil
( 空 ) 的 channel 或從
nil
( 空 ) channel 讀取資料。因此,我們必須使用
make
函式來建立一個可以使用的 channel。
https://
play。golang。org/p/N4dU7
Ql9bK7
我們使用了短命名法 := 來使用
make
函式建立 channel。以上程式產生以下結果
type of `c` is chan int
value of `c` is 0xc0420160c0
注意 channel
c
的值,它看起來像一個記憶體地址。預設情況下,channel 是指標。大多數情況下,當您希望與 Goroutine 進行通訊時,您將 channel 作為引數傳遞給函式或方法。因此,當 Goroutine 接收該 channel 作為引數時,您不需要解除對它的引用來從該 channel 傳送或讀取資料。
資料讀寫
Go 提供了非常容易記住的
左箭頭語法
<-
從 channel 中讀寫資料。
c <- data
上面的語法意味著我們想要將
data
傳送或寫入 channel
c
。它從
data
指向 channel
c
,因此我們可以想象一下將
data
傳送到 channel
c
。
<- c
上面的語法意味著我們需要從 channel
c
讀取一些資料,看看箭頭的方向,它是從 channel
c
開始的,這個語句沒有將資料傳送到任何地方,但是它仍然是一個有效的語句。如果您有一個變數用來儲存來自該 channel 的資料,則可以使用以下語法
var data int
data = <- c
現在,從 channel
c
中讀取出的 int 型別的資料可以賦值給 int 型別的變數
data
。
上面的語法可以像下面這樣使用短命名法重寫
data := <- c
Go 將判斷出在 channel
c
中傳輸的資料的資料型別,併為變數
data
提供一個有效的資料型別。
以上所有 channel 操作在預設情況下都是阻塞的
。在
上節課
中,我們看到了
time。Sleep
阻塞了 Goroutine。channel 操作在本質上也是阻塞的。當一些資料被寫入 channel 時,goroutine 會被阻塞,直到其他 Goroutine 從該 channel 讀取資料。同時,正如我們在
併發一章
中看到的,channel 操作告訴排程器排程另一個 Goroutine,這就是為什麼程式不會永遠阻塞在同一個 Goroutine 上。channel 的這些特性在 Goroutines 通訊中非常有用,因為它可以避免了我們用互斥鎖來讓它們相互協作。
在實踐中使用 Channel
上面我們講的已經很多了,現在讓我們來看一下在 Goroutine 中使用 channel 。
https://
play。golang。org/p/OeYLK
Ez7qKi
讓我們一步一步地來討論上述程式的執行。
我們聲明瞭
greet
函式,它接受傳輸資料型別為
string
的 channel
c
。在這個函式中,我們從 channel
c
中讀取資料並將資料列印到控制檯。
在 main 函式中,程式將
main() started
作為第一條語句列印到控制檯。
然後使用
make
函式建立了用於傳輸
string
型別的 channel
c
。
我們將 channel
c
傳遞給
greet
函式,然後使用
go
關鍵字將其作為一個 Goroutine 執行。
此時,程式有 2 個 Goroutine,而主 Goroutine 是
main Goroutine
(
檢視上一課瞭解它是什麼
)。然後程式執行下一行。
我們將字串
John
傳入 channel
c
。此時,goroutine 被阻塞,直到某個 Goroutine 讀取它。Go 排程程式排程
greet
goroutine,然後它開始執行,正如上面第一點說道的。
然後
main Goroutine
被啟用並執行最後的語句,列印
main()stopped
然後停止。
死鎖
如上面所述,當我們往 channel 寫入或從中讀取資料時,goroutine 將被阻塞並將控制權傳遞給可用的 Goroutine。如果沒有其他可用的 Goroutines,那麼可以想象他們都在睡覺。這就是死鎖錯誤發生的地方,那樣會導致整個程式崩潰。
如果您試圖從 channel 中讀取資料,但是 channel 中沒有可用的值,它將阻塞當前的 Goroutine 並且會阻塞其他 Goroutine,希望一些 Goroutine 將值傳送到 channel。因此,
這個讀取操作將會被阻塞
。類似地,如果要將資料傳送到一個 channel,它將阻塞當前的 Goroutine 並解除其他 Goroutine 的阻塞,直到某個 Goroutine 從它讀取資料。因此,
這個傳送操作將被阻塞
。
死鎖的一個簡單例子就是在 main Goroutine 中執行一些 channel 操作。
https://
play。golang。org/p/2KTEo
ljdci_f
上面的程式在執行時會丟擲下面的錯誤。
main() started
fatal error: all Goroutines are asleep - deadlock!
Goroutine 1 [chan send]:
main。main()
program。Go:10 +0xfd
exit status 2
fatal error: all Goroutines are asleep — deadlock!
。 似乎所有的 Goroutine 都在睡覺,或者根本沒有其他 Goroutine 可供使用。
關閉一個通道
一個 channel 可以被關閉,這樣就不能透過它傳送更多的資料了。接收端 Goroutine 可以透過它
val, ok := <- channel
瞭解 channel 的使用狀態,如果 channel 是開啟的或讀取操作是可以執行的,那麼
ok
的值等於
true
如果通道關閉那麼就不能執行更多的讀取操作,此時
ok
等於
false
,channel 可以使用帶有語法的內建函式
close
如,
close(chennel)
來關閉 channel ,讓我們來看一個小例子。
https://
play。golang。org/p/LMmAq
4sgm02
為了幫助您理解阻塞的概念,首先發送操作
c <- “John”
是阻塞的,一些 Goroutine 必須從 channel 中讀取資料,因此
greet
這個 Goroutine 由 Go 排程器排程。然後,第一次讀取操作
<-c
是非阻塞的,因為要從 channel
c
中讀取資料。第二次讀取操作
<-c
將阻塞,因為 channel
c
沒有任何資料可以讀取,因此 Go 排程器啟用
main
Goroutine,程式從
close(c)
函式開始執行。
從上面的錯誤中,我們可以看到我們試圖往一個已經關閉的 channel 裡傳送資料。此外,如果我們試圖從關閉的 channel 閱讀,程式會發生 panic。為了更好地理解被關閉 channel 的可用性,讓我們看看
for
迴圈。
For 迴圈
for
迴圈的無限語法
for{}
可用於讀取透過 channel 傳送的多個值。
https://
play。golang。org/p/X58FT
gSHhXi
在上面的例子中,我們建立了一個
squares
Goroutine,它將逐一返回從 0 到 9 的數字。在
main
Goroutine 中,我們用無限
for
迴圈來讀取那些數字 。
在無限
for
迴圈中,由於我們需要一個條件來在某一點中斷迴圈,所以我們使用語法
val, ok := <-c
從 channel 中讀取值。在這裡,
ok
會在 channel 關閉時給我們提供額外的資訊。因此,在
square
Goroutine 中,在寫完所有資料之後,我們使用語法
close(c)
關閉 channel。當
ok
的值為
true
時,程式列印
val
和
ok
的值。當它為
false
時,我們使用
break
關鍵字跳出迴圈。因此,上述程式產生以下結果
main() started
0 true
1 true
4 true
9 true
16 true
25 true
36 true
49 true
64 true
81 true
0 false <—— loop broke!
main() stopped
當 channel 關閉時,goroutine 讀取的值為 channel 資料型別的零值。在這種情況下,由於 channel 傳輸的是 int 資料型別,因此結果為 0。
為了避免手動檢查 channel 關閉情況帶來的痛苦,Go 為我們提供了
for range
迴圈 ,當 channel 關閉時
for range
將自動關閉。讓我們修改前面的程式。
https://
play。golang。org/p/ICCYb
WO7ZvD
在上面的程式中,我們用
for val:= range c
代替
for{}
。
range
將每次從 channel 中讀取一個值,直到 channel 關閉。因此,上面的程式產生下面的結果
main() started
0
1
4
9
16
25
36
49
64
81
main() stopped
如果最後不關閉
for range
迴圈中的 channel,程式將在執行時丟擲死鎖錯誤。
緩衝區大小或 channel 容量
正如我們看到的,channel 上的每個傳送操作都會阻塞當前的 Goroutine。但到目前為止,我們使用的
make
函式沒有第二個引數。第二個引數是 channel 或緩衝區大小的容量。預設情況下,channel 緩衝區大小為 0 也稱為
無緩衝 channel
。寫入 channel 的任何內容都必須是可以讀取的。
當緩衝區大小為非零 n 時,
goroutine 直到緩衝區滿後才被阻塞
。當緩衝區滿時,傳送到 channel 的任何值都將透過丟擲緩衝區中可供讀取的最後一個值 ( Goroutine 將被阻塞 ) 新增到緩衝區中。但有一個陷阱,
讀操作對快取是持續性的
。這意味著,一旦讀操作開始,它將一直持續下去,直到緩衝區為空。從技術上講,
這意味著讀取緩衝區 channel 的 Goroutine 在緩衝區為空之前不會阻塞
。
我們可以使用以下語法定義緩衝 channel。
c := make(chan Type, n)
這將建立一個緩衝區大小為
n
資料型別為
Type
的 channel。在 channel 接收到
n+1
傳送操作之前,它不會阻塞當前的 Goroutine。 讓我們來證明一下 Goroutine 在 channel 緩衝區滿之前不會阻塞。
https://
play。golang。org/p/k0usd
YZfp3D
在上述程式中,channel
c
的緩衝容量為 3。這意味著它可以儲存 3 個值,也就是第 20 行的值。但是由於緩衝區沒有滿 ( 因為我們沒有傳送任何新值 ),主 Goroutine 將不會阻塞,程式將會繼續。
讓我們傳送額外的值。
https://
play。golang。org/p/KGyis
kRj1Wi
如前所述,現在填充的緩衝區透過
c <- 4
傳送操作,主 Goroutine 將等待
square
Goroutine 讀取所有值。
channel 的長度和容量
與切片相似,緩衝 channel 具有長度和容量。channel 長度是 channel 緩衝區中排隊 ( 未讀 ) 的值個數,而 channel 容量是緩衝區大小。為了計算長度,我們使用
len
函式,而為了計算容量,我們使用
cap
函式,就像切片一樣。
https://
play。golang。org/p/qsDZu
6pXLT7
如果您想知道,為什麼上面的程式執行良好,死鎖錯誤沒有丟擲。這是因為,由於 channel 容量為 3,且緩衝區中只有 2 個值可用,Go 沒有試圖透過阻塞主 Goroutine 執行來排程另一個 Goroutine。如果需要,可以在 main Goroutine 中讀取這些值,因為即使緩衝區沒有滿,也不能阻止從 chennel 讀取值。
這是另外一個例子
https://
play。golang。org/p/-gGpm
08-wzz
這裡有一個腦筋急轉彎
https://
play。golang。org/p/sdHPD
x64aor
使用
for range
來讀取有快取 channel,我們可以從已經關閉的 channel 讀取。因為對於已經關閉的 channel,資料駐留在緩衝區中,我們仍然可以讀取該資料。
https://
play。golang。org/p/vULFy
WnpUoj
緩衝 channel 就像畢達哥拉斯杯,觀看這個關於
畢達哥拉斯杯
的有趣影片。
與多個 Goroutine 一起工作
我們寫兩個 Goroutines,一個用於計算整數的平方另一個用於計算整數的立方。
https://
play。golang。org/p/6wdhW
YpRfrX
讓我們一步一步地討論上述程式的執行。
我們建立了兩個函式
square
和
cube
,它們將作為 Goroutines 單獨執行。兩者都從 channel
c
中接收 int 型別的資料作為變數,然後複製給 num,然後在下一行將計算完成的資料寫到 channel
c
。
在
main
Goroutine 中,我們使用
make
函式建立了兩個型別為 int 的 channel
squareChan
和
cubeChan
。
然後我們執行
square
和
cube
Goroutine。
由於此時仍在
main
Goroutine 內,
testNum
的值此時為 3。
然後我們將資料傳送到
squareChan
和
cubeVal
。主 Goroutine 將被阻塞,直到這些 channel 讀取它。一旦他們讀了它,
main
Goroutine 將繼續執行。
當在
main
Goroutine 中,我們試圖從給定的 channel 讀取資料時,程式將被阻塞,直到這些 channel 從它們的 Goroutine 中寫入一些資料。這裡,我們使用了簡寫語法
:=
從多個 channel 接收資料。
一旦這些 Goroutine 將一些資料寫入 channel ,主 Goroutine 將被阻塞。
channel 寫操作完成後,
main
Goroutine 開始執行。然後我們計算總和並列印在控制檯上。
因此,上述程式將產生以下結果
[main] main() started
[main] sent testNum to squareChan
[square] reading
[main] resuming
[main] sent testNum to cubeChan
[cube] reading
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3 is 36
[main] main() stopped
單向 channel
到目前為止,我們已經看到可以從兩邊傳輸資料的 channel,或者簡單地說,我們可以在上面進行讀寫操作的 channel。但是我們也可以創造單向的 channel。例如,只接收允許對其進行讀操作的 channel,只發送允許對其進行寫操作的 channel。
單向通道也使用
make
函式建立,但是使用了額外的箭頭語法。
roc := make(<-chan int)
soc := make(chan<- int)
在上述程式中,
roc
使用
make
函式建立箭頭遠離
chan
方向來作為只讀 channel。而
soc
使用箭頭靠近
chan
做為只寫 channel。它們也是不同的型別。
https://
play。golang。org/p/JZO51
IoaMg8
但是單向通道有什麼用呢?使用單向 channel 可以提高程式的型別安全性
。因此,程式不容易出錯。
但是假設您有一個 Goroutine,其中您只需要從 channel 中讀取資料,但是主 Goroutine 需要從 channel 中讀取資料或者往 channel 寫入資料。這將如何工作 ?
幸運的是,Go 提供了更簡單的語法來將雙向通道轉換為單向通道。
https://
play。golang。org/p/k3B3g
CelrGv
我們修改了
greet
Goroutine 的例子,將雙向 channel
c
轉換為只讀 channel
roc
的
greet
函式。現在我們只能從那個 channel 中讀取。任何寫操作都會導致致命的錯誤 :
“invalid operation: roc <- ”some text“ (send to receive-only type <-chan string)”
。
匿名 Goroutine
在 Goroutines 一章,我們學習了 匿名 Goroutines。我們還可以使用它們實現 channel。讓我們修改前面的簡單示例來實現匿名 Goroutine 中的 channel。
這是我們之前的例子
https://
play。golang。org/p/c5erd
HX1gwR
下面是修改後的例子,我們將
greet
Goroutine 作為一個匿名 Goroutine。
https://
play。golang。org/p/cM5nF
gRha7c
channel 的資料型別
是的,channel 是第一類值,可以像其他值一樣在任何地方使用:作為結構元素、函式引數、函式返回值,甚至作為另一個 channel 的型別。在這裡,我們感興趣的是使用 channel 作為另一個 channel 的資料型別。
https://
play。golang。org/p/xVQvv
b8O4De
Select
select
就像
switch
一樣沒有任何輸入引數,但是它只用於 channel 操作。
Select
語句用於在多個 channel 中只對一個 channel 執行操作,由
case
塊有條件地選擇。
讓我們先看一個例子,然後討論它是如何工作的。
https://
play。golang。org/p/ar5dZ
UQ2ArH
從上面的程式中,我們可以看到
select
語句就像
switch
一樣,但是不是
boolean
操作,我們添加了 channel 操作,比如讀或寫,或者讀和寫混合。
Select 語句是阻塞的,除非它有預設情況
( 稍後我們將看到 )。一旦其中一個條件滿足,它就會解除阻塞。那麼,
當一個案例多個 case 滿足呢?
如果所有的 case 語句 ( channel 操作 ) 都阻塞了,那麼 select 語句將等待其中一個 case 語句 ( 其 channel 操作 ) 解除阻塞,然後執行該 case。如果一些或所有的 channel 操作是非阻塞的,那麼將隨機選擇一個非阻塞 case 並立即執行。
為了解釋以上例子,我們啟動了兩個獨立 channel 的 Goroutines。然後介紹了兩個案例的 select 語句。一種情況從
chan1
讀取值,另一種情況從
chan2
讀取值。因為這些 channel 是無緩衝的,所以讀操作將被阻塞 ( 寫操作也一樣 )。所以這兩種選擇都是阻塞的。因此
select
將等待其中一個
case
被解除阻塞。
當程式位於
select
語句時,main Goroutine 將阻塞,它將排程 select 語句中出現的所有 Goroutine ( 一次一個 ),即
service1
和
service2
。
service1
等待 3 秒,然後透過寫入
chan1
解除阻塞。類似地,
service2
等待 5 秒,然後透過寫入
chan2
解除阻塞。因為
service1
比
service2
更早解除阻塞,所以
case1
將首先解除阻塞,因此將執行該案例,並忽略其他 case ( 這裡是 case2 )。一旦完成了 case 的執行,主函式的執行將繼續下去。
上面的程式模擬了真實的 Web 服務,其中負載均衡器收到數百萬個請求,它必須從可用的服務之一返回響應。使用 Goroutines、channel 和 select,我們可以請求多個服務來響應,可以使用快速響應的服務。
為了模擬當所有情況都阻塞時,響應幾乎同時可用,我們可以簡單地刪除
Sleep
函式呼叫。
https://
play。golang。org/p/giSkk
qt8XHb
以上程式產生以下結果 ( 您可能會得到不同的結果 )
main() started 0s
service2() started 481 µ s
Response from service 2 Hello from service 2 981。1 µ s
main() stopped 981。1 µ s
但有時,它也可能是
main() started 0s
service1() started 484。8 µ s
Response from service 1 Hello from service 1 984 µ s
main() stopped 984 µ s
這是因為
chan1
和
chan2
操作幾乎同時發生,但是在執行和排程上仍然存在一些時間差。
要模擬所有情況都是非阻塞且響應同時可用時,可以使用有緩衝 channel。
https://
play。golang。org/p/RLRGE
mFQP3f
以上程式產生下面的結果
main() started 0s
Response from chan2 Value 1 0s
main() stopped 1。0012ms
在某些情況下,它也可能是
main() started 0s
Response from chan1 Value 1 0s
main() stopped 1。0012ms
在上面的程式中,兩個 channel 的緩衝區中都有兩個值。由於我們在緩衝區容量 2 的 channel 中傳送了兩個值,這些 channel 操作不會阻塞,控制將轉到
select
語句。由於從緩衝 channel 讀取是非阻塞操作,直到整個緩衝區為空,並且在 case 條件下只讀取一個值,所以所有 case 操作都是非阻塞操作。因此,Go runtime 將隨機選擇一個 case 語句。
default case
與
switch
語句一樣,
select
語句也有
default
case。
default case 是非阻塞的
。但這還不是全部,
default
case 使得預設情況下
select
語句
總是非阻塞的
。這意味著,在任何 channel (
有緩衝或無緩衝
) 上的傳送和接收操作總是非阻塞的。
如果某個值在任何 channel 上可用,則
select
將執行該 case。否則,它將立即執行
default
case。
https://
play。golang。org/p/rFMpc
80EuT3
在上面的程式中,由於 channel 是無緩衝的,而且值在兩個 channel 操作中不能立即可用,因此將執行
default
case。如果上面的
select
語句沒有
default
case,那麼
select
就會阻塞,而回應就會不同。
由於在
default
中,
select
是非阻塞的,排程器不會從主 Goroutine 獲得排程可用 Goroutine 的呼叫。但是我們可以透過呼叫
time。Sleep
來手動實現。這樣,所有的 Goroutine 都會執行並且結束,將控制權返回給
main
Goroutine,它會在一段時間後醒來。當主 Goroutine 喚醒時,channel 將立即具有可用的值。
https://
play。golang。org/p/eD0NH
xHm9hN
因此,上述程式產生以下結果
main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3。0001805s
main() stopped 3。0001805s
在某些情況下,它也可能是
main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3。0000957s
main() stopped 3。0000957s
死鎖
當沒有可用的 channel 傳送或接收資料時,
default
case 是有用的。為了避免死鎖,我們可以使用
default
case。這是有可能的,因為由於有
default
case,所有 channel 操作都是非阻塞的,如果資料不能立即可用,Go 不會安排任何其他 Goroutines 傳送資料到 channel。
https://
play。golang。org/p/S3Wxu
qb8lMF
與接收操作類似,在傳送操作中,如果其他 Goroutine 正在休眠 (
未準備好接收值
),則執行
default
case。
空 channel
我們知道,channel 的預設值為
nil
。因此,我們不能在
nil
channel 上執行傳送或接收操作。但是在這種情況下,當 select 語句中使用
nil
channel 時,它將丟擲以下錯誤之一或兩個錯誤。
https://
play。golang。org/p/uhraF
ubcF4S
從上面的結果中,我們可以看到
select( 無 case)
意味著
select
語句實際上是空的,
因為帶有 nil channel 的 case 被忽略了
。但是由於空
select{}
語句阻塞了主 Goroutine,並且
service
Goroutine 在它的位置被排程,所以在
nil
通道上的通道操作將丟擲
chan send (nil chan)
錯誤。為了避免這種情況,我們使用
default
case。
https://
play。golang。org/p/upLsz
52_CrE
上述程式不僅忽略
case
塊,而且立即執行
default
case。因此排程器沒有時間來排程
service
Goroutine。但這確實是一個糟糕的設計。你應該檢查通道的
nil
值。
新增超時
上面的程式不是很有用,因為只執行
default
case。但有時,我們希望任何可用的服務都應該在適當的時間響應,如果它沒有響應,那麼就應該執行
default
case。這可以透過使用在定義時間後解除阻塞的 channel 操作來完成。該 channel 操作由
time
包的
after
函式提供。我們來看一個例子。
https://
play。golang。org/p/mda2t
2IQK__X
以上程式在 2 秒後產生以下結果
main() started 0s
No response received 2。0010958s
main() stopped 2。0010958s
在上面的程式中,
<-time。After(2 * time。 second)
在 2 秒後解除阻塞,返回它被解除阻塞的時間,但是在這裡,我們對它的返回值不感興趣。因為它也像一個 Goroutine,我們有 3 個 Goroutine,這個首先從其中接觸阻塞。因此,執行與 Goroutine 操作相對應的 case。
這是很有用的,因為您不希望等待來自可用服務的響應太久,而使用者必須等待很長時間才能從服務中獲得任何東西。如果加上
10 *time。second
。第二,在上面的例子中,將列印
service1
的響應,我想現在已經很明顯了。
空 select
與
for{}
空迴圈一樣,空
select{}
語法也是有效的,但有一個問題。正如我們所知,
select
語句被阻塞,直到其中一個 case 解除阻塞,而且由於沒有
case
語句可用來解除阻塞,main Goroutine 將永遠阻塞,從而導致死鎖。
https://
play。golang。org/p/-pBd-
BLMFOu
在上面的程式中,我們知道
select
會阻塞主 Goroutine,排程器會排程另一個可用的 Goroutine,即
service
。但是在那之後,它會掛起,排程器不得不排程另一個可用的 Goroutine,但是由於主協程被阻塞,沒有其他 Goroutine 可用,導致死鎖。
main() started
Hello from service!
fatal error: all Goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main。main()
program。Go:16 +0xba
exit status 2
WaitGroup
讓我們設想這樣一種情況 : 您需要知道是否所有的 Goroutines 都完成了它們的工作。這與選擇只需要一個條件為
true
的地方有些相反,但是在這裡需要
所有條件為 true 才能解鎖主 Goroutine
。這裡條件是 channel 操作成功。
WaitGroup
是一個具有計數器值的結構,它跟蹤生成了多少個 Goroutines 以及完成了多少工作。當這個計數器達到 0 時,表示所有的 Goroutines 都完成了它們的工作。
讓我們看一個例子,看看語法。
https://
play。golang。org/p/8qrAD
9ceOfJ
在上面的程式中,我們建立了
sync。WaitGroup
型別的空結構 (
帶有零值欄位
) wg。WaitGroup struct 有未匯出的欄位,如
noCopy
、
state1
和
sema
,其內部實現我們不需要知道。這個結構有三個方法,即
Add
、
Wait
和
Done
。
Add
方法需要一個
int
型別的引數,這是 WaitGroup 計數器的增量。
Counter
只是一個預設值為 0 的整數。它包含了正在執行的 Goroutine 的數量。在建立 WaitGroup 時,它的計數器值為 0,我們可以使用
Add
方法透過傳遞
delta
作為引數來遞增它。請記住,
counter
不能自動理解 Goroutine 何時啟動,因此我們需要手動增加它。
wait
方法用於從呼叫當前 Goroutine 的位置阻塞該 Goroutine。一旦計數器達到 0,goroutine 將解除阻塞。因此,我們需要一些東西來減少計數器。
Done
方法使計數器遞減。它不接受任何引數,因此它只減 1。
在上面的程式中,建立
wg
後,我們執行
for
迴圈 3 次。在每個回合中,我們啟動一個 Goroutine,並增加計數器 1。這意味著,現在我們有 3 個 Goroutine 等待執行,而 WaitGroup 計數器是 3。注意,我們在 Goroutine 中傳遞了指向
wg
的指標。這是因為在 Goroutine 中,一旦我們完成了 Goroutine 應該做的事情,我們需要呼叫
Done
方法來減少計數器。如果
wg
作為值傳遞,
wg
在
main
中不會減少。這是很明顯的。
執行
for
迴圈之後,我們仍然沒有將控制權傳遞給其他 Goroutines。這是透過呼叫
wg
上的
Wait
方法來完成的,比如
wg。Wait()
。這將阻塞主 Goroutine,直到計數器達到 0。一旦計數器達到 0,因為從 3 個 Goroutine,我們呼叫了
wg
上的
Done
方法 3 次,
main
Goroutine 將解除阻塞,並開始執行進一步的程式碼。
因此上面的程式產生下面的結果
main() started
Service called on instance 2
Service called on instance 3
Service called on instance 1
main() stopped
由於 Goroutines 的執行順序可能會有所不同,因此上述結果可能對您有所不同。
Add
方法接受型別為
int
,這意味著
delta
也可以是負的。想要了解更多,請訪問這裡的
官方文件
。
工作池
顧名思義,工作池是同時工作以執行任務的 Goroutines 的集合。在 WaitGroup 中,我們看到了一些 Goroutines 的集合,但他們沒有具體的工作。一旦您在它們中放入 channel,它們就有一些工作要做,併成為工作池。
因此,工作池背後的概念是維護一個
worker Goroutines
池,它接收一些任務並返回結果。一旦他們都完成了他們的工作,我們收集結果。所有這些 Goroutine 都為個人目的使用相同的通道。
讓我們看一個簡單的例子,有兩個 channel,即
tasks
和
results
。
https://
play。golang。org/p/IYiMV
1I4lCj
別擔心,我會一步一步解釋這裡發生的事情。
sqrWorker
是一個工作函式,它接受
task
channel、
result
channel 和
id
。這個 Goroutine 的工作是將從
task
channel 接收到的數字的平方傳送到
result
channel。
在 main 函式中,我們建立了具有緩衝區容量大小為 10 的
task
和
result
channel。因此,任何傳送操作都是非阻塞的,直到緩衝區滿為止。因此,設定大的緩衝區值並不是一個壞主意。
然後,我們使用上面兩個 channel 和 id 引數生成多個
sqrWorker
例項作為 Goroutines,以獲取關於哪個 Worker 正在執行任務的資訊。
然後我們將 5 個任務傳遞給
task
channel,這些
task
channel 是非阻塞的。
因為我們已經完成了
task
channel,所以關閉了它。這不是必須的,但是如果有一些 bug 進來,它會在將來節省很多時間。
然後使用 for 迴圈,經過 5 次迭代,我們從
result
channel 提取資料。由於空緩衝區上的讀操作是阻塞的,因此將從工作池排程一個 Goroutine。在 Goroutine 返回一些結果之前,main Goroutine 將被阻塞。
由於我們在 worker Goroutine 中模擬阻塞操作,因此呼叫排程器來排程另一個可用的 Goroutine,直到它可用為止。當 worker Goroutine 可用時,它將寫入
result
channel。由於在緩衝區滿之前,對緩衝 channel 的寫入是非阻塞的,所以在這裡對
result
channel 的寫入是非阻塞的。此外,噹噹前工作執行緒 Goroutine 不可用時,將使用任務緩衝區中的值執行多個其他工作執行緒 Goroutine。在所有工作者 Goroutines 消耗任務之後,當
task
channel 緩衝區為空時,範圍迴圈結束。當
task
channel 關閉時,它不會丟擲死鎖錯誤。
有時,所有的工作執行緒都可以睡眠,所以主執行緒會醒來並工作,直到
result
channel 緩衝區再次清空。
所有的 worker Goroutine 死後,
main
Goroutine 將重新獲得控制權,並從
result
channel 列印剩餘的結果,繼續執行。
上面的例子雖然很長,但是很好地解釋了多個 Goroutine 如何在同一個 channel 上提供內容並優雅地完成工作。當員工的工作遇到阻礙時,goroutine 功能強大。如果刪除
time。Sleep()
呼叫,那麼只有一個 Goroutine 將執行此任務,因為在
for range
迴圈完成並在 Goroutine 死亡之前,不會排程其他 Goroutine。
您可以得到不同的結果,就像在前面的例子中一樣,這取決於您的系統有多快,因為如果所有的 worker Gorutine 都被阻塞了,即使是一微秒,main Goroutine 也會像解釋的那樣被喚醒。
現在,讓我們使用同步 Goroutines 的 WaitGroup 概念。使用前面的 WaitGroup 示例,我們可以獲得相同的結果,但更優雅。
https://
play。golang。org/p/0rRfc
hn7sL1
上面的結果看起來很整潔,因為在 main Goroutine 中的
result
channel 上的讀取操作是非阻塞的,因為
result
channel 已經由 result 填充,而 main Goroutine 被
wg。Wait()
呼叫阻塞。使用
waitGroup
,我們可以防止很多 ( 不必要的 ) 上下文切換 ( 排程 ),這裡是 7,而前面的示例中是 9。
但這是有代價的,因為你必須等待所有的工作都完成。
Mutex
互斥是 Go 中最簡單的概念之一。但是在我解釋它之前,讓我們先理解競態條件是什麼。goroutines 有獨立的棧,因此它們之間不共享任何資料。但是在某些情況下,堆中的某些資料可能在多個 Goroutine 之間共享。在這種情況下,多個 Goroutine 試圖在相同的記憶體位置操作資料,從而導致意想不到的結果。我將向您展示一個簡單的示例。
https://
play。golang。org/p/MQNep
ChxiEa
在上面的程式中,我們生成了 1000 個 Goroutines,它增加了初始值為
0
的全域性變數
i
的值。由於我們正在實現 WaitGroup,所以我們希望所有 1000 個 Goroutines 都將
i
的值逐個遞增,結果
i
的最終值為 1000。當 main Goroutine 在
wg。Wait()
呼叫後再次執行時,我們將輸出
i
。
value of i after 1000 operations is 937
什麼?為什麼小於 1000 呢?看起來有些 Goroutine 沒用。但實際上,我們的程式有一個競態條件。讓我們看看會發生什麼。
i = i + 1
的計算有 3 個步驟
(1)得到 i 的值
(2)i 的增量值為 1
(3)用新值更新 i 的值
讓我們設想一個場景,在這些步驟之間安排了不同的 Goroutine。例如,讓我們考慮 1000 個 Goroutines 中的兩個 Goroutines,即 G1 和 G2。
當
i
為
0
時,G1 首先開始,執行前兩個步驟,
i
現在是
1
。但是在 G1 更新第 3 步中的
i
值之前,會排程新的 Goroutine G2 並執行所有步驟。但是對於 G2,
i
的值仍然是
0
,因此在執行步驟 3 之後,
i
將是 1。現在 G1 再次被安排完成步驟 3,並更新步驟 2 中
i
的值 1。在完美的世界裡,goroutines 在完成所有的 3 個步驟後被排程,兩個 Goroutines 的成功操作會產生
i
為 2 的值,但這裡不是這樣。因此,我們可以推測為什麼我們的程式沒有將
i
的值賦值為
1000
。
到目前為止,我們瞭解到 Goroutines 是合作安排的。除非一個 Goroutine 塊具有併發性課程中提到的條件之一,否則另一個 Goroutine 不會取代它。既然
i = i + 1
不是阻塞,為什麼 Go 排程器計劃另一個 Goroutine ?
您一定要在
stackoverflow
上檢視這個答案。
在任何情況下,您都不應該依賴 Go 的排程演算法並實現自己的邏輯來同步不同的 Goroutine。
一種確保每次只有一個 Goroutine 完成上述 3 個步驟的方法是實現互斥鎖。互斥 (
互斥
) 是程式設計中的一個概念,在這個概念中,一次只能有一個例程 ( 執行緒 ) 執行操作。這是透過一個獲取值上的鎖的例程來完成的,對它必須做的值做任何操作,然後釋放鎖。當值被鎖定時,沒有其他例程可以對其讀寫。
在 Go 中,互斥資料結構 ( map ) 是由
sync
包提供的。在 Go 中,在對可能導致競態條件的值執行任何操作之前,我們使用
mutex。Lock()
方法獲取一個鎖,然後是操作程式碼。一旦我們完成了操作,在上面的程式
i = i + 1
中,我們使用
mutex。unlock()
方法來解鎖它。當鎖存在時,任何其他 Goroutine 試圖讀取或寫入
i
的值時,該 Goroutine 將阻塞,直到從第一個 Goroutine 解鎖操作為止。因此,只有 1 個 Goroutine 可以讀取或寫入
i
的值,從而避免了競態條件。請記住,在鎖定和解鎖之間的操作中出現的任何變數在整個操作解鎖之前都不能用於其他 Goroutines。
讓我們用互斥鎖修改前面的示例。
https://
play。golang。org/p/xVFAX
_0Uig8
在上面的程式中,我們建立了一個互斥量
m
,並將它的指標傳遞給所有派生的 Goroutines。在開始對
i
進行操作之前,我們使用
m。lock()
語法獲得互斥物件
m
上的鎖,然後在操作之後使用
m。unlock()
語法解鎖它。上面的程式產生下面的結果。
value of i after 1000 operations is 1000
從上面的結果我們可以看到互斥幫助我們解決了競態條件。但是第一條規則是避免 Goroutines 之間共享資源。
您可以在執行
Go run -race program。Go
這樣的程式時,使用
race
引數在 Go 中測試競態條件。請在
這裡
閱讀更多關於 race 檢測器的資訊。
併發模式
併發有很多方法可以使我們的日常程式設計更加容易。以下是一些概念和方法,我們可以使用它們使程式更快和更可靠。
Generator ( 生產者 )
使用 channel,我們可以更好地實現生產者。如果生產者在計算上很昂貴,那麼我們也可以同時生成資料。這樣,程式就不必等待所有資料生成。例如,生成斐波那契數列。
https://
play。golang。org/p/1_2MD
eqQ3o5
使用
fib
函式,我們得到了一個可以迭代和利用從它接收到的資料的 channel。而在
fib
函式內部,由於我們必須返回一個只接收 channel,我們正在建立一個緩衝 channel 並在最後返回它。此函式的返回值將將此雙向 channel 轉換為單向接收 channel。在使用匿名 Goroutine 時,我們使用 for 迴圈將斐波那契數推入這個 channel。一旦完成 for 迴圈,我們就會從 Goroutine 內部關閉它。在
main
Goroutine 中,使用
for range
在
fib
函式呼叫,我們可以直接訪問這個 channel。
fan-in & fan-out (扇入和扇出)
扇入是一種多路複用策略,將多個 channel 的輸入組合起來產生一個輸出 channel。扇出是一種多路複用策略,其中單個 channel 被分成多個 channel。
package main
import (
“fmt”
“sync”
)
// return channel for input numbers
func getInputChan() <-chan int {
// make return channel
input := make(chan int, 100)
// sample numbers
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
// run Goroutine
go func() {
for num := range numbers {
input <- num
}
// close channel once all numbers are sent to channel
close(input)
}()
return input
}
// returns a channel which returns square of numbers
func getSquareChan(input <-chan int) <-chan int {
// make return channel
output := make(chan int, 100)
// run Goroutine
go func() {
// push squares until input channel closes
for num := range input {
output <- num * num
}
// close output channel once for loop finishesh
close(output)
}()
return output
}
// returns a merged channel of `outputsChan` channels
// this produce fan-in channel
// this is veriadic function
func merge(outputsChan 。。。<-chan int) <-chan int {
// create a WaitGroup
var wg sync。WaitGroup
// make return channel
merged := make(chan int, 100)
// increase counter to number of channels `len(outputsChan)`
// as we will spawn number of Goroutines equal to number of channels received to merge
wg。Add(len(outputsChan))
// function that accept a channel (which sends square numbers)
// to push numbers to merged channel
output := func(sc <-chan int) {
// run until channel (square numbers sender) closes
for sqr := range sc {
merged <- sqr
}
// once channel (square numbers sender) closes,
// call `Done` on `WaitGroup` to decrement counter
wg。Done()
}
// run above `output` function as groutines, `n` number of times
// where n is equal to number of channels received as argument the function
// here we are using `for range` loop on `outputsChan` hence no need to manually tell `n`
for _, optChan := range outputsChan {
go output(optChan)
}
// run Goroutine to close merged channel once done
go func() {
// wait until WaitGroup finishesh
wg。Wait()
close(merged)
}()
return merged
}
func main() {
// step 1: get input numbers channel
// by calling `getInputChan` function, it runs a Goroutine which sends number to returned channel
chanInputNums := getInputChan()
// step 2: `fan-out` square operations to multiple Goroutines
// this can be done by calling `getSquareChan` function multiple times where individual function call returns a channel which sends square of numbers provided by `chanInputNums` channel
// `getSquareChan` function runs Goroutines internally where squaring operation is ran concurrently
chanOptSqr1 := getSquareChan(chanInputNums)
chanOptSqr2 := getSquareChan(chanInputNums)
// step 3: fan-in (combine) `chanOptSqr1` and `chanOptSqr2` output to merged channel
// this is achieved by calling `merge` function which takes multiple channels as arguments
// and using `WaitGroup` and multiple Goroutines to receive square number, we can send square numbers
// to `merged` channel and close it
chanMergedSqr := merge(chanOptSqr1, chanOptSqr2)
// step 4: let‘s sum all the squares from 0 to 9 which should be about `285`
// this is done by using `for range` loop on `chanMergedSqr`
sqrSum := 0
// run until `chanMergedSqr` or merged channel closes
// that happens in `merge` function when all Goroutines pushing to merged channel finishes
// check line no。 86 and 87
for num := range chanMergedSqr {
sqrSum += num
}
// step 5: print sum when above `for loop` is done executing which is after `chanMergedSqr` channel closes
fmt。Println(“Sum of squares between 0-9 is”, sqrSum)
}
https://
play。golang。org/p/hATZm
b6P1-u
我不打算解釋上面的程式是如何工作的,因為我已經在程式中添加了註釋來解釋了這一點。以上程式產生以下結果
Sum of squares between 0-9 is 285
via: Go 中的 channel 解析- Go 中的併發性
作者:
Uday Hiwarale
譯者:
wumansgy
polaris1119
本文由
GCTT
原創編譯,
Go語言中文網
榮譽推出