您當前的位置:首頁 > 繪畫

RxJava2.0的初學者必備教程(九)

作者:由 Android架構 發表于 繪畫時間:2019-04-10

前言

好久不見朋友們,最近一段時間在忙工作上的事情,這兩天正好有點時間,趕緊寫下了這篇教程,免得大家說我太鹹了。

正題

先來回顧一下上上節,我們講Flowable的時候,說它採用了

響應式拉

的方式,我們還舉了個

葉問打小日本

的例子,再來回顧一下吧,我們說把

上游

看成

小日本

, 把

下游

當作

葉問

, 當呼叫

Subscription。request(1)

時,

葉問

就說

我要打一個!

然後

小日本

就拿出

一個鬼子

給葉問, 讓他打, 等葉問打死這個鬼子之後, 再次呼叫

request(10)

, 葉問就又說

我要打十個!

然後小日本又派出

十個鬼子

給葉問, 然後就在邊上看熱鬧, 看葉問能不能打死十個鬼子, 等葉問打死十個鬼子後再繼續要鬼子接著打。

但是不知道大家有沒有發現,在我們前兩節中的例子中,我們口中聲稱的

響應式拉

並沒有完全體現出來,比如這個例子:

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

“emit 1”

);

emitter

onNext

1

);

Log

d

TAG

“emit 2”

);

emitter

onNext

2

);

Log

d

TAG

“emit 3”

);

emitter

onNext

3

);

Log

d

TAG

“emit complete”

);

emitter

onComplete

();

}

},

BackpressureStrategy

ERROR

)。

subscribeOn

Schedulers

io

())

observeOn

AndroidSchedulers

mainThread

())

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

“onSubscribe”

);

mSubscription

=

s

s

request

1

);

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

“onNext: ”

+

integer

);

mSubscription

request

1

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

“onError: ”

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

“onComplete”

);

}

});

雖然我們在下游中是每次處理掉了一個事件之後才呼叫request(1)去請求下一個事件,也就是說葉問的確是在打死了一個鬼子之後才繼續打下一個鬼子,可是上游呢?上游真的是每次當下遊請求一個才拿出一個嗎?從上上篇文章中我們知道並不是這樣的,上游仍然是一開始就傳送了所有的事件,也就是說小日本並沒有等葉問打死一個才拿出一個,而是一開始就拿出了所有的鬼子,這些鬼子從一開始就在這兒排隊等著被打死。

有個故事是這麼說的:

楚人有賣盾與矛者,先譽其盾之堅,曰:“吾盾之堅,物莫能陷也。”俄而又譽其矛之利,曰:“吾矛之利,萬物莫不陷也。”市人詰之曰:“以子之矛陷子之盾,何如?”其人弗能應也。眾皆笑之。

沒錯,我們前後所說的就是自相矛盾了,這說明了什麼呢,說明我們的實現並不是一個完整的實現,那麼,究竟怎樣的實現才是完整的呢?

我們先自己來想一想,在下游中呼叫Subscription。request(n)就可以告訴上游,下游能夠處理多少個事件,那麼上游要根據下游的處理能力正確的去傳送事件,那麼上游是不是應該知道下游的處理能力是多少啊,對吧,不然,一個巴掌拍不響啊,這種事情得你情我願才行。

那麼上游從哪裡得知下游的處理能力呢?我們來看看上游最重要的部分,肯定就是

FlowableEmitter

了啊,我們就是透過它來發送事件的啊,來看看它的原始碼吧(別緊張,它的程式碼灰常簡單):

public

interface

FlowableEmitter

<

T

>

extends

Emitter

<

T

>

{

void

setDisposable

Disposable

s

);

void

setCancellable

Cancellable

c

);

/**

* The current outstanding request amount。

*

This method is thread-safe。

* @return the current outstanding request amount

*/

long

requested

();

boolean

isCancelled

();

FlowableEmitter

<

T

>

serialize

();

}

FlowableEmitter是個介面,繼承Emitter,Emitter裡面就是我們的onNext(),onComplete()和onError()三個方法。我們看到FlowableEmitter中有這麼一個方法:

long

requested

();

方法註釋的意思就是

當前外部請求的數量

,哇哦,這好像就是我們要找的答案呢。 我們還是實際驗證一下吧。

先來看

同步

的情況吧:

public

static

void

demo1

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”current requested: “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

這個例子中,我們在上游中打印出當前的request數量,下游什麼也不做。

我們先猜測一下結果,下游沒有呼叫request(),說明當前下游的處理能力為0,那麼上游得到的requested也應該是0,是不是呢?

來看看執行結果:

D/TAG: onSubscribe

D/TAG: current requested: 0

哈哈,結果果然是0,說明我們的結論基本上是對的。

那下游要是呼叫了request()呢,來看看:

public

static

void

demo1

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”current requested: “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

