您當前的位置:首頁 > 體育

Rust語言聖經53 - 執行緒同步:訊息傳遞

作者:由 孫飛Sunface 發表于 體育時間:2022-01-11

在多執行緒間有多種方式可以共享、傳遞資料,最常用的方式就是透過訊息傳遞或者將鎖和

Arc

聯合使用,而對於前者,在程式設計界還有一個大名鼎鼎的

Actor執行緒模型

為其背書,典型的有Erlang語言,還有Go語言中很經典的一句話:

Do not communicate by sharing memory; instead, share memory by communicating

而對於後者,我們將在下一節中進行講述。

訊息通道

與Go語言內建的

chan

不同,Rust是在標準庫裡提供了訊息通道(

channel

),你可以將其想象成一場直播,多個主播聯合起來在搞一場直播,最終內容透過通道傳輸給螢幕前的我們,其中主播被稱之為

傳送者

,觀眾被稱之為

接收者

,顯而易見的是:一個通道應該支援多個傳送者和接收者。

但是,在實際使用中,我們需要使用不同的庫來滿足諸如:

多傳送者 -> 單接收者,多傳送者 -> 多接收者

等場景形式,此時一個標準庫顯然就不夠了,不過別急,讓我們先從標準庫講起。

多傳送者,單接收者

標準庫提供了通道

std::sync::mpsc

,其中

mpsc

multiple producer, single consumer

的縮寫,代表了該通道支援多個傳送者,但是隻支援唯一的接收者。 當然,支援多個傳送者也意味著支援單個傳送者,我們先來看看單傳送者、單接收者的簡單例子:

use

std

::

sync

::

mpsc

use

std

::

thread

fn

main

()

{

// 建立一個訊息通道, 返回一個元組:(傳送者,接收者)

let

tx

rx

=

mpsc

::

channel

();

// 建立執行緒,併發送訊息

thread

::

spawn

move

||

{

// 傳送一個數字1, send方法返回Result,透過unwrap進行快速錯誤處理

tx

send

1

)。

unwrap

();

// 下面程式碼將報錯,因為編譯器自動推匯出通道傳遞的值是i32型別,那麼Option型別將產生不匹配錯誤

// tx。send(Some(1))。unwrap()

});

// 在主執行緒中接收子執行緒傳送的訊息並輸出

println

“receive {}”

rx

recv

()。

unwrap

());

}

以上程式碼並不複雜,但仍有幾點需要注意:

tx

rx

對應傳送者和接收者,它們的型別由編譯器自動推導:

tx。send(1)

傳送了整數,因此它們分別是

mpsc::Sender

mpsc::Receiver

型別,需要注意,由於內部是泛型實現,一旦型別被推導確定,該通道就只能傳遞對應型別的值, 例如此例中非

i32

型別的值將導致編譯錯誤

接收訊息的操作

rx。recv()

會阻塞當前執行緒,直到讀取到值,或者通道被關閉

需要使用

move

tx

的所有權轉移到子執行緒的閉包中

在註釋中提到

send

方法返回一個

Result

,說明它有可能返回一個錯誤,例如接收者被

drop

導致了傳送的值不會被任何人接收,此時繼續傳送毫無意義,因此返回一個錯誤最為合適,在程式碼中我們僅僅使用

unwrap

進行了快速處理,但在實際專案中你需要對錯誤進行進一步的處理。

同樣的,對於

recv

方法來說,當傳送者關閉時,它也會接收到一個錯誤,用於說明不會再有任何值被髮送過來。

不阻塞的try_recv方法

除了上述

recv

方法,還可以使用

try_recv

嘗試接收一次訊息,該方法並

不會阻塞執行緒

,當通道中沒有訊息時,它會立刻返回一個錯誤:

use

std

::

sync

::

mpsc

use

std

::

thread

fn

main

()

{

let

tx

rx

=

mpsc

::

channel

();

thread

::

spawn

move

||

{

tx

send

1

)。

unwrap

();

});

println

“receive {:?}”

rx

try_recv

());

}

由於子執行緒的建立需要時間,因此

println!

try_recv

方法會先執行,而此時子執行緒的

訊息還未被髮出

try_recv

會嘗試立即讀取一次訊息,因為訊息沒有發出,此次讀取最終會報錯,且主執行緒執行結束(可悲的是,相對於主執行緒中的程式碼,子執行緒的建立速度實在是過慢,直到主執行緒結束,都無法完成子執行緒的初始化。。):

receive Err(Empty)

如上,

try_recv

