您當前的位置:首頁 > 詩詞

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

作者:由 macrozheng 發表于 詩詞時間:2020-09-21

以前看過的關於RabbitMQ核心訊息模式的文章都是基於JavaAPI的,最近看了下官方文件,發現這些核心訊息模式都可以透過Spring AMQP來實現。於是總結了下RabbitMQ的實用技巧,包括RabbitMQ在Windows和Linux下的安裝、5種核心訊息模式的Spring AMQP實現,相信對於想要學習和回顧RabbitMQ的朋友都會有所幫助。

SpringBoot實戰電商專案mall(40k+star)地址:

https://

github。com/macrozheng/m

all

簡介

RabbitMQ是最受歡迎的開源訊息中介軟體之一,在全球範圍內被廣泛應用。RabbitMQ是輕量級且易於部署的,能支援多種訊息協議。RabbitMQ可以部署在分散式系統中,以滿足大規模、高可用的要求。

相關概念

我們先來了解下RabbitMQ中的相關概念,這裡以5種訊息模式中的

路由模式

為例。

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

安裝及配置

接下來我們介紹下RabbitMQ的安裝和配置,提供Windows和Linux兩種安裝方式。

Windows下的安裝

安裝Erlang,下載地址:

http://

erlang。org/download/otp

_win64_21。3。exe

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

安裝RabbitMQ,下載地址:

https://

dl。bintray。com/rabbitmq

/all/rabbitmq-server/3。7。14/rabbitmq-server-3。7。14。exe

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

安裝完成後,進入RabbitMQ安裝目錄下的sbin目錄;

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

在位址列輸入cmd並回車啟動命令列,然後輸入以下命令啟動管理功能。

rabbitmq-plugins enable rabbitmq_management

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

Linux下的安裝

下載

rabbitmq 3。7。15

的Docker映象;

docker pull rabbitmq:3。7。15

使用Docker命令啟動服務;

docker run -p 5672:5672 -p 15672:15672 ——name rabbitmq

\

-d rabbitmq:3。7。15

進入容器並開啟管理功能;

docker

exec

-it rabbitmq /bin/bash

rabbitmq-plugins

enable

rabbitmq_management

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

開啟防火牆便於外網訪問。

firewall-cmd ——zone

=

public ——add-port

=

15672/tcp ——permanent

firewall-cmd ——zone

=

public ——add-port

=

5672/tcp ——permanent

firewall-cmd ——reload

訪問及配置

訪問RabbitMQ管理頁面地址,檢視是否安裝成功(Linux下使用伺服器IP訪問即可):http://localhost:15672/

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

輸入賬號密碼並登入,這裡使用預設賬號密碼登入:guest guest

建立帳號並設定其角色為管理員:mall mall

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

建立一個新的虛擬host為:/mall

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

點選mall使用者進入使用者配置頁面;

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

給mall使用者配置該虛擬host的許可權;

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

至此,RabbitMQ的配置完成。

5種訊息模式

這5種訊息模式是構建基於RabbitMQ的訊息應用的基礎,一定要牢牢掌握它們。學過RabbitMQ的朋友應該瞭解過這些訊息模式的Java實現,這裡我們使用Spring AMQP的形式來實現它們。

簡單模式

簡單模式是最簡單的訊息模式,它包含一個生產者、一個消費者和一個佇列。生產者向佇列裡傳送訊息,消費者從佇列中獲取訊息並消費。

模式示意圖

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

Spring AMQP實現

首先需要在

pom。xml

中新增Spring AMQP的相關依賴;

<!——Spring AMQP依賴——>

org。springframework。boot

spring-boot-starter-amqp

然後修改

application。yml

,新增RabbitMQ的相關配置;

spring

rabbitmq

host

localhost

port

5672

virtual-host

/mall

username

mall

password

mall

publisher-confirms

true

#訊息傳送到交換器確認

publisher-returns

true

#訊息傳送到佇列確認

新增

簡單模式

相關Java配置,建立一個名為

simple。hello

的佇列、一個生產者和一個消費者;

/**

* Created by macro on 2020/5/19。

*/

@Configuration

public

class

SimpleRabbitConfig