s

request

10

);

//我要打十個!

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

這次在下游中呼叫了request(10),告訴上游我要打十個,看看執行結果:

D/TAG: onSubscribe

D/TAG: current requested: 10

果然!上游的requested的確是根據下游的請求來決定的,那要是下游多次請求呢?比如這樣:

public

static

void

demo1

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”current requested: “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

s

request

10

);

//我要打十個!

s

request

100

);

//再給我一百個!

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

下游先呼叫了request(10), 然後又呼叫了request(100),來看看執行結果:

D/TAG: onSubscribe

D/TAG: current requested: 110

看來多次呼叫也沒問題,做了

加法

誒加法?對哦,只是做加法,那什麼時候做

減法

呢?

當然是傳送事件啦!

來看個例子吧:

public

static

void

demo2

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

final

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”before emit, requested = “

+

emitter

requested

());

Log

d

TAG

”emit 1“

);

emitter

onNext

1

);

Log

d

TAG

”after emit 1, requested = “

+

emitter

requested

());

Log

d

TAG

”emit 2“

);

emitter

onNext

2

);

Log

d

TAG

”after emit 2, requested = “

+

emitter

requested

());

Log

d

TAG

”emit 3“

);

emitter

onNext

3

);

Log

d

TAG

”after emit 3, requested = “

+

emitter

requested

());

Log

d

TAG

”emit complete“

);

emitter

onComplete

();

Log

d

TAG

”after emit complete, requested = “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

s

request

10

);

//request 10

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

程式碼很簡單,來看看執行結果:

D/TAG: onSubscribe

D/TAG: before emit, requested = 10

D/TAG: emit 1

D/TAG: onNext: 1

D/TAG: after emit 1, requested = 9

D/TAG: emit 2

D/TAG: onNext: 2

D/TAG: after emit 2, requested = 8

D/TAG: emit 3

D/TAG: onNext: 3

D/TAG: after emit 3, requested = 7

D/TAG: emit complete

D/TAG: onComplete

D/TAG: after emit complete, requested = 7

大家應該能看出端倪了吧,下游呼叫request(n) 告訴上游它的處理能力,上游每傳送一個

next事件

之後,requested就減一,

注意是next事件,complete和error事件不會消耗requested

,當減到0時,則代表下游沒有處理能力了,這個時候你如果繼續傳送事件,會發生什麼後果呢?當然是

MissingBackpressureException

啦,試一試:

public

static

void

demo2

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

final

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”before emit, requested = “

+

emitter

requested

());

Log

d

TAG

”emit 1“

);

emitter

onNext

1

);

Log

d

TAG

”after emit 1, requested = “

+

emitter

requested

());

Log

d

TAG

”emit 2“

);

emitter

onNext

2

);

Log

d

TAG

”after emit 2, requested = “

+

emitter

requested

());

Log

d

TAG

”emit 3“

);

emitter

onNext

3

);

Log

d

TAG

”after emit 3, requested = “

+

emitter

requested

());

Log

d

TAG

”emit complete“

);

emitter

onComplete

();

Log

d

TAG

”after emit complete, requested = “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

s

request

2

);

//request 2

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

還是這個例子,只不過這次只request(2), 看看執行結果:

D/TAG: onSubscribe

D/TAG: before emit, requested = 2

D/TAG: emit 1

D/TAG: onNext: 1

D/TAG: after emit 1, requested = 1

D/TAG: emit 2

D/TAG: onNext: 2

D/TAG: after emit 2, requested = 0

D/TAG: emit 3

W/TAG: onError: io。reactivex。exceptions。MissingBackpressureException: create: could not emit value due to lack of requests

at io。reactivex。internal。operators。flowable。FlowableCreate$ErrorAsyncEmitter。onOverflow(FlowableCreate。java:411)

at io。reactivex。internal。operators。flowable。FlowableCreate$NoOverflowBaseAsyncEmitter。onNext(FlowableCreate。java:377)

at zlc。season。rxjava2demo。demo。ChapterNine$4。subscribe(ChapterNine。java:80)

at io。reactivex。internal。operators。flowable。FlowableCreate。subscribeActual(FlowableCreate。java:72)

at io。reactivex。Flowable。subscribe(Flowable。java:12218)

at zlc。season。rxjava2demo。demo。ChapterNine。demo2(ChapterNine。java:89)

at zlc。season。rxjava2demo。MainActivity$2。onClick(MainActivity。java:36)

at android。view。View。performClick(View。java:4780)

at android。view。View$PerformClick。run(View。java:19866)

at android。os。Handler。handleCallback(Handler。java:739)

at android。os。Handler。dispatchMessage(Handler。java:95)

at android。os。Looper。loop(Looper。java:135)

at android。app。ActivityThread。main(ActivityThread。java:5254)

at java。lang。reflect。Method。invoke(Native Method)