返回了一個錯誤,錯誤內容是

Empty

,代表通道並沒有訊息。如果你嘗試把

println!

複製一些行,就會發現一個有趣的輸出:

···

receive Err(Empty)

receive Ok(1)

receive Err(Disconnected)

···

如上,當子執行緒建立成功且傳送訊息後,主執行緒會接收到

Ok(1)

的訊息內容,緊接著子執行緒結束,傳送者也隨著被

drop

,此時接收者又會報錯,但是這次錯誤原因有所不同:

Disconnected

代表傳送者已經被關閉。

傳輸具有所有權的資料

使用通道來傳輸資料,一樣要遵循Rust的所有權規則:

若值的型別實現了

Copy

特徵,則直接複製一份該值,然後傳輸過去,例如之前的

i32

型別

若值沒有實現

Copy

,則它的所有權會被轉移給接收端,在傳送端繼續使用該值將報錯

一起來看看第二種情況:

use

std

::

sync

::

mpsc

use

std

::

thread

fn

main

()

{

let

tx

rx

=

mpsc

::

channel

();

thread

::

spawn

move

||

{

let

s

=

String

::

from

“我,飛走咯!”

);

tx

send

s

)。

unwrap

();

println

“val is {}”

s

);

});

let

received

=

rx

recv

()。

unwrap

();

println

“Got: {}”

received

);

}

以上程式碼中,

String

底層的字串是儲存在堆上,被沒有實現

Copy

特徵,當它被髮送後,會將所有權從傳送端的

s

轉移給接收端的

received

,之後

s

將無法被使用:

error[E0382]: borrow of moved value: `s`

——> src/main。rs:10:31

|

8 | let s = String::from(“我,飛走咯!”);

| - move occurs because `s` has type `String`, which does not implement the `Copy` trait // 所有權被轉移,由於`String`沒有實現`Copy`特徵

9 | tx。send(s)。unwrap();

| - value moved here // 所有權被轉移走

10 | println!(“val is {}”, s);

| ^ value borrowed here after move // 所有權被轉移後,依然對s進行了借用

各種細節不禁令人感嘆:Rust還是安全!假如沒有所有權的保護,

String

字串將被兩個執行緒同時持有,任何一個執行緒對字串內容的修改都會導致另外一個執行緒持有的字串被改變,除非你故意這麼設計,否則這就是不安全的隱患。

使用for進行迴圈接收

下面來看看如何連續接收通道中的值:

use

std

::

sync

::

mpsc

use

std

::

thread

use

std

::

time

::

Duration

fn

main

()

{

let

tx

rx

=

mpsc

::

channel

();

thread

::

spawn

move

||

{

let

vals

=

vec

String

::

from

“hi”

),

String

::

from

“from”

),

String

::

from

“the”

),

String

::

from

“thread”

),

];

for

val

in

vals

{

tx

send

val

)。

unwrap

();

thread

::

sleep

Duration

::

from_secs

1

));

}

});

for

received

in

rx

{

println

“Got: {}”

received

);

}

}

在上面程式碼中,主執行緒和子執行緒是併發執行的,子執行緒在不停的

傳送訊息 -> 休眠1秒

,與此同時,主執行緒使用

for

迴圈

阻塞

的從

rx

迭代器

中接收訊息,當子執行緒執行完成時,傳送者

tx

會隨之被

drop

,此時

for

迴圈將被終止,最終

main

執行緒成功結束。

使用多傳送者

由於子執行緒會拿走傳送者的所有權,因此我們必須對傳送者進行克隆,然後讓每個執行緒拿走它的一份複製:

use

std

::

sync

::

mpsc

use

std

::

thread

fn

main

()

{

let

tx

rx

=

mpsc

::

channel

();

let

tx1

=

tx

clone

();

thread

::

spawn

move

||

{

tx

send

String

::

from

“hi from raw tx”

))。

unwrap

();

});

thread

::

spawn

move

||

{

tx1

send

String

::

from

“hi from cloned tx”

))。

unwrap

();

});

for

received

in

rx

{

println

“Got: {}”

received

);

}

}

程式碼並無太大區別,就多了一個對傳送者的克隆

let tx1 = tx。clone();

,然後一個子執行緒拿走

tx

的所有權,另一個子執行緒拿走

tx1

的所有權,皆大歡喜。

但是有幾點需要注意:

需要所有的傳送者都被

drop

掉後,接收者

rx

才會收到錯誤,進而跳出

for

迴圈,最終結束主執行緒

這裡雖然用了