{

@Bean

public

Queue

hello

()

{

return

new

Queue

“simple。hello”

);

}

@Bean

public

SimpleSender

simpleSender

(){

return

new

SimpleSender

();

}

@Bean

public

SimpleReceiver

simpleReceiver

(){

return

new

SimpleReceiver

();

}

}

生產者透過

send方法

向佇列

simple。hello

中傳送訊息;

/**

* Created by macro on 2020/5/19。

*/

public

class

SimpleSender

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

SimpleSender

class

);

@Autowired

private

RabbitTemplate

template

private

static

final

String

queueName

=

“simple。hello”

public

void

send

()

{

String

message

=

“Hello World!”

this

template

convertAndSend

queueName

message

);

LOGGER

info

“ [x] Sent ‘{}’”

message

);

}

}

消費者從佇列

simple。hello

中獲取訊息;

/**

* Created by macro on 2020/5/19。

*/

@RabbitListener

queues

=

“simple。hello”

public

class

SimpleReceiver

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

SimpleReceiver

class

);

@RabbitHandler

public

void

receive

String

in

{

LOGGER

info

“ [x] Received ‘{}’”

in

);

}

}

在controller中新增測試介面,呼叫該介面開始傳送訊息;

/**

* Created by macro on 2020/5/19。

*/

@Api

tags

=

“RabbitController”

description

=

“RabbitMQ功能測試”

@Controller

@RequestMapping

“/rabbit”

public

class

RabbitController

{

@Autowired

private

SimpleSender

simpleSender

@ApiOperation

“簡單模式”

@RequestMapping

value

=

“/simple”

method

=

RequestMethod

GET

@ResponseBody

public

CommonResult

simpleTest

()

{

for

int

i

=

0

i

<

10

i

++){

simpleSender

send

();

ThreadUtil

sleep

1000

);

}

return

CommonResult

success

null

);

}

}

執行後結果如下,可以發現生產者往佇列中傳送訊息,消費者從佇列中獲取訊息並消費。

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

工作模式

工作模式是指向多個互相競爭的消費者傳送訊息的模式,它包含一個生產者、兩個消費者和一個佇列。兩個消費者同時繫結到一個佇列上去,當消費者獲取訊息處理耗時任務時,空閒的消費者從佇列中獲取並消費訊息。

模式示意圖

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

Spring AMQP實現

新增

工作模式

相關Java配置,建立一個名為

work。hello

的佇列、一個生產者和兩個消費者;

/**

* Created by macro on 2020/5/19。

*/

@Configuration

public

class

WorkRabbitConfig

{

@Bean

public

Queue

workQueue

()

{

return

new

Queue

“work。hello”

);

}

@Bean

public

WorkReceiver

workReceiver1

()

{

return

new

WorkReceiver

1

);

}

@Bean

public

WorkReceiver

workReceiver2

()

{

return

new

WorkReceiver

2

);

}

@Bean

public

WorkSender

workSender

()

{

return

new

WorkSender

();

}

}

生產者透過

send方法

向佇列

work。hello

中傳送訊息,訊息中包含一定數量的

號;

/**

* Created by macro on 2020/5/19。

*/

public

class

WorkSender

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

WorkSender

class

);

@Autowired

private

RabbitTemplate

template

private

static

final

String

queueName

=

“work。hello”

public

void

send

int

index

{

StringBuilder

builder

=

new

StringBuilder

“Hello”

);

int

limitIndex

=

index

%

3

+

1

for

int

i

=

0

i

<

limitIndex

i

++)

{

builder

append

‘。’

);

}

builder

append

index

+

1

);

String

message

=

builder

toString

();

template

convertAndSend

queueName

message

);

LOGGER

info

“ [x] Sent ‘{}’”

message

);

}

}

兩個消費者從佇列

work。hello

中獲取訊息,名稱分別為

instance 1

instance 2

,訊息中包含

號越多,耗時越長;

/**

* Created by macro on 2020/5/19。

*/

@RabbitListener

queues

=

“work。hello”

public

class

WorkReceiver

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

WorkReceiver

class

);

private

final

int

instance

public

WorkReceiver

int

i