at java。lang。reflect。Method。invoke(Method。java:372)

at com。android。internal。os。ZygoteInit$MethodAndArgsCaller。run(ZygoteInit。java:903)

at com。android。internal。os。ZygoteInit。main(ZygoteInit。java:698)

D/TAG: after emit 3, requested = 0

D/TAG: emit complete

D/TAG: after emit complete, requested = 0

到目前為止我們一直在說同步的訂閱,現在同步說完了,我們先用一張圖來總結一下同步的情況:

RxJava2.0的初學者必備教程(九)

這張圖的意思就是當上下游在同一個執行緒中的時候,在

下游

呼叫request(n)就會直接改變

上游

中的requested的值,多次呼叫便會疊加這個值,而上游每傳送一個事件之後便會去減少這個值,當這個值減少至0的時候,繼續傳送事件便會拋異常了。

我們再來說說

非同步

的情況,非同步和同步會有區別嗎?會有什麼區別呢?帶著這個疑問我們繼續來探究。

同樣的先來看一個基本的例子:

public

static

void

demo3

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”current requested: “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribeOn

Schedulers

io

())

observeOn

AndroidSchedulers

mainThread

())

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

這次是非同步的情況,上游啥也不做,下游也啥也不做,來看看執行結果:

D/TAG: onSubscribe

D/TAG: current requested: 128

哈哈,又是128,看了我前幾篇文章的朋友肯定很熟悉這個數字啊!這個數字為什麼和我們之前所說的預設的水缸大小一樣啊,莫非?

帶著這個疑問我們繼續來研究一下:

public

static

void

demo3

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”current requested: “

+

emitter

requested

());

}

},

BackpressureStrategy

ERROR

subscribeOn

Schedulers

io

())

observeOn

AndroidSchedulers

mainThread

())

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

s

request

1000

);

//我要打1000個!!

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

這次我們在下游呼叫了request(1000)告訴上游我要打1000個,按照之前我們說的,這次的執行結果應該是1000,來看看執行結果:

D/TAG: onSubscribe

D/TAG: current requested: 128

臥槽,你確定你沒貼錯程式碼?

是的,真相就是這樣,就是128,蜜汁128。。。

RxJava2.0的初學者必備教程(九)

RxJava2.0的初學者必備教程(九)

為了答疑解惑,我就直接上圖了:

RxJava2.0的初學者必備教程(九)

可以看到,當上下游工作在不同的執行緒裡時,每一個執行緒裡都有一個requested,而我們呼叫request(1000)時,實際上改變的是下游主執行緒中的requested,而上游中的requested的值是由RxJava內部呼叫request(n)去設定的,這個呼叫會在合適的時候自動觸發。

現在我們就能理解為什麼沒有呼叫request,上游中的值是128了,因為下游在

一開始就在內部呼叫了

request(128)去設定了上游中的值,因此即使下游沒有呼叫request(),上游也能傳送128個事件,這也可以解釋之前我們為什麼說Flowable中預設的水缸大小是128,其實就是這裡設定的。

剛才同步的時候我們說了,上游每傳送一個事件,requested的值便會減一,對於非同步來說同樣如此,那有人肯定有疑問了,一開始上游的requested的值是128,那這128個事件傳送完了不就不能繼續傳送了嗎?

剛剛說了,設定上游requested的值的這個內部呼叫會在

合適的時候

自動觸發,那到底什麼時候是合適的時候呢?是發完128個事件才去呼叫嗎?還是傳送了一半才去呼叫呢?

帶著這個疑問我們來看下一段程式碼:

public

static

void

request

()

{

mSubscription

request

96

);

//請求96個事件

}

public

static

void

demo4

()

{

Flowable

create

new

FlowableOnSubscribe

<

Integer

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

Integer

>

emitter

throws

Exception

{

Log

d

TAG

”First requested = “

+

emitter

requested

());

boolean

flag

for

int

i

=

0

i

++)

{

flag

=

false

while

emitter

requested

()

==

0

{

if

(!

flag

{

Log

d

TAG

”Oh no! I can‘t emit value!“

);

flag

=

true

}

}

emitter

onNext

i

);

Log

d

TAG

”emit “

+

i

+

” , requested = “

+

emitter

requested

());

}

}

},

BackpressureStrategy

ERROR

subscribeOn

Schedulers

io

())

observeOn

AndroidSchedulers

mainThread

())

subscribe

new

Subscriber

<

Integer

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

Log

d

TAG

”onSubscribe“

);

mSubscription

=

s

}

@Override

public

void

onNext

Integer

integer

{

Log

d

TAG

”onNext: “

+

integer

);

}

@Override

public

void

onError

Throwable

t

{

Log

w

TAG

”onError: “

t

);

}

@Override

public

void

onComplete

()

