您當前的位置:首頁 > 舞蹈

Go 中的 channel 解析— Go 中的併發性

作者:由 polarisxu 發表于 舞蹈時間:2020-09-22

什麼是 channel ?

channel

是一個通訊物件,goroutine 可以使用它來相互通訊。 從技術上講,channel 是一個用於資料傳輸的管道,可以將資料

傳入或從中讀取

。 因此,一個 Goroutine 可以將資料傳送到一個 channel ,而其他 Goroutine 可以從同一個 channel 讀取該資料。

宣告一個 channel

Go 提供

chan

關鍵字來建立一個 channel。channel 只能用於傳輸

一種資料型別

的資料。不允許從該 channel 傳輸其他資料型別。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/iWOFL

fcgfF-

上面的程式聲明瞭一個 channel

c

,它可以傳輸 int 型別的資料。上面的程式輸出為 ``,因為 channel 的零值為

nil

( 空 ) 但是

nil

( 空 ) channel 是不能被使用的。你不能將資料傳遞給一個

nil

( 空 ) 的 channel 或從

nil

( 空 ) channel 讀取資料。因此,我們必須使用

make

函式來建立一個可以使用的 channel。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/N4dU7

Ql9bK7

我們使用了短命名法 := 來使用

make

函式建立 channel。以上程式產生以下結果

type of `c` is chan int

value of `c` is 0xc0420160c0

注意 channel

c

的值,它看起來像一個記憶體地址。預設情況下,channel 是指標。大多數情況下,當您希望與 Goroutine 進行通訊時,您將 channel 作為引數傳遞給函式或方法。因此,當 Goroutine 接收該 channel 作為引數時,您不需要解除對它的引用來從該 channel 傳送或讀取資料。

資料讀寫

Go 提供了非常容易記住的

左箭頭語法

<-

從 channel 中讀寫資料。

c <- data

上面的語法意味著我們想要將

data

傳送或寫入 channel

c

。它從

data

指向 channel

c

,因此我們可以想象一下將

data

傳送到 channel

c

<- c

上面的語法意味著我們需要從 channel

c

讀取一些資料,看看箭頭的方向,它是從 channel

c

開始的,這個語句沒有將資料傳送到任何地方,但是它仍然是一個有效的語句。如果您有一個變數用來儲存來自該 channel 的資料,則可以使用以下語法

var data int

data = <- c

現在,從 channel

c

中讀取出的 int 型別的資料可以賦值給 int 型別的變數

data

上面的語法可以像下面這樣使用短命名法重寫

data := <- c

Go 將判斷出在 channel

c

中傳輸的資料的資料型別,併為變數

data

提供一個有效的資料型別。

以上所有 channel 操作在預設情況下都是阻塞的

。在

上節課

中,我們看到了

time。Sleep

阻塞了 Goroutine。channel 操作在本質上也是阻塞的。當一些資料被寫入 channel 時,goroutine 會被阻塞,直到其他 Goroutine 從該 channel 讀取資料。同時,正如我們在

併發一章

中看到的,channel 操作告訴排程器排程另一個 Goroutine,這就是為什麼程式不會永遠阻塞在同一個 Goroutine 上。channel 的這些特性在 Goroutines 通訊中非常有用,因為它可以避免了我們用互斥鎖來讓它們相互協作。

在實踐中使用 Channel

上面我們講的已經很多了,現在讓我們來看一下在 Goroutine 中使用 channel 。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/OeYLK

Ez7qKi

讓我們一步一步地來討論上述程式的執行。

我們聲明瞭

greet

函式,它接受傳輸資料型別為

string

的 channel

c

。在這個函式中,我們從 channel

c

中讀取資料並將資料列印到控制檯。

在 main 函式中,程式將

main() started

作為第一條語句列印到控制檯。

然後使用

make

函式建立了用於傳輸

string

型別的 channel

c

我們將 channel

c

傳遞給

greet

函式,然後使用

go

關鍵字將其作為一個 Goroutine 執行。

此時,程式有 2 個 Goroutine,而主 Goroutine 是

main Goroutine

檢視上一課瞭解它是什麼

)。然後程式執行下一行。

我們將字串

John

傳入 channel

c

。此時,goroutine 被阻塞,直到某個 Goroutine 讀取它。Go 排程程式排程

greet

goroutine,然後它開始執行,正如上面第一點說道的。

然後

main Goroutine

被啟用並執行最後的語句,列印

main()stopped

然後停止。

死鎖

如上面所述,當我們往 channel 寫入或從中讀取資料時,goroutine 將被阻塞並將控制權傳遞給可用的 Goroutine。如果沒有其他可用的 Goroutines,那麼可以想象他們都在睡覺。這就是死鎖錯誤發生的地方,那樣會導致整個程式崩潰。

如果您試圖從 channel 中讀取資料,但是 channel 中沒有可用的值,它將阻塞當前的 Goroutine 並且會阻塞其他 Goroutine,希望一些 Goroutine 將值傳送到 channel。因此,

這個讀取操作將會被阻塞