{

this

instance

=

i

}

@RabbitHandler

public

void

receive

String

in

{

StopWatch

watch

=

new

StopWatch

();

watch

start

();

LOGGER

info

“instance {} [x] Received ‘{}’”

this

instance

in

);

doWork

in

);

watch

stop

();

LOGGER

info

“instance {} [x] Done in {}s”

this

instance

watch

getTotalTimeSeconds

());

}

private

void

doWork

String

in

{

for

char

ch

in

toCharArray

())

{

if

ch

==

‘。’

{

ThreadUtil

sleep

1000

);

}

}

}

}

在controller中新增測試介面,呼叫該介面開始傳送訊息;

/**

* Created by macro on 2020/5/19。

*/

@Api

tags

=

“RabbitController”

description

=

“RabbitMQ功能測試”

@Controller

@RequestMapping

“/rabbit”

public

class

RabbitController

{

@Autowired

private

WorkSender

workSender

@ApiOperation

“工作模式”

@RequestMapping

value

=

“/work”

method

=

RequestMethod

GET

@ResponseBody

public

CommonResult

workTest

()

{

for

int

i

=

0

i

<

10

i

++){

workSender

send

i

);

ThreadUtil

sleep

1000

);

}

return

CommonResult

success

null

);

}

}

執行後結果如下,可以發現生產者往佇列中傳送包含不同數量

號的訊息,

instance 1

instance 2

消費者互相競爭,分別消費了一部分訊息。

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

釋出/訂閱模式

釋出/訂閱模式是指同時向多個消費者傳送訊息的模式(類似廣播的形式),它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列繫結到交換機上去,生產者透過傳送訊息到交換機,所有消費者接收並消費訊息。

模式示意圖

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

Spring AMQP實現

新增

釋出/訂閱模式

相關Java配置,建立一個名為

exchange。fanout

的交換機、一個生產者、兩個消費者和兩個匿名佇列,將兩個匿名佇列都繫結到交換機;

/**

* Created by macro on 2020/5/19。

*/

@Configuration

public

class

FanoutRabbitConfig

{

@Bean

public

FanoutExchange

fanout

()

{

return

new

FanoutExchange

“exchange。fanout”

);

}

@Bean

public

Queue

fanoutQueue1

()

{

return

new

AnonymousQueue

();

}

@Bean

public

Queue

fanoutQueue2

()

{

return

new

AnonymousQueue

();

}

@Bean

public

Binding

fanoutBinding1

FanoutExchange

fanout

Queue

fanoutQueue1

{

return

BindingBuilder

bind

fanoutQueue1

)。

to

fanout

);

}

@Bean

public

Binding

fanoutBinding2

FanoutExchange

fanout

Queue

fanoutQueue2

{

return

BindingBuilder

bind

fanoutQueue2

)。

to

fanout

);

}

@Bean

public

FanoutReceiver

fanoutReceiver

()

{

return

new

FanoutReceiver

();

}

@Bean

public

FanoutSender

fanoutSender

()

{

return

new

FanoutSender

();

}

}

生產者透過

send方法

向交換機

exchange。fanout

中傳送訊息,訊息中包含一定數量的

號;

/**

* Created by macro on 2020/5/19。

*/

public

class

FanoutSender

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

FanoutSender

class

);

@Autowired

private

RabbitTemplate

template

private

static

final

String

exchangeName

=

“exchange。fanout”

public

void

send

int

index

{

StringBuilder

builder

=

new

StringBuilder

“Hello”

);

int

limitIndex

=

index

%

3

+

1

for

int

i

=

0

i

<

limitIndex

i

++)

{

builder

append

‘。’

);

}

builder

append

index

+

1

);

String

message

=

builder

toString

();

template

convertAndSend

exchangeName

“”

message

);

LOGGER

info

“ [x] Sent ‘{}’”

message

);

}

}

消費者從繫結的匿名佇列中獲取訊息,訊息中包含

號越多,耗時越長,由於該消費者可以從兩個佇列中獲取並消費訊息,可以看做兩個消費者,名稱分別為

instance 1

instance 2

/**

* Created by macro on 2020/5/19。

*/

public

class