{

Log

d

TAG

”onComplete“

);

}

});

}

這次的上游稍微複雜了一點點,首先仍然是個無限迴圈發事件,但是是有條件的,只有當上遊的requested != 0的時候才會發事件,然後我們呼叫request(96)去消費96個事件(為什麼是96而不是其他的數字先不要管),來看看執行結果吧:

D/TAG: onSubscribe

D/TAG: First requested = 128

D/TAG: emit 0 , requested = 127

D/TAG: emit 1 , requested = 126

D/TAG: emit 2 , requested = 125

。。。

D/TAG: emit 124 , requested = 3

D/TAG: emit 125 , requested = 2

D/TAG: emit 126 , requested = 1

D/TAG: emit 127 , requested = 0

D/TAG: Oh no! I can’t emit value!

首先執行之後上游便會發送完128個事件,之後便不做任何事情,從列印的結果中我們也可以看出這一點。

然後我們呼叫request(96),這會讓下游去消費96個事件,來看看執行結果吧:

D/TAG: onNext: 0

D/TAG: onNext: 1

。。。

D/TAG: onNext: 92

D/TAG: onNext: 93

D/TAG: onNext: 94

D/TAG: onNext: 95

D/TAG: emit 128 , requested = 95

D/TAG: emit 129 , requested = 94

D/TAG: emit 130 , requested = 93

D/TAG: emit 131 , requested = 92

。。。

D/TAG: emit 219 , requested = 4

D/TAG: emit 220 , requested = 3

D/TAG: emit 221 , requested = 2

D/TAG: emit 222 , requested = 1

D/TAG: emit 223 , requested = 0

D/TAG: Oh no! I can‘t emit value!

可以看到,當下遊消費掉第96個事件之後,上游又開始發事件了,而且可以看到當前上游的requested的值是96(打印出來的95是已經發送了一個事件減一之後的值),最終發出了第223個事件之後又進入了等待區,而223-127 正好等於 96。

這是不是說明當下游每消費96個事件便會自動觸發內部的request()去設定上游的requested的值啊!沒錯,就是這樣,而這個新的值就是96。

朋友們可以手動試試請求95個事件,上游是不會繼續傳送事件的。

至於這個96是怎麼得出來的(肯定不是我猜的蒙的啊),感興趣的朋友可以自行閱讀原始碼尋找答案,對於初學者而言應該沒什麼必要,管它內部怎麼實現的呢對吧。

好了今天的教程就到這裡了!透過本節的學習,大家應該知道如何正確的去實現一個完整的響應式拉取了,在

某一些場景

下,可以在傳送事件前先判斷當前的requested的值是否大於0,若等於0則說明下游處理不過來了,則需要等待,例如下面這個例子。

實踐

這個例子是讀取一個文字檔案,需要一行一行讀取,然後處理並輸出,如果文字檔案很大的時候,比如幾十M的時候,全部先讀入記憶體肯定不是明智的做法,因此我們可以一邊讀取一邊處理,實現的程式碼如下:

public

static

void

main

String

[]

args

{

practice1

();

try

{

Thread

sleep

10000000

);

}

catch

InterruptedException

e

{

e

printStackTrace

();

}

}

public

static

void

practice1

()

{

Flowable

create

new

FlowableOnSubscribe

<

String

>()

{

@Override

public

void

subscribe

FlowableEmitter

<

String

>

emitter

throws

Exception

{

try

{

FileReader

reader

=

new

FileReader

”test。txt“

);

BufferedReader

br

=

new

BufferedReader

reader

);

String

str

while

((

str

=

br

readLine

())

!=

null

&&

emitter

isCancelled

())

{

while

emitter

requested

()

==

0

{

if

emitter

isCancelled

())

{

break

}

}

emitter

onNext

str

);

}

br

close

();

reader

close

();

emitter

onComplete

();

}

catch

Exception

e

{

emitter

onError

e

);

}

}

},

BackpressureStrategy

ERROR

subscribeOn

Schedulers

io

())

observeOn

Schedulers

newThread

())

subscribe

new

Subscriber

<

String

>()

{

@Override

public

void

onSubscribe

Subscription

s

{

mSubscription

=

s

s

request

1

);

}

@Override

public

void

onNext

String

string

{

System

out

println

string

);

try

{

Thread

sleep

2000

);

mSubscription

request

1

);

}

catch

InterruptedException

e

{

e

printStackTrace

();

}

}

@Override

public

void

onError

Throwable

t

{

System

out

println

t

);

}

@Override

public

void

onComplete

()

{

}

});

}

執行的結果便是:

RxJava2.0的初學者必備教程(九)

好了,本次的教程就到這裡了,謝謝大家捧場!下節見,敬請期待!

(PS: 我這麼用心的寫文章, 你們也不給個贊嗎?)

標簽: tag  requested  log  emit  emitter