。類似地,如果要將資料傳送到一個 channel,它將阻塞當前的 Goroutine 並解除其他 Goroutine 的阻塞,直到某個 Goroutine 從它讀取資料。因此,

這個傳送操作將被阻塞

死鎖的一個簡單例子就是在 main Goroutine 中執行一些 channel 操作。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/2KTEo

ljdci_f

上面的程式在執行時會丟擲下面的錯誤。

main() started

fatal error: all Goroutines are asleep - deadlock!

Goroutine 1 [chan send]:

main。main()

program。Go:10 +0xfd

exit status 2

fatal error: all Goroutines are asleep — deadlock!

。 似乎所有的 Goroutine 都在睡覺,或者根本沒有其他 Goroutine 可供使用。

關閉一個通道

一個 channel 可以被關閉,這樣就不能透過它傳送更多的資料了。接收端 Goroutine 可以透過它

val, ok := <- channel

瞭解 channel 的使用狀態,如果 channel 是開啟的或讀取操作是可以執行的,那麼

ok

的值等於

true

如果通道關閉那麼就不能執行更多的讀取操作,此時

ok

等於

false

,channel 可以使用帶有語法的內建函式

close

如,

close(chennel)

來關閉 channel ,讓我們來看一個小例子。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/LMmAq

4sgm02

為了幫助您理解阻塞的概念,首先發送操作

c <- “John”

是阻塞的,一些 Goroutine 必須從 channel 中讀取資料,因此

greet

這個 Goroutine 由 Go 排程器排程。然後,第一次讀取操作

<-c

是非阻塞的,因為要從 channel

c

中讀取資料。第二次讀取操作

<-c

將阻塞,因為 channel

c

沒有任何資料可以讀取,因此 Go 排程器啟用

main

Goroutine,程式從

close(c)

函式開始執行。

從上面的錯誤中,我們可以看到我們試圖往一個已經關閉的 channel 裡傳送資料。此外,如果我們試圖從關閉的 channel 閱讀,程式會發生 panic。為了更好地理解被關閉 channel 的可用性,讓我們看看

for

迴圈。

For 迴圈

for

迴圈的無限語法

for{}

可用於讀取透過 channel 傳送的多個值。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/X58FT

gSHhXi

在上面的例子中,我們建立了一個

squares

Goroutine,它將逐一返回從 0 到 9 的數字。在

main

Goroutine 中,我們用無限

for

迴圈來讀取那些數字 。

在無限

for

迴圈中,由於我們需要一個條件來在某一點中斷迴圈,所以我們使用語法

val, ok := <-c

從 channel 中讀取值。在這裡,

ok

會在 channel 關閉時給我們提供額外的資訊。因此,在

square

Goroutine 中,在寫完所有資料之後,我們使用語法

close(c)

關閉 channel。當

ok

的值為

true

時,程式列印

val

ok

的值。當它為

false

時,我們使用

break

關鍵字跳出迴圈。因此,上述程式產生以下結果

main() started

0 true

1 true

4 true

9 true

16 true

25 true

36 true

49 true

64 true

81 true

0 false <—— loop broke!

main() stopped

當 channel 關閉時,goroutine 讀取的值為 channel 資料型別的零值。在這種情況下,由於 channel 傳輸的是 int 資料型別,因此結果為 0。

為了避免手動檢查 channel 關閉情況帶來的痛苦,Go 為我們提供了

for range

迴圈 ,當 channel 關閉時

for range

將自動關閉。讓我們修改前面的程式。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/ICCYb

WO7ZvD

在上面的程式中,我們用

for val:= range c

代替

for{}

range

將每次從 channel 中讀取一個值,直到 channel 關閉。因此,上面的程式產生下面的結果

main() started

0

1

4

9

16

25

36

49

64

81

main() stopped

如果最後不關閉

for range

迴圈中的 channel,程式將在執行時丟擲死鎖錯誤。

緩衝區大小或 channel 容量

正如我們看到的,channel 上的每個傳送操作都會阻塞當前的 Goroutine。但到目前為止,我們使用的

make

函式沒有第二個引數。第二個引數是 channel 或緩衝區大小的容量。預設情況下,channel 緩衝區大小為 0 也稱為

無緩衝 channel

。寫入 channel 的任何內容都必須是可以讀取的。

當緩衝區大小為非零 n 時,

goroutine 直到緩衝區滿後才被阻塞

。當緩衝區滿時,傳送到 channel 的任何值都將透過丟擲緩衝區中可供讀取的最後一個值 ( Goroutine 將被阻塞 ) 新增到緩衝區中。但有一個陷阱,

讀操作對快取是持續性的

。這意味著,一旦讀操作開始,它將一直持續下去,直到緩衝區為空。從技術上講,

這意味著讀取緩衝區 channel 的 Goroutine 在緩衝區為空之前不會阻塞

我們可以使用以下語法定義緩衝 channel。

c := make(chan Type, n)

這將建立一個緩衝區大小為

n

資料型別為

Type

的 channel。在 channel 接收到

n+1

傳送操作之前,它不會阻塞當前的 Goroutine。 讓我們來證明一下 Goroutine 在 channel 緩衝區滿之前不會阻塞。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/k0usd