FanoutReceiver

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

FanoutReceiver

class

);

@RabbitListener

queues

=

“#{fanoutQueue1。name}”

public

void

receive1

String

in

{

receive

in

1

);

}

@RabbitListener

queues

=

“#{fanoutQueue2。name}”

public

void

receive2

String

in

{

receive

in

2

);

}

private

void

receive

String

in

int

receiver

{

StopWatch

watch

=

new

StopWatch

();

watch

start

();

LOGGER

info

“instance {} [x] Received ‘{}’”

receiver

in

);

doWork

in

);

watch

stop

();

LOGGER

info

“instance {} [x] Done in {}s”

receiver

watch

getTotalTimeSeconds

());

}

private

void

doWork

String

in

{

for

char

ch

in

toCharArray

())

{

if

ch

==

‘。’

{

ThreadUtil

sleep

1000

);

}

}

}

}

在controller中新增測試介面,呼叫該介面開始傳送訊息;

/**

* Created by macro on 2020/5/19。

*/

@Api

tags

=

“RabbitController”

description

=

“RabbitMQ功能測試”

@Controller

@RequestMapping

“/rabbit”

public

class

RabbitController

{

@Autowired

private

FanoutSender

fanoutSender

@ApiOperation

“釋出/訂閱模式”

@RequestMapping

value

=

“/fanout”

method

=

RequestMethod

GET

@ResponseBody

public

CommonResult

fanoutTest

()

{

for

int

i

=

0

i

<

10

i

++){

fanoutSender

send

i

);

ThreadUtil

sleep

1000

);

}

return

CommonResult

success

null

);

}

}

執行後結果如下,可以發現生產者往佇列中傳送包含不同數量

號的訊息,

instance 1

instance 2

同時獲取並消費了訊息。

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

路由模式

路由模式是可以根據

路由鍵

選擇性給多個消費者傳送訊息的模式,它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列透過

路由鍵

繫結到交換機上去,生產者傳送訊息到交換機,交換機透過

路由鍵

轉發到不同佇列,佇列繫結的消費者接收並消費訊息。

模式示意圖

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

Spring AMQP實現

新增

路由模式

相關Java配置,建立一個名為

exchange。direct

的交換機、一個生產者、兩個消費者和兩個匿名佇列,佇列透過

路由鍵

都繫結到交換機,

佇列1

的路由鍵為

orange

black

佇列2

的路由鍵為

green

black

/**

* Created by macro on 2020/5/19。

*/

@Configuration

public

class

DirectRabbitConfig

{

@Bean

public

DirectExchange

direct

()

{

return

new

DirectExchange

“exchange。direct”

);

}

@Bean

public

Queue

directQueue1

()

{

return

new

AnonymousQueue

();

}

@Bean

public

Queue

directQueue2

()

{

return

new

AnonymousQueue

();

}

@Bean

public

Binding

directBinding1a

DirectExchange

direct

Queue

directQueue1

{

return

BindingBuilder

bind

directQueue1

)。

to

direct

)。

with

“orange”

);

}

@Bean

public

Binding

directBinding1b

DirectExchange

direct

Queue

directQueue1

{

return

BindingBuilder

bind

directQueue1

)。

to

direct

)。

with

“black”

);

}

@Bean

public

Binding

directBinding2a

DirectExchange

direct

Queue

directQueue2

{

return

BindingBuilder

bind

directQueue2

)。

to

direct

)。

with

“green”

);

}

@Bean

public

Binding

directBinding2b

DirectExchange

direct

Queue

directQueue2

{

return

BindingBuilder

bind

directQueue2

)。

to

direct

)。

with

“black”

);

}

@Bean

public

DirectReceiver

receiver

()

{

return

new

DirectReceiver

();

}

@Bean

public

DirectSender

directSender

()

{

return

new

DirectSender

();

}

}

生產者透過

send方法

向交換機

exchange。direct

中傳送訊息,傳送時使用不同的

路由鍵

,根據

路由鍵

會被轉發到不同的佇列;

/**

* Created by macro on 2020/5/19。

*/

public

class

DirectSender