clone

但是並不會影響效能,因為它並不在熱點程式碼路徑中,僅僅會被執行一次

由於兩個子執行緒誰建立完成是未知的,因此哪條訊息先發送也是未知的,最終主執行緒的輸出順序也不確定

訊息順序

上述第三點的訊息順序僅僅是因為執行緒建立引起的,並不代表通道中的執行緒是無序的,對於通道而言,訊息的傳送順序和接收順序是一直的,滿足

FIFO

原則(先進先出)。

由於篇幅有限,具體的程式碼這裡就不再給出,感興趣的讀者可以自己驗證下。

同步和非同步通道

Rust標準庫的

mpsc

通道其實分為兩種型別:同步和非同步。

非同步通道

之前我們使用的都是非同步通道:無論接收者是否正在接收訊息,訊息傳送者在傳送訊息時都不會阻塞:

use

std

::

sync

::

mpsc

use

std

::

thread

use

std

::

time

::

Duration

fn

main

()

{

let

tx

rx

=

mpsc

::

channel

();

let

handle

=

thread

::

spawn

move

||

{

println

“傳送之前”

);

tx

send

1

)。

unwrap

();

println

“傳送之後”

);

});

println

“睡眠之前”

);

thread

::

sleep

Duration

::

from_secs

3

));

println

“睡眠之後”

);

println

“收到值 {}”

rx

recv

()。

unwrap

());

handle

join

()。

unwrap

();

}

執行後輸出如下:

睡眠之前

傳送之前

傳送之後

//···睡眠3秒

睡眠之後

收到值 1

主執行緒因為睡眠阻塞了3秒,因此並沒有進行訊息接收,而子執行緒卻在此期間輕鬆完成了訊息的傳送。等主執行緒睡眠結束後,才姍姍來遲的從通道中接收了子執行緒老早之前傳送的訊息。

從輸出還可以看出,

傳送之前

傳送之後

是連續輸出的,沒有受到接收端主執行緒的任何影響,因此透過

mpsc::channel

建立的通道是非同步通道。

同步通道

與非同步通道相反,同步通道

傳送訊息是阻塞的,只有在訊息被接收後才解除阻塞

例如:

use

std

::

sync

::

mpsc

use

std

::

thread

use

std

::

time

::

Duration

fn

main

()

{

let

tx

rx

=

mpsc

::

sync_channel

0

);

let

handle

=

thread

::

spawn

move

||

{

println

“傳送之前”

);

tx

send

1

)。

unwrap

();

println

“傳送之後”

);

});

println

“睡眠之前”

);

thread

::

sleep

Duration

::

from_secs

3

));

println

“睡眠之後”

);

println

“receive {}”

rx

recv

()。

unwrap

());

handle

join

()。

unwrap

();

}

執行後輸出如下:

睡眠之前

傳送之前

//···睡眠3秒

睡眠之後

收到值 1

傳送之後

可以看出,主執行緒由於睡眠被阻塞導致無法接收訊息,因此子執行緒的傳送也一直被阻塞,直到主執行緒結束睡眠併成功接收訊息後,傳送才成功:

傳送之後

的輸出是在

收到值 1

之後,說明

只有接收訊息徹底成功後,傳送訊息才算完成

訊息快取

細心的讀者可能已經發現在建立同步通道時,我們傳遞了一個引數

0

mpsc::sync_channel(0);

,這是什麼意思呢?

答案不急給出,先將

0

改成

1

,然後再執行試試:

睡眠之前

傳送之前

傳送之後

睡眠之後

receive 1

納尼。。竟然得到了和非同步通道一樣的效果:根本沒有等待主執行緒的接收開始,訊息傳送就立即完成了! 難道同步通道變成了非同步通道? 別急,將子執行緒中的程式碼修改下試試:

println

“首次傳送之前”

);

tx

send

1

)。

unwrap

();

println

“首次傳送之後”

);

tx

send

1

)。

unwrap

();

println

“再次傳送之後”

);

在子執行緒中,我們又多發了一條訊息,此時輸出如下:

睡眠之前

首次傳送之前

首次傳送之後

//···睡眠3秒

睡眠之後

receive 1

再次傳送之後

Bingo,更奇怪的事出現了,第一條訊息瞬間傳送完成,沒有阻塞,而傳送第二條訊息時卻符合同步通道的特點:阻塞了,直到主執行緒接收後,才傳送完成。

其實,一切的關鍵就在於

1

上,該值可以用來指定同步通道的訊息快取條數,當你設定為

N