YZfp3D

在上述程式中,channel

c

的緩衝容量為 3。這意味著它可以儲存 3 個值,也就是第 20 行的值。但是由於緩衝區沒有滿 ( 因為我們沒有傳送任何新值 ),主 Goroutine 將不會阻塞,程式將會繼續。

讓我們傳送額外的值。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/KGyis

kRj1Wi

如前所述,現在填充的緩衝區透過

c <- 4

傳送操作,主 Goroutine 將等待

square

Goroutine 讀取所有值。

channel 的長度和容量

與切片相似,緩衝 channel 具有長度和容量。channel 長度是 channel 緩衝區中排隊 ( 未讀 ) 的值個數,而 channel 容量是緩衝區大小。為了計算長度,我們使用

len

函式,而為了計算容量,我們使用

cap

函式,就像切片一樣。

https://

play。golang。org/p/qsDZu

6pXLT7

如果您想知道,為什麼上面的程式執行良好,死鎖錯誤沒有丟擲。這是因為,由於 channel 容量為 3,且緩衝區中只有 2 個值可用,Go 沒有試圖透過阻塞主 Goroutine 執行來排程另一個 Goroutine。如果需要,可以在 main Goroutine 中讀取這些值,因為即使緩衝區沒有滿,也不能阻止從 chennel 讀取值。

這是另外一個例子

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/-gGpm

08-wzz

這裡有一個腦筋急轉彎

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/sdHPD

x64aor

使用

for range

來讀取有快取 channel,我們可以從已經關閉的 channel 讀取。因為對於已經關閉的 channel,資料駐留在緩衝區中,我們仍然可以讀取該資料。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/vULFy

WnpUoj

緩衝 channel 就像畢達哥拉斯杯,觀看這個關於

畢達哥拉斯杯

的有趣影片。

與多個 Goroutine 一起工作

我們寫兩個 Goroutines,一個用於計算整數的平方另一個用於計算整數的立方。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/6wdhW

YpRfrX

讓我們一步一步地討論上述程式的執行。

我們建立了兩個函式

square

cube

,它們將作為 Goroutines 單獨執行。兩者都從 channel

c

中接收 int 型別的資料作為變數,然後複製給 num,然後在下一行將計算完成的資料寫到 channel

c

main

Goroutine 中,我們使用

make

函式建立了兩個型別為 int 的 channel

squareChan

cubeChan

然後我們執行

square

cube

Goroutine。

由於此時仍在

main

Goroutine 內,

testNum

的值此時為 3。

然後我們將資料傳送到

squareChan

cubeVal

。主 Goroutine 將被阻塞,直到這些 channel 讀取它。一旦他們讀了它,

main

Goroutine 將繼續執行。

當在

main

Goroutine 中,我們試圖從給定的 channel 讀取資料時,程式將被阻塞,直到這些 channel 從它們的 Goroutine 中寫入一些資料。這裡,我們使用了簡寫語法

:=

從多個 channel 接收資料。

一旦這些 Goroutine 將一些資料寫入 channel ,主 Goroutine 將被阻塞。

channel 寫操作完成後,

main

Goroutine 開始執行。然後我們計算總和並列印在控制檯上。

因此,上述程式將產生以下結果

[main] main() started

[main] sent testNum to squareChan

[square] reading

[main] resuming

[main] sent testNum to cubeChan

[cube] reading

[main] resuming

[main] reading from channels

[main] sum of square and cube of 3 is 36

[main] main() stopped

單向 channel

到目前為止,我們已經看到可以從兩邊傳輸資料的 channel,或者簡單地說,我們可以在上面進行讀寫操作的 channel。但是我們也可以創造單向的 channel。例如,只接收允許對其進行讀操作的 channel,只發送允許對其進行寫操作的 channel。

單向通道也使用

make

函式建立,但是使用了額外的箭頭語法。

roc := make(<-chan int)

soc := make(chan<- int)

在上述程式中,

roc

使用

make

函式建立箭頭遠離

chan

方向來作為只讀 channel。而

soc

使用箭頭靠近

chan

做為只寫 channel。它們也是不同的型別。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/JZO51

IoaMg8

但是單向通道有什麼用呢?使用單向 channel 可以提高程式的型別安全性

。因此,程式不容易出錯。

但是假設您有一個 Goroutine,其中您只需要從 channel 中讀取資料,但是主 Goroutine 需要從 channel 中讀取資料或者往 channel 寫入資料。這將如何工作 ?

幸運的是,Go 提供了更簡單的語法來將雙向通道轉換為單向通道。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/k3B3g

CelrGv

我們修改了

greet

Goroutine 的例子,將雙向 channel

c

轉換為只讀 channel

roc

greet

函式。現在我們只能從那個 channel 中讀取。任何寫操作都會導致致命的錯誤 :

“invalid operation: roc <- ”some text“ (send to receive-only type <-chan string)”

匿名 Goroutine

在 Goroutines 一章,我們學習了 匿名 Goroutines。我們還可以使用它們實現 channel。讓我們修改前面的簡單示例來實現匿名 Goroutine 中的 channel。