{

@Autowired

private

RabbitTemplate

template

private

static

final

String

exchangeName

=

“exchange。direct”

private

final

String

[]

keys

=

{

“orange”

“black”

“green”

};

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

DirectSender

class

);

public

void

send

int

index

{

StringBuilder

builder

=

new

StringBuilder

“Hello to ”

);

int

limitIndex

=

index

%

3

String

key

=

keys

limitIndex

];

builder

append

key

)。

append

‘ ’

);

builder

append

index

+

1

);

String

message

=

builder

toString

();

template

convertAndSend

exchangeName

key

message

);

LOGGER

info

“ [x] Sent ‘{}’”

message

);

}

}

消費者從自己繫結的匿名佇列中獲取訊息,由於該消費者可以從兩個佇列中獲取並消費訊息,可以看做兩個消費者,名稱分別為

instance 1

instance 2

/**

* Created by macro on 2020/5/19。

*/

public

class

DirectReceiver

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

DirectReceiver

class

);

@RabbitListener

queues

=

“#{directQueue1。name}”

public

void

receive1

String

in

){

receive

in

1

);

}

@RabbitListener

queues

=

“#{directQueue2。name}”

public

void

receive2

String

in

){

receive

in

2

);

}

private

void

receive

String

in

int

receiver

){

StopWatch

watch

=

new

StopWatch

();

watch

start

();

LOGGER

info

“instance {} [x] Received ‘{}’”

receiver

in

);

doWork

in

);

watch

stop

();

LOGGER

info

“instance {} [x] Done in {}s”

receiver

watch

getTotalTimeSeconds

());

}

private

void

doWork

String

in

){

for

char

ch

in

toCharArray

())

{

if

ch

==

‘。’

{

ThreadUtil

sleep

1000

);

}

}

}

}

在controller中新增測試介面,呼叫該介面開始傳送訊息;

/**

* Created by macro on 2020/5/19。

*/

@Api

tags

=

“RabbitController”

description

=

“RabbitMQ功能測試”

@Controller

@RequestMapping

“/rabbit”

public

class

RabbitController

{

@Autowired

private

DirectSender

directSender

@ApiOperation

“路由模式”

@RequestMapping

value

=

“/direct”

method

=

RequestMethod

GET

@ResponseBody

public

CommonResult

directTest

()

{

for

int

i

=

0

i

<

10

i

++){

directSender

send

i

);

ThreadUtil

sleep

1000

);

}

return

CommonResult

success

null

);

}

}

執行後結果如下,可以發現生產者往佇列中傳送包含不同

路由鍵

的訊息,

instance 1

獲取到了

orange

black

訊息,

instance 2

獲取到了

green

black

訊息。

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

萬用字元模式

萬用字元模式是可以根據

路由鍵匹配規則

選擇性給多個消費者傳送訊息的模式,它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列透過

路由鍵匹配規則

繫結到交換機上去,生產者傳送訊息到交換機,交換機透過

路由鍵匹配規則

轉發到不同佇列,佇列繫結的消費者接收並消費訊息。

特殊匹配符號

*

:只能匹配一個單詞;

#

:可以匹配零個或多個單詞。

模式示意圖

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

Spring AMQP實現

新增

萬用字元模式

相關Java配置,建立一個名為

exchange。topic

的交換機、一個生產者、兩個消費者和兩個匿名佇列,匹配

*。orange。*

*。*。rabbit

傳送到

佇列1

,匹配

lazy。#

傳送到

佇列2

/**

* Created by macro on 2020/5/19。

*/

@Configuration

public

class

TopicRabbitConfig

{

@Bean

public

TopicExchange

topic

()

{

return

new

TopicExchange

“exchange。topic”

);

}

@Bean

public

Queue

topicQueue1

()

{

return

new

AnonymousQueue

();

}

@Bean

public

Queue

topicQueue2

()

{

return

new

AnonymousQueue

();

}

@Bean

public

Binding

topicBinding1a

TopicExchange

topic

Queue

topicQueue1

{

return

BindingBuilder

bind

topicQueue1

)。

to

topic

)。

with

“*。orange。*”

);

}

@Bean

public

Binding

topicBinding1b

TopicExchange

topic

Queue