時,傳送者就可以無阻塞的往通道中傳送

N

條訊息,當訊息緩衝佇列滿了後,新的訊息傳送將被阻塞(如果沒有接收者消費緩衝佇列中的訊息,那麼第

N+1

條訊息就將觸發傳送阻塞)。

問題又來了,非同步通道建立時完全沒有這個緩衝值引數

mpsc::channel()

,它的緩衝值怎麼設定呢? 額。。。都非同步了,都可以無限傳送了,都有摩托車了,還要腳踏車做啥子哦?事實上非同步通道的緩衝上限取決於你的記憶體大小,不要撐爆就行。

因此,使用非同步訊息雖然能非常高效且不會造成傳送執行緒的阻塞,但是存在訊息未及時消費,最終記憶體過大的問題。在實際專案中,可以考慮使用一個帶緩衝值的同步通道來避免這種風險。

關閉通道

之前我們數次提到了通道關閉,並且提到了當通道關閉後,傳送訊息或接收訊息將會報錯。那麼如何關閉通道呢? 很簡單:

所有傳送者被drop或者所有接收者被drop後,通道會自動關閉

神奇的是,這件事是在編譯期實現的,完全沒有執行期效能損耗!只能說Rust的

Drop

特徵YYDS!

傳輸多種型別的資料

之前提到過,一個訊息通道只能傳輸一種型別的資料,如果你想要傳輸多種型別的資料,可以為每個型別建立一個通道,你也可以使用列舉型別來實現:

se

std

::

sync

::

mpsc

::

{

self

Receiver

Sender

};

enum

Fruit

{

Apple

u8

),

Orange

String

}

fn

main

()

{

let

tx

rx

Sender

<

Fruit

>

Receiver

<

Fruit

>

=

mpsc

::

channel

();

tx

send

Fruit

::

Orange

“sweet”

to_string

()))。

unwrap

();

tx

send

Fruit

::

Apple

2

))。

unwrap

();

for

_

in

0

。。

2

{

match

rx

recv

()。

unwrap

()

{

Fruit

::

Apple

count

=>

println

“received {} apples”

count

),

Fruit

::

Orange

flavor

=>

println

“received {} oranges”

flavor

),

}

}

}

如上所示,列舉型別還能讓我們帶上想要傳輸的資料,但是有一點需要注意,Rust會按照列舉中佔用記憶體最大的那個成員進行記憶體對齊,這意味著就算你傳輸的是列舉中佔用記憶體最小的成員,它佔用的記憶體依然和最大的成員相同, 因此會造成記憶體上的浪費。

新手容易遇到的坑

mpsc

雖然相當簡潔明瞭,但是在使用起來還是可能存在坑:

use

std

::

sync

::

mpsc

fn

main

()

{

use

std

::

thread

let

send

recv

=

mpsc

::

channel

();

let

num_threads

=

3

for

i

in

0

。。

num_threads

{

let

thread_send

=

send

clone

();

thread

::

spawn

move

||

{

thread_send

send

i

)。

unwrap

();

println

“thread {:?} finished”

i

);

});

}

// 在這裡drop send。。。

for

x

in

recv

{

println

“Got: {}”

x

);

}

println

“finished iterating”

);

}

以上程式碼看起來非常正常,但是執行後主執行緒會一直阻塞,最後一行列印輸出也不會被執行,原因在於: 子執行緒拿走的是複製後的

send

的所有權,這些複製會在子執行緒結束後被

drop

,因此無需擔心,但是

send

本身卻直到

main

函式的結束才會被

drop

之前提到,通道關閉的兩個條件:傳送者全部

drop

或接收者被

drop

,要結束

for

迴圈顯然是要求傳送者全部

drop

,但是由於

send

自身沒有被

drop

,會導致該迴圈永遠無法結束,最終主執行緒會一直阻塞。

解決辦法很簡單,

drop

send

即可:在程式碼中的註釋下面新增一行

drop(send);

mpmc、更好的效能

如果你需要mpmc(多傳送者,多接收者)或者需要更高的效能,可以考慮第三方庫:

crossbeam-channel

, 老牌強庫,功能較全,效能較強,之前是獨立的庫,但是後面合併到了

crossbeam

主倉庫中

flume

, 官方給出的效能資料要比crossbeam更好些,但是貌似最近沒怎麼更新

弱弱的說一聲:若文章對你有所幫助,請賞俺一個贊, 感謝感謝感謝:)

本文節選自<>一書

標簽: 執行緒  傳送  TX  send  mpsc