這是我們之前的例子

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/c5erd

HX1gwR

下面是修改後的例子,我們將

greet

Goroutine 作為一個匿名 Goroutine。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/cM5nF

gRha7c

channel 的資料型別

是的,channel 是第一類值,可以像其他值一樣在任何地方使用:作為結構元素、函式引數、函式返回值,甚至作為另一個 channel 的型別。在這裡,我們感興趣的是使用 channel 作為另一個 channel 的資料型別。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/xVQvv

b8O4De

Select

select

就像

switch

一樣沒有任何輸入引數,但是它只用於 channel 操作。

Select

語句用於在多個 channel 中只對一個 channel 執行操作,由

case

塊有條件地選擇。

讓我們先看一個例子,然後討論它是如何工作的。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/ar5dZ

UQ2ArH

從上面的程式中,我們可以看到

select

語句就像

switch

一樣,但是不是

boolean

操作,我們添加了 channel 操作,比如讀或寫,或者讀和寫混合。

Select 語句是阻塞的,除非它有預設情況

( 稍後我們將看到 )。一旦其中一個條件滿足,它就會解除阻塞。那麼,

當一個案例多個 case 滿足呢?

如果所有的 case 語句 ( channel 操作 ) 都阻塞了,那麼 select 語句將等待其中一個 case 語句 ( 其 channel 操作 ) 解除阻塞,然後執行該 case。如果一些或所有的 channel 操作是非阻塞的,那麼將隨機選擇一個非阻塞 case 並立即執行。

為了解釋以上例子,我們啟動了兩個獨立 channel 的 Goroutines。然後介紹了兩個案例的 select 語句。一種情況從

chan1

讀取值,另一種情況從

chan2

讀取值。因為這些 channel 是無緩衝的,所以讀操作將被阻塞 ( 寫操作也一樣 )。所以這兩種選擇都是阻塞的。因此

select

將等待其中一個

case

被解除阻塞。

當程式位於

select

語句時,main Goroutine 將阻塞,它將排程 select 語句中出現的所有 Goroutine ( 一次一個 ),即

service1

service2

service1

等待 3 秒,然後透過寫入

chan1

解除阻塞。類似地,

service2

等待 5 秒,然後透過寫入

chan2

解除阻塞。因為

service1

service2

更早解除阻塞,所以

case1

將首先解除阻塞,因此將執行該案例,並忽略其他 case ( 這裡是 case2 )。一旦完成了 case 的執行,主函式的執行將繼續下去。

上面的程式模擬了真實的 Web 服務,其中負載均衡器收到數百萬個請求,它必須從可用的服務之一返回響應。使用 Goroutines、channel 和 select,我們可以請求多個服務來響應,可以使用快速響應的服務。

為了模擬當所有情況都阻塞時,響應幾乎同時可用,我們可以簡單地刪除

Sleep

函式呼叫。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/giSkk

qt8XHb

以上程式產生以下結果 ( 您可能會得到不同的結果 )

main() started 0s

service2() started 481 µ s

Response from service 2 Hello from service 2 981。1 µ s

main() stopped 981。1 µ s

但有時,它也可能是

main() started 0s

service1() started 484。8 µ s

Response from service 1 Hello from service 1 984 µ s

main() stopped 984 µ s

這是因為

chan1

chan2

操作幾乎同時發生,但是在執行和排程上仍然存在一些時間差。

要模擬所有情況都是非阻塞且響應同時可用時,可以使用有緩衝 channel。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/RLRGE

mFQP3f

以上程式產生下面的結果

main() started 0s

Response from chan2 Value 1 0s

main() stopped 1。0012ms

在某些情況下,它也可能是

main() started 0s

Response from chan1 Value 1 0s

main() stopped 1。0012ms

在上面的程式中,兩個 channel 的緩衝區中都有兩個值。由於我們在緩衝區容量 2 的 channel 中傳送了兩個值,這些 channel 操作不會阻塞,控制將轉到

select

語句。由於從緩衝 channel 讀取是非阻塞操作,直到整個緩衝區為空,並且在 case 條件下只讀取一個值,所以所有 case 操作都是非阻塞操作。因此,Go runtime 將隨機選擇一個 case 語句。

default case

switch

語句一樣,

select

語句也有

default

case。

default case 是非阻塞的

。但這還不是全部,

default

case 使得預設情況下

select

語句

總是非阻塞的

。這意味著,在任何 channel (

有緩衝或無緩衝

) 上的傳送和接收操作總是非阻塞的。

如果某個值在任何 channel 上可用,則

select

將執行該 case。否則,它將立即執行

default

case。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/rFMpc

80EuT3

在上面的程式中,由於 channel 是無緩衝的,而且值在兩個 channel 操作中不能立即可用,因此將執行

default

case。如果上面的

select

語句沒有

default

case,那麼

select

就會阻塞,而回應就會不同。

由於在

default

中,

select

是非阻塞的,排程器不會從主 Goroutine 獲得排程可用 Goroutine 的呼叫。但是我們可以透過呼叫

time。Sleep