topicQueue1

{

return

BindingBuilder

bind

topicQueue1

)。

to

topic

)。

with

“*。*。rabbit”

);

}

@Bean

public

Binding

topicBinding2a

TopicExchange

topic

Queue

topicQueue2

{

return

BindingBuilder

bind

topicQueue2

)。

to

topic

)。

with

“lazy。#”

);

}

@Bean

public

TopicReceiver

topicReceiver

()

{

return

new

TopicReceiver

();

}

@Bean

public

TopicSender

topicSender

()

{

return

new

TopicSender

();

}

}

生產者透過

send方法

向交換機

exchange。topic

中傳送訊息,訊息中包含不同的

路由鍵

/**

* Created by macro on 2020/5/19。

*/

public

class

TopicSender

{

@Autowired

private

RabbitTemplate

template

private

static

final

String

exchangeName

=

“exchange。topic”

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

TopicSender

class

);

private

final

String

[]

keys

=

{

“quick。orange。rabbit”

“lazy。orange。elephant”

“quick。orange。fox”

“lazy。brown。fox”

“lazy。pink。rabbit”

“quick。brown。fox”

};

public

void

send

int

index

{

StringBuilder

builder

=

new

StringBuilder

“Hello to ”

);

int

limitIndex

=

index

%

keys

length

String

key

=

keys

limitIndex

];

builder

append

key

)。

append

‘ ’

);

builder

append

index

+

1

);

String

message

=

builder

toString

();

template

convertAndSend

exchangeName

key

message

);

LOGGER

info

“ [x] Sent ‘{}’”

message

);

System

out

println

“ [x] Sent ‘”

+

message

+

“’”

);

}

}

消費者從自己繫結的匿名佇列中獲取訊息,由於該消費者可以從兩個佇列中獲取並消費訊息,可以看做兩個消費者,名稱分別為

instance 1

instance 2

/**

* Created by macro on 2020/5/19。

*/

public

class

TopicReceiver

{

private

static

final

Logger

LOGGER

=

LoggerFactory

getLogger

TopicReceiver

class

);

@RabbitListener

queues

=

“#{topicQueue1。name}”

public

void

receive1

String

in

){

receive

in

1

);

}

@RabbitListener

queues

=

“#{topicQueue2。name}”

public

void

receive2

String

in

){

receive

in

2

);

}

public

void

receive

String

in

int

receiver

){

StopWatch

watch

=

new

StopWatch

();

watch

start

();

LOGGER

info

“instance {} [x] Received ‘{}’”

receiver

in

);

doWork

in

);

watch

stop

();

LOGGER

info

“instance {} [x] Done in {}s”

receiver

watch

getTotalTimeSeconds

());

}

private

void

doWork

String

in

){

for

char

ch

in

toCharArray

())

{

if

ch

==

‘。’

{

ThreadUtil

sleep

1000

);

}

}

}

}

在controller中新增測試介面,呼叫該介面開始傳送訊息;

/**

* Created by macro on 2020/5/19。

*/

@Api

tags

=

“RabbitController”

description

=

“RabbitMQ功能測試”

@Controller

@RequestMapping

“/rabbit”

public

class

RabbitController

{

@Autowired

private

TopicSender

topicSender

@ApiOperation

“萬用字元模式”

@RequestMapping

value

=

“/topic”

method

=

RequestMethod

GET

@ResponseBody

public

CommonResult

topicTest

()

{

for

int

i

=

0

i

<

10

i

++){

topicSender

send

i

);

ThreadUtil

sleep

1000

);

}

return

CommonResult

success

null

);

}

}

執行後結果如下,可以發現生產者往佇列中傳送包含不同

路由鍵

的訊息,

instance 1

instance 2

分別獲取到了匹配的訊息。

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!

參考資料

RabbitMQ Tutorials:

https://www。

rabbitmq。com/getstarted

。html

專案原始碼地址

https://

github。com/macrozheng/m

all-learning/tree/master/mall-tiny-rabbit

本文 GitHub

https://

github。com/macrozheng/m

all-learning

已經收錄,歡迎大家Star。

標簽: 佇列  instance  訊息  rabbitmq  created