來手動實現。這樣,所有的 Goroutine 都會執行並且結束,將控制權返回給

main

Goroutine,它會在一段時間後醒來。當主 Goroutine 喚醒時,channel 將立即具有可用的值。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/eD0NH

xHm9hN

因此,上述程式產生以下結果

main() started 0s

service1() started 0s

service2() started 0s

Response from service 1 Hello from service 1 3。0001805s

main() stopped 3。0001805s

在某些情況下,它也可能是

main() started 0s

service1() started 0s

service2() started 0s

Response from service 2 Hello from service 2 3。0000957s

main() stopped 3。0000957s

死鎖

當沒有可用的 channel 傳送或接收資料時,

default

case 是有用的。為了避免死鎖,我們可以使用

default

case。這是有可能的,因為由於有

default

case,所有 channel 操作都是非阻塞的,如果資料不能立即可用,Go 不會安排任何其他 Goroutines 傳送資料到 channel。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/S3Wxu

qb8lMF

與接收操作類似,在傳送操作中,如果其他 Goroutine 正在休眠 (

未準備好接收值

),則執行

default

case。

空 channel

我們知道,channel 的預設值為

nil

。因此,我們不能在

nil

channel 上執行傳送或接收操作。但是在這種情況下,當 select 語句中使用

nil

channel 時,它將丟擲以下錯誤之一或兩個錯誤。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/uhraF

ubcF4S

從上面的結果中,我們可以看到

select( 無 case)

意味著

select

語句實際上是空的,

因為帶有 nil channel 的 case 被忽略了

。但是由於空

select{}

語句阻塞了主 Goroutine,並且

service

Goroutine 在它的位置被排程,所以在

nil

通道上的通道操作將丟擲

chan send (nil chan)

錯誤。為了避免這種情況,我們使用

default

case。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/upLsz

52_CrE

上述程式不僅忽略

case

塊,而且立即執行

default

case。因此排程器沒有時間來排程

service

Goroutine。但這確實是一個糟糕的設計。你應該檢查通道的

nil

值。

新增超時

上面的程式不是很有用,因為只執行

default

case。但有時,我們希望任何可用的服務都應該在適當的時間響應,如果它沒有響應,那麼就應該執行

default

case。這可以透過使用在定義時間後解除阻塞的 channel 操作來完成。該 channel 操作由

time

包的

after

函式提供。我們來看一個例子。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/mda2t

2IQK__X

以上程式在 2 秒後產生以下結果

main() started 0s

No response received 2。0010958s

main() stopped 2。0010958s

在上面的程式中,

<-time。After(2 * time。 second)

在 2 秒後解除阻塞,返回它被解除阻塞的時間,但是在這裡,我們對它的返回值不感興趣。因為它也像一個 Goroutine,我們有 3 個 Goroutine,這個首先從其中接觸阻塞。因此,執行與 Goroutine 操作相對應的 case。

這是很有用的,因為您不希望等待來自可用服務的響應太久,而使用者必須等待很長時間才能從服務中獲得任何東西。如果加上

10 *time。second

。第二,在上面的例子中,將列印

service1

的響應,我想現在已經很明顯了。

空 select

for{}

空迴圈一樣,空

select{}

語法也是有效的,但有一個問題。正如我們所知,

select

語句被阻塞,直到其中一個 case 解除阻塞,而且由於沒有

case

語句可用來解除阻塞,main Goroutine 將永遠阻塞,從而導致死鎖。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/-pBd-

BLMFOu

在上面的程式中,我們知道

select

會阻塞主 Goroutine,排程器會排程另一個可用的 Goroutine,即

service

。但是在那之後,它會掛起,排程器不得不排程另一個可用的 Goroutine,但是由於主協程被阻塞,沒有其他 Goroutine 可用,導致死鎖。

main() started

Hello from service!

fatal error: all Goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:

main。main()

program。Go:16 +0xba

exit status 2

WaitGroup

讓我們設想這樣一種情況 : 您需要知道是否所有的 Goroutines 都完成了它們的工作。這與選擇只需要一個條件為

true

的地方有些相反,但是在這裡需要

所有條件為 true 才能解鎖主 Goroutine

。這裡條件是 channel 操作成功。

WaitGroup

是一個具有計數器值的結構,它跟蹤生成了多少個 Goroutines 以及完成了多少工作。當這個計數器達到 0 時,表示所有的 Goroutines 都完成了它們的工作。

讓我們看一個例子,看看語法。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/8qrAD

9ceOfJ

在上面的程式中,我們建立了

sync。WaitGroup

型別的空結構 (

帶有零值欄位

) wg。WaitGroup struct 有未匯出的欄位,如

noCopy

state1

sema

,其內部實現我們不需要知道。這個結構有三個方法,即

Add

Wait

Done

Add

方法需要一個

int

型別的引數,這是 WaitGroup 計數器的增量。

Counter

只是一個預設值為 0 的整數。它包含了正在執行的 Goroutine 的數量。在建立 WaitGroup 時,它的計數器值為 0,我們可以使用

Add

方法透過傳遞

delta

作為引數來遞增它。請記住,

counter

不能自動理解 Goroutine 何時啟動,因此我們需要手動增加它。

wait

方法用於從呼叫當前 Goroutine 的位置阻塞該 Goroutine。一旦計數器達到 0,goroutine 將解除阻塞。因此,我們需要一些東西來減少計數器。

Done

方法使計數器遞減。它不接受任何引數,因此它只減 1。

在上面的程式中,建立

wg

後,我們執行

for

迴圈 3 次。在每個回合中,我們啟動一個 Goroutine,並增加計數器 1。這意味著,現在我們有 3 個 Goroutine 等待執行,而 WaitGroup 計數器是 3。注意,我們在 Goroutine 中傳遞了指向

wg

的指標。這是因為在 Goroutine 中,一旦我們完成了 Goroutine 應該做的事情,我們需要呼叫

Done

方法來減少計數器。如果

wg

作為值傳遞,

wg

main

中不會減少。這是很明顯的。

執行

for

迴圈之後,我們仍然沒有將控制權傳遞給其他 Goroutines。這是透過呼叫

wg

上的

Wait

方法來完成的,比如

wg。Wait()

。這將阻塞主 Goroutine,直到計數器達到 0。一旦計數器達到 0,因為從 3 個 Goroutine,我們呼叫了

wg

上的

Done

方法 3 次,

main

Goroutine 將解除阻塞,並開始執行進一步的程式碼。

因此上面的程式產生下面的結果

main() started

Service called on instance 2

Service called on instance 3

Service called on instance 1

main() stopped

由於 Goroutines 的執行順序可能會有所不同,因此上述結果可能對您有所不同。

Add

方法接受型別為

int

,這意味著

delta

也可以是負的。想要了解更多,請訪問這裡的

官方文件

工作池

顧名思義,工作池是同時工作以執行任務的 Goroutines 的集合。在 WaitGroup 中,我們看到了一些 Goroutines 的集合,但他們沒有具體的工作。一旦您在它們中放入 channel,它們就有一些工作要做,併成為工作池。

因此,工作池背後的概念是維護一個

worker Goroutines

池,它接收一些任務並返回結果。一旦他們都完成了他們的工作,我們收集結果。所有這些 Goroutine 都為個人目的使用相同的通道。

讓我們看一個簡單的例子,有兩個 channel,即

tasks

results

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/IYiMV

1I4lCj

別擔心,我會一步一步解釋這裡發生的事情。

sqrWorker

是一個工作函式,它接受

task

channel、

result

channel 和

id

。這個 Goroutine 的工作是將從

task

channel 接收到的數字的平方傳送到

result

channel。

在 main 函式中,我們建立了具有緩衝區容量大小為 10 的

task

result

channel。因此,任何傳送操作都是非阻塞的,直到緩衝區滿為止。因此,設定大的緩衝區值並不是一個壞主意。

然後,我們使用上面兩個 channel 和 id 引數生成多個

sqrWorker

例項作為 Goroutines,以獲取關於哪個 Worker 正在執行任務的資訊。

然後我們將 5 個任務傳遞給

task

channel,這些

task

channel 是非阻塞的。

因為我們已經完成了

task

channel,所以關閉了它。這不是必須的,但是如果有一些 bug 進來,它會在將來節省很多時間。

然後使用 for 迴圈,經過 5 次迭代,我們從

result

channel 提取資料。由於空緩衝區上的讀操作是阻塞的,因此將從工作池排程一個 Goroutine。在 Goroutine 返回一些結果之前,main Goroutine 將被阻塞。

由於我們在 worker Goroutine 中模擬阻塞操作,因此呼叫排程器來排程另一個可用的 Goroutine,直到它可用為止。當 worker Goroutine 可用時,它將寫入

result

channel。由於在緩衝區滿之前,對緩衝 channel 的寫入是非阻塞的,所以在這裡對

result

channel 的寫入是非阻塞的。此外,噹噹前工作執行緒 Goroutine 不可用時,將使用任務緩衝區中的值執行多個其他工作執行緒 Goroutine。在所有工作者 Goroutines 消耗任務之後,當

task

channel 緩衝區為空時,範圍迴圈結束。當

task

channel 關閉時,它不會丟擲死鎖錯誤。

有時,所有的工作執行緒都可以睡眠,所以主執行緒會醒來並工作,直到

result

channel 緩衝區再次清空。

所有的 worker Goroutine 死後,

main

Goroutine 將重新獲得控制權,並從

result

channel 列印剩餘的結果,繼續執行。

上面的例子雖然很長,但是很好地解釋了多個 Goroutine 如何在同一個 channel 上提供內容並優雅地完成工作。當員工的工作遇到阻礙時,goroutine 功能強大。如果刪除

time。Sleep()

呼叫,那麼只有一個 Goroutine 將執行此任務,因為在

for range

迴圈完成並在 Goroutine 死亡之前,不會排程其他 Goroutine。

您可以得到不同的結果,就像在前面的例子中一樣,這取決於您的系統有多快,因為如果所有的 worker Gorutine 都被阻塞了,即使是一微秒,main Goroutine 也會像解釋的那樣被喚醒。

現在,讓我們使用同步 Goroutines 的 WaitGroup 概念。使用前面的 WaitGroup 示例,我們可以獲得相同的結果,但更優雅。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/0rRfc

hn7sL1

上面的結果看起來很整潔,因為在 main Goroutine 中的

result

channel 上的讀取操作是非阻塞的,因為

result

channel 已經由 result 填充,而 main Goroutine 被

wg。Wait()

呼叫阻塞。使用

waitGroup

,我們可以防止很多 ( 不必要的 ) 上下文切換 ( 排程 ),這裡是 7,而前面的示例中是 9。

但這是有代價的,因為你必須等待所有的工作都完成。

Mutex

互斥是 Go 中最簡單的概念之一。但是在我解釋它之前,讓我們先理解競態條件是什麼。goroutines 有獨立的棧,因此它們之間不共享任何資料。但是在某些情況下,堆中的某些資料可能在多個 Goroutine 之間共享。在這種情況下,多個 Goroutine 試圖在相同的記憶體位置操作資料,從而導致意想不到的結果。我將向您展示一個簡單的示例。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/MQNep

ChxiEa

在上面的程式中,我們生成了 1000 個 Goroutines,它增加了初始值為

0

的全域性變數

i

的值。由於我們正在實現 WaitGroup,所以我們希望所有 1000 個 Goroutines 都將

i

的值逐個遞增,結果

i

的最終值為 1000。當 main Goroutine 在

wg。Wait()

呼叫後再次執行時,我們將輸出

i

value of i after 1000 operations is 937

什麼?為什麼小於 1000 呢?看起來有些 Goroutine 沒用。但實際上,我們的程式有一個競態條件。讓我們看看會發生什麼。

i = i + 1

的計算有 3 個步驟

(1)得到 i 的值

(2)i 的增量值為 1

(3)用新值更新 i 的值

讓我們設想一個場景,在這些步驟之間安排了不同的 Goroutine。例如,讓我們考慮 1000 個 Goroutines 中的兩個 Goroutines,即 G1 和 G2。

i

0

時,G1 首先開始,執行前兩個步驟,

i

現在是

1

。但是在 G1 更新第 3 步中的

i

值之前,會排程新的 Goroutine G2 並執行所有步驟。但是對於 G2,

i

的值仍然是

0

,因此在執行步驟 3 之後,

i

將是 1。現在 G1 再次被安排完成步驟 3,並更新步驟 2 中

i

的值 1。在完美的世界裡,goroutines 在完成所有的 3 個步驟後被排程,兩個 Goroutines 的成功操作會產生

i

為 2 的值,但這裡不是這樣。因此,我們可以推測為什麼我們的程式沒有將

i

的值賦值為

1000

到目前為止,我們瞭解到 Goroutines 是合作安排的。除非一個 Goroutine 塊具有併發性課程中提到的條件之一,否則另一個 Goroutine 不會取代它。既然

i = i + 1

不是阻塞,為什麼 Go 排程器計劃另一個 Goroutine ?

您一定要在

stackoverflow

上檢視這個答案。

在任何情況下,您都不應該依賴 Go 的排程演算法並實現自己的邏輯來同步不同的 Goroutine。

一種確保每次只有一個 Goroutine 完成上述 3 個步驟的方法是實現互斥鎖。互斥 (

互斥

) 是程式設計中的一個概念,在這個概念中,一次只能有一個例程 ( 執行緒 ) 執行操作。這是透過一個獲取值上的鎖的例程來完成的,對它必須做的值做任何操作,然後釋放鎖。當值被鎖定時,沒有其他例程可以對其讀寫。

在 Go 中,互斥資料結構 ( map ) 是由

sync

包提供的。在 Go 中,在對可能導致競態條件的值執行任何操作之前,我們使用

mutex。Lock()

方法獲取一個鎖,然後是操作程式碼。一旦我們完成了操作,在上面的程式

i = i + 1

中,我們使用

mutex。unlock()

方法來解鎖它。當鎖存在時,任何其他 Goroutine 試圖讀取或寫入

i

的值時,該 Goroutine 將阻塞,直到從第一個 Goroutine 解鎖操作為止。因此,只有 1 個 Goroutine 可以讀取或寫入

i

的值,從而避免了競態條件。請記住,在鎖定和解鎖之間的操作中出現的任何變數在整個操作解鎖之前都不能用於其他 Goroutines。

讓我們用互斥鎖修改前面的示例。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/xVFAX

_0Uig8

在上面的程式中,我們建立了一個互斥量

m

,並將它的指標傳遞給所有派生的 Goroutines。在開始對

i

進行操作之前,我們使用

m。lock()

語法獲得互斥物件

m

上的鎖,然後在操作之後使用

m。unlock()

語法解鎖它。上面的程式產生下面的結果。

value of i after 1000 operations is 1000

從上面的結果我們可以看到互斥幫助我們解決了競態條件。但是第一條規則是避免 Goroutines 之間共享資源。

您可以在執行

Go run -race program。Go

這樣的程式時,使用

race

引數在 Go 中測試競態條件。請在

這裡

閱讀更多關於 race 檢測器的資訊。

併發模式

併發有很多方法可以使我們的日常程式設計更加容易。以下是一些概念和方法,我們可以使用它們使程式更快和更可靠。

Generator ( 生產者 )

使用 channel,我們可以更好地實現生產者。如果生產者在計算上很昂貴,那麼我們也可以同時生成資料。這樣,程式就不必等待所有資料生成。例如,生成斐波那契數列。

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/1_2MD

eqQ3o5

使用

fib

函式,我們得到了一個可以迭代和利用從它接收到的資料的 channel。而在

fib

函式內部,由於我們必須返回一個只接收 channel,我們正在建立一個緩衝 channel 並在最後返回它。此函式的返回值將將此雙向 channel 轉換為單向接收 channel。在使用匿名 Goroutine 時,我們使用 for 迴圈將斐波那契數推入這個 channel。一旦完成 for 迴圈,我們就會從 Goroutine 內部關閉它。在

main

Goroutine 中,使用

for range

fib

函式呼叫,我們可以直接訪問這個 channel。

fan-in & fan-out (扇入和扇出)

扇入是一種多路複用策略,將多個 channel 的輸入組合起來產生一個輸出 channel。扇出是一種多路複用策略,其中單個 channel 被分成多個 channel。

package main

import (

“fmt”

“sync”

// return channel for input numbers

func getInputChan() <-chan int {

// make return channel

input := make(chan int, 100)

// sample numbers

numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

// run Goroutine

go func() {

for num := range numbers {

input <- num

}

// close channel once all numbers are sent to channel

close(input)

}()

return input

}

// returns a channel which returns square of numbers

func getSquareChan(input <-chan int) <-chan int {

// make return channel

output := make(chan int, 100)

// run Goroutine

go func() {

// push squares until input channel closes

for num := range input {

output <- num * num

}

// close output channel once for loop finishesh

close(output)

}()

return output

}

// returns a merged channel of `outputsChan` channels

// this produce fan-in channel

// this is veriadic function

func merge(outputsChan 。。。<-chan int) <-chan int {

// create a WaitGroup

var wg sync。WaitGroup

// make return channel

merged := make(chan int, 100)

// increase counter to number of channels `len(outputsChan)`

// as we will spawn number of Goroutines equal to number of channels received to merge

wg。Add(len(outputsChan))

// function that accept a channel (which sends square numbers)

// to push numbers to merged channel

output := func(sc <-chan int) {

// run until channel (square numbers sender) closes

for sqr := range sc {

merged <- sqr

}

// once channel (square numbers sender) closes,

// call `Done` on `WaitGroup` to decrement counter

wg。Done()

}

// run above `output` function as groutines, `n` number of times

// where n is equal to number of channels received as argument the function

// here we are using `for range` loop on `outputsChan` hence no need to manually tell `n`

for _, optChan := range outputsChan {

go output(optChan)

}

// run Goroutine to close merged channel once done

go func() {

// wait until WaitGroup finishesh

wg。Wait()

close(merged)

}()

return merged

}

func main() {

// step 1: get input numbers channel

// by calling `getInputChan` function, it runs a Goroutine which sends number to returned channel

chanInputNums := getInputChan()

// step 2: `fan-out` square operations to multiple Goroutines

// this can be done by calling `getSquareChan` function multiple times where individual function call returns a channel which sends square of numbers provided by `chanInputNums` channel

// `getSquareChan` function runs Goroutines internally where squaring operation is ran concurrently

chanOptSqr1 := getSquareChan(chanInputNums)

chanOptSqr2 := getSquareChan(chanInputNums)

// step 3: fan-in (combine) `chanOptSqr1` and `chanOptSqr2` output to merged channel

// this is achieved by calling `merge` function which takes multiple channels as arguments

// and using `WaitGroup` and multiple Goroutines to receive square number, we can send square numbers

// to `merged` channel and close it

chanMergedSqr := merge(chanOptSqr1, chanOptSqr2)

// step 4: let‘s sum all the squares from 0 to 9 which should be about `285`

// this is done by using `for range` loop on `chanMergedSqr`

sqrSum := 0

// run until `chanMergedSqr` or merged channel closes

// that happens in `merge` function when all Goroutines pushing to merged channel finishes

// check line no。 86 and 87

for num := range chanMergedSqr {

sqrSum += num

}

// step 5: print sum when above `for loop` is done executing which is after `chanMergedSqr` channel closes

fmt。Println(“Sum of squares between 0-9 is”, sqrSum)

}

Go 中的 channel 解析— Go 中的併發性

https://

play。golang。org/p/hATZm

b6P1-u

我不打算解釋上面的程式是如何工作的,因為我已經在程式中添加了註釋來解釋了這一點。以上程式產生以下結果

Sum of squares between 0-9 is 285

via: Go 中的 channel 解析- Go 中的併發性

作者:

Uday Hiwarale

譯者:

wumansgy

polaris1119

本文由

GCTT

原創編譯,

Go語言中文網

榮譽推出

標簽: channel  goroutine  阻塞  main  我們