連RabbitMQ的5種核心訊息模式都不懂,也敢說自己會用訊息佇列!
以前看過的關於RabbitMQ核心訊息模式的文章都是基於JavaAPI的,最近看了下官方文件,發現這些核心訊息模式都可以透過Spring AMQP來實現。於是總結了下RabbitMQ的實用技巧,包括RabbitMQ在Windows和Linux下的安裝、5種核心訊息模式的Spring AMQP實現,相信對於想要學習和回顧RabbitMQ的朋友都會有所幫助。
SpringBoot實戰電商專案mall(40k+star)地址:
https://
github。com/macrozheng/m
all
簡介
RabbitMQ是最受歡迎的開源訊息中介軟體之一,在全球範圍內被廣泛應用。RabbitMQ是輕量級且易於部署的,能支援多種訊息協議。RabbitMQ可以部署在分散式系統中,以滿足大規模、高可用的要求。
相關概念
我們先來了解下RabbitMQ中的相關概念,這裡以5種訊息模式中的
路由模式
為例。
安裝及配置
接下來我們介紹下RabbitMQ的安裝和配置,提供Windows和Linux兩種安裝方式。
Windows下的安裝
安裝Erlang,下載地址:
http://
erlang。org/download/otp
_win64_21。3。exe
安裝RabbitMQ,下載地址:
https://
dl。bintray。com/rabbitmq
/all/rabbitmq-server/3。7。14/rabbitmq-server-3。7。14。exe
安裝完成後,進入RabbitMQ安裝目錄下的sbin目錄;
在位址列輸入cmd並回車啟動命令列,然後輸入以下命令啟動管理功能。
rabbitmq-plugins enable rabbitmq_management
Linux下的安裝
下載
rabbitmq 3。7。15
的Docker映象;
docker pull rabbitmq:3。7。15
使用Docker命令啟動服務;
docker run -p 5672:5672 -p 15672:15672 ——name rabbitmq
\
-d rabbitmq:3。7。15
進入容器並開啟管理功能;
docker
exec
-it rabbitmq /bin/bash
rabbitmq-plugins
enable
rabbitmq_management
開啟防火牆便於外網訪問。
firewall-cmd ——zone
=
public ——add-port
=
15672/tcp ——permanent
firewall-cmd ——zone
=
public ——add-port
=
5672/tcp ——permanent
firewall-cmd ——reload
訪問及配置
訪問RabbitMQ管理頁面地址,檢視是否安裝成功(Linux下使用伺服器IP訪問即可):http://localhost:15672/
輸入賬號密碼並登入,這裡使用預設賬號密碼登入:guest guest
建立帳號並設定其角色為管理員:mall mall
建立一個新的虛擬host為:/mall
點選mall使用者進入使用者配置頁面;
給mall使用者配置該虛擬host的許可權;
至此,RabbitMQ的配置完成。
5種訊息模式
這5種訊息模式是構建基於RabbitMQ的訊息應用的基礎,一定要牢牢掌握它們。學過RabbitMQ的朋友應該瞭解過這些訊息模式的Java實現,這裡我們使用Spring AMQP的形式來實現它們。
簡單模式
簡單模式是最簡單的訊息模式,它包含一個生產者、一個消費者和一個佇列。生產者向佇列裡傳送訊息,消費者從佇列中獲取訊息並消費。
模式示意圖
Spring AMQP實現
首先需要在
pom。xml
中新增Spring AMQP的相關依賴;
<!——Spring AMQP依賴——>
org。springframework。boot
spring-boot-starter-amqp
然後修改
application。yml
,新增RabbitMQ的相關配置;
spring
:
rabbitmq
:
host
:
localhost
port
:
5672
virtual-host
:
/mall
username
:
mall
password
:
mall
publisher-confirms
:
true
#訊息傳送到交換器確認
publisher-returns
:
true
#訊息傳送到佇列確認
新增
簡單模式
相關Java配置,建立一個名為
simple。hello
的佇列、一個生產者和一個消費者;
/**
* Created by macro on 2020/5/19。
*/
@Configuration
public
class
SimpleRabbitConfig
{
@Bean
public
Queue
hello
()
{
return
new
Queue
(
“simple。hello”
);
}
@Bean
public
SimpleSender
simpleSender
(){
return
new
SimpleSender
();
}
@Bean
public
SimpleReceiver
simpleReceiver
(){
return
new
SimpleReceiver
();
}
}
生產者透過
send方法
向佇列
simple。hello
中傳送訊息;
/**
* Created by macro on 2020/5/19。
*/
public
class
SimpleSender
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
SimpleSender
。
class
);
@Autowired
private
RabbitTemplate
template
;
private
static
final
String
queueName
=
“simple。hello”
;
public
void
send
()
{
String
message
=
“Hello World!”
;
this
。
template
。
convertAndSend
(
queueName
,
message
);
LOGGER
。
info
(
“ [x] Sent ‘{}’”
,
message
);
}
}
消費者從佇列
simple。hello
中獲取訊息;
/**
* Created by macro on 2020/5/19。
*/
@RabbitListener
(
queues
=
“simple。hello”
)
public
class
SimpleReceiver
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
SimpleReceiver
。
class
);
@RabbitHandler
public
void
receive
(
String
in
)
{
LOGGER
。
info
(
“ [x] Received ‘{}’”
,
in
);
}
}
在controller中新增測試介面,呼叫該介面開始傳送訊息;
/**
* Created by macro on 2020/5/19。
*/
@Api
(
tags
=
“RabbitController”
,
description
=
“RabbitMQ功能測試”
)
@Controller
@RequestMapping
(
“/rabbit”
)
public
class
RabbitController
{
@Autowired
private
SimpleSender
simpleSender
;
@ApiOperation
(
“簡單模式”
)
@RequestMapping
(
value
=
“/simple”
,
method
=
RequestMethod
。
GET
)
@ResponseBody
public
CommonResult
simpleTest
()
{
for
(
int
i
=
0
;
i
<
10
;
i
++){
simpleSender
。
send
();
ThreadUtil
。
sleep
(
1000
);
}
return
CommonResult
。
success
(
null
);
}
}
執行後結果如下,可以發現生產者往佇列中傳送訊息,消費者從佇列中獲取訊息並消費。
工作模式
工作模式是指向多個互相競爭的消費者傳送訊息的模式,它包含一個生產者、兩個消費者和一個佇列。兩個消費者同時繫結到一個佇列上去,當消費者獲取訊息處理耗時任務時,空閒的消費者從佇列中獲取並消費訊息。
模式示意圖
Spring AMQP實現
新增
工作模式
相關Java配置,建立一個名為
work。hello
的佇列、一個生產者和兩個消費者;
/**
* Created by macro on 2020/5/19。
*/
@Configuration
public
class
WorkRabbitConfig
{
@Bean
public
Queue
workQueue
()
{
return
new
Queue
(
“work。hello”
);
}
@Bean
public
WorkReceiver
workReceiver1
()
{
return
new
WorkReceiver
(
1
);
}
@Bean
public
WorkReceiver
workReceiver2
()
{
return
new
WorkReceiver
(
2
);
}
@Bean
public
WorkSender
workSender
()
{
return
new
WorkSender
();
}
}
生產者透過
send方法
向佇列
work。hello
中傳送訊息,訊息中包含一定數量的
。
號;
/**
* Created by macro on 2020/5/19。
*/
public
class
WorkSender
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
WorkSender
。
class
);
@Autowired
private
RabbitTemplate
template
;
private
static
final
String
queueName
=
“work。hello”
;
public
void
send
(
int
index
)
{
StringBuilder
builder
=
new
StringBuilder
(
“Hello”
);
int
limitIndex
=
index
%
3
+
1
;
for
(
int
i
=
0
;
i
<
limitIndex
;
i
++)
{
builder
。
append
(
‘。’
);
}
builder
。
append
(
index
+
1
);
String
message
=
builder
。
toString
();
template
。
convertAndSend
(
queueName
,
message
);
LOGGER
。
info
(
“ [x] Sent ‘{}’”
,
message
);
}
}
兩個消費者從佇列
work。hello
中獲取訊息,名稱分別為
instance 1
和
instance 2
,訊息中包含
。
號越多,耗時越長;
/**
* Created by macro on 2020/5/19。
*/
@RabbitListener
(
queues
=
“work。hello”
)
public
class
WorkReceiver
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
WorkReceiver
。
class
);
private
final
int
instance
;
public
WorkReceiver
(
int
i
)
{
this
。
instance
=
i
;
}
@RabbitHandler
public
void
receive
(
String
in
)
{
StopWatch
watch
=
new
StopWatch
();
watch
。
start
();
LOGGER
。
info
(
“instance {} [x] Received ‘{}’”
,
this
。
instance
,
in
);
doWork
(
in
);
watch
。
stop
();
LOGGER
。
info
(
“instance {} [x] Done in {}s”
,
this
。
instance
,
watch
。
getTotalTimeSeconds
());
}
private
void
doWork
(
String
in
)
{
for
(
char
ch
:
in
。
toCharArray
())
{
if
(
ch
==
‘。’
)
{
ThreadUtil
。
sleep
(
1000
);
}
}
}
}
在controller中新增測試介面,呼叫該介面開始傳送訊息;
/**
* Created by macro on 2020/5/19。
*/
@Api
(
tags
=
“RabbitController”
,
description
=
“RabbitMQ功能測試”
)
@Controller
@RequestMapping
(
“/rabbit”
)
public
class
RabbitController
{
@Autowired
private
WorkSender
workSender
;
@ApiOperation
(
“工作模式”
)
@RequestMapping
(
value
=
“/work”
,
method
=
RequestMethod
。
GET
)
@ResponseBody
public
CommonResult
workTest
()
{
for
(
int
i
=
0
;
i
<
10
;
i
++){
workSender
。
send
(
i
);
ThreadUtil
。
sleep
(
1000
);
}
return
CommonResult
。
success
(
null
);
}
}
執行後結果如下,可以發現生產者往佇列中傳送包含不同數量
。
號的訊息,
instance 1
和
instance 2
消費者互相競爭,分別消費了一部分訊息。
釋出/訂閱模式
釋出/訂閱模式是指同時向多個消費者傳送訊息的模式(類似廣播的形式),它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列繫結到交換機上去,生產者透過傳送訊息到交換機,所有消費者接收並消費訊息。
模式示意圖
Spring AMQP實現
新增
釋出/訂閱模式
相關Java配置,建立一個名為
exchange。fanout
的交換機、一個生產者、兩個消費者和兩個匿名佇列,將兩個匿名佇列都繫結到交換機;
/**
* Created by macro on 2020/5/19。
*/
@Configuration
public
class
FanoutRabbitConfig
{
@Bean
public
FanoutExchange
fanout
()
{
return
new
FanoutExchange
(
“exchange。fanout”
);
}
@Bean
public
Queue
fanoutQueue1
()
{
return
new
AnonymousQueue
();
}
@Bean
public
Queue
fanoutQueue2
()
{
return
new
AnonymousQueue
();
}
@Bean
public
Binding
fanoutBinding1
(
FanoutExchange
fanout
,
Queue
fanoutQueue1
)
{
return
BindingBuilder
。
bind
(
fanoutQueue1
)。
to
(
fanout
);
}
@Bean
public
Binding
fanoutBinding2
(
FanoutExchange
fanout
,
Queue
fanoutQueue2
)
{
return
BindingBuilder
。
bind
(
fanoutQueue2
)。
to
(
fanout
);
}
@Bean
public
FanoutReceiver
fanoutReceiver
()
{
return
new
FanoutReceiver
();
}
@Bean
public
FanoutSender
fanoutSender
()
{
return
new
FanoutSender
();
}
}
生產者透過
send方法
向交換機
exchange。fanout
中傳送訊息,訊息中包含一定數量的
。
號;
/**
* Created by macro on 2020/5/19。
*/
public
class
FanoutSender
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
FanoutSender
。
class
);
@Autowired
private
RabbitTemplate
template
;
private
static
final
String
exchangeName
=
“exchange。fanout”
;
public
void
send
(
int
index
)
{
StringBuilder
builder
=
new
StringBuilder
(
“Hello”
);
int
limitIndex
=
index
%
3
+
1
;
for
(
int
i
=
0
;
i
<
limitIndex
;
i
++)
{
builder
。
append
(
‘。’
);
}
builder
。
append
(
index
+
1
);
String
message
=
builder
。
toString
();
template
。
convertAndSend
(
exchangeName
,
“”
,
message
);
LOGGER
。
info
(
“ [x] Sent ‘{}’”
,
message
);
}
}
消費者從繫結的匿名佇列中獲取訊息,訊息中包含
。
號越多,耗時越長,由於該消費者可以從兩個佇列中獲取並消費訊息,可以看做兩個消費者,名稱分別為
instance 1
和
instance 2
;
/**
* Created by macro on 2020/5/19。
*/
public
class
FanoutReceiver
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
FanoutReceiver
。
class
);
@RabbitListener
(
queues
=
“#{fanoutQueue1。name}”
)
public
void
receive1
(
String
in
)
{
receive
(
in
,
1
);
}
@RabbitListener
(
queues
=
“#{fanoutQueue2。name}”
)
public
void
receive2
(
String
in
)
{
receive
(
in
,
2
);
}
private
void
receive
(
String
in
,
int
receiver
)
{
StopWatch
watch
=
new
StopWatch
();
watch
。
start
();
LOGGER
。
info
(
“instance {} [x] Received ‘{}’”
,
receiver
,
in
);
doWork
(
in
);
watch
。
stop
();
LOGGER
。
info
(
“instance {} [x] Done in {}s”
,
receiver
,
watch
。
getTotalTimeSeconds
());
}
private
void
doWork
(
String
in
)
{
for
(
char
ch
:
in
。
toCharArray
())
{
if
(
ch
==
‘。’
)
{
ThreadUtil
。
sleep
(
1000
);
}
}
}
}
在controller中新增測試介面,呼叫該介面開始傳送訊息;
/**
* Created by macro on 2020/5/19。
*/
@Api
(
tags
=
“RabbitController”
,
description
=
“RabbitMQ功能測試”
)
@Controller
@RequestMapping
(
“/rabbit”
)
public
class
RabbitController
{
@Autowired
private
FanoutSender
fanoutSender
;
@ApiOperation
(
“釋出/訂閱模式”
)
@RequestMapping
(
value
=
“/fanout”
,
method
=
RequestMethod
。
GET
)
@ResponseBody
public
CommonResult
fanoutTest
()
{
for
(
int
i
=
0
;
i
<
10
;
i
++){
fanoutSender
。
send
(
i
);
ThreadUtil
。
sleep
(
1000
);
}
return
CommonResult
。
success
(
null
);
}
}
執行後結果如下,可以發現生產者往佇列中傳送包含不同數量
。
號的訊息,
instance 1
和
instance 2
同時獲取並消費了訊息。
路由模式
路由模式是可以根據
路由鍵
選擇性給多個消費者傳送訊息的模式,它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列透過
路由鍵
繫結到交換機上去,生產者傳送訊息到交換機,交換機透過
路由鍵
轉發到不同佇列,佇列繫結的消費者接收並消費訊息。
模式示意圖
Spring AMQP實現
新增
路由模式
相關Java配置,建立一個名為
exchange。direct
的交換機、一個生產者、兩個消費者和兩個匿名佇列,佇列透過
路由鍵
都繫結到交換機,
佇列1
的路由鍵為
orange
和
black
,
佇列2
的路由鍵為
green
和
black
;
/**
* Created by macro on 2020/5/19。
*/
@Configuration
public
class
DirectRabbitConfig
{
@Bean
public
DirectExchange
direct
()
{
return
new
DirectExchange
(
“exchange。direct”
);
}
@Bean
public
Queue
directQueue1
()
{
return
new
AnonymousQueue
();
}
@Bean
public
Queue
directQueue2
()
{
return
new
AnonymousQueue
();
}
@Bean
public
Binding
directBinding1a
(
DirectExchange
direct
,
Queue
directQueue1
)
{
return
BindingBuilder
。
bind
(
directQueue1
)。
to
(
direct
)。
with
(
“orange”
);
}
@Bean
public
Binding
directBinding1b
(
DirectExchange
direct
,
Queue
directQueue1
)
{
return
BindingBuilder
。
bind
(
directQueue1
)。
to
(
direct
)。
with
(
“black”
);
}
@Bean
public
Binding
directBinding2a
(
DirectExchange
direct
,
Queue
directQueue2
)
{
return
BindingBuilder
。
bind
(
directQueue2
)。
to
(
direct
)。
with
(
“green”
);
}
@Bean
public
Binding
directBinding2b
(
DirectExchange
direct
,
Queue
directQueue2
)
{
return
BindingBuilder
。
bind
(
directQueue2
)。
to
(
direct
)。
with
(
“black”
);
}
@Bean
public
DirectReceiver
receiver
()
{
return
new
DirectReceiver
();
}
@Bean
public
DirectSender
directSender
()
{
return
new
DirectSender
();
}
}
生產者透過
send方法
向交換機
exchange。direct
中傳送訊息,傳送時使用不同的
路由鍵
,根據
路由鍵
會被轉發到不同的佇列;
/**
* Created by macro on 2020/5/19。
*/
public
class
DirectSender
{
@Autowired
private
RabbitTemplate
template
;
private
static
final
String
exchangeName
=
“exchange。direct”
;
private
final
String
[]
keys
=
{
“orange”
,
“black”
,
“green”
};
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
DirectSender
。
class
);
public
void
send
(
int
index
)
{
StringBuilder
builder
=
new
StringBuilder
(
“Hello to ”
);
int
limitIndex
=
index
%
3
;
String
key
=
keys
[
limitIndex
];
builder
。
append
(
key
)。
append
(
‘ ’
);
builder
。
append
(
index
+
1
);
String
message
=
builder
。
toString
();
template
。
convertAndSend
(
exchangeName
,
key
,
message
);
LOGGER
。
info
(
“ [x] Sent ‘{}’”
,
message
);
}
}
消費者從自己繫結的匿名佇列中獲取訊息,由於該消費者可以從兩個佇列中獲取並消費訊息,可以看做兩個消費者,名稱分別為
instance 1
和
instance 2
;
/**
* Created by macro on 2020/5/19。
*/
public
class
DirectReceiver
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
DirectReceiver
。
class
);
@RabbitListener
(
queues
=
“#{directQueue1。name}”
)
public
void
receive1
(
String
in
){
receive
(
in
,
1
);
}
@RabbitListener
(
queues
=
“#{directQueue2。name}”
)
public
void
receive2
(
String
in
){
receive
(
in
,
2
);
}
private
void
receive
(
String
in
,
int
receiver
){
StopWatch
watch
=
new
StopWatch
();
watch
。
start
();
LOGGER
。
info
(
“instance {} [x] Received ‘{}’”
,
receiver
,
in
);
doWork
(
in
);
watch
。
stop
();
LOGGER
。
info
(
“instance {} [x] Done in {}s”
,
receiver
,
watch
。
getTotalTimeSeconds
());
}
private
void
doWork
(
String
in
){
for
(
char
ch
:
in
。
toCharArray
())
{
if
(
ch
==
‘。’
)
{
ThreadUtil
。
sleep
(
1000
);
}
}
}
}
在controller中新增測試介面,呼叫該介面開始傳送訊息;
/**
* Created by macro on 2020/5/19。
*/
@Api
(
tags
=
“RabbitController”
,
description
=
“RabbitMQ功能測試”
)
@Controller
@RequestMapping
(
“/rabbit”
)
public
class
RabbitController
{
@Autowired
private
DirectSender
directSender
;
@ApiOperation
(
“路由模式”
)
@RequestMapping
(
value
=
“/direct”
,
method
=
RequestMethod
。
GET
)
@ResponseBody
public
CommonResult
directTest
()
{
for
(
int
i
=
0
;
i
<
10
;
i
++){
directSender
。
send
(
i
);
ThreadUtil
。
sleep
(
1000
);
}
return
CommonResult
。
success
(
null
);
}
}
執行後結果如下,可以發現生產者往佇列中傳送包含不同
路由鍵
的訊息,
instance 1
獲取到了
orange
和
black
訊息,
instance 2
獲取到了
green
和
black
訊息。
萬用字元模式
萬用字元模式是可以根據
路由鍵匹配規則
選擇性給多個消費者傳送訊息的模式,它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列透過
路由鍵匹配規則
繫結到交換機上去,生產者傳送訊息到交換機,交換機透過
路由鍵匹配規則
轉發到不同佇列,佇列繫結的消費者接收並消費訊息。
特殊匹配符號
*
:只能匹配一個單詞;
#
:可以匹配零個或多個單詞。
模式示意圖
Spring AMQP實現
新增
萬用字元模式
相關Java配置,建立一個名為
exchange。topic
的交換機、一個生產者、兩個消費者和兩個匿名佇列,匹配
*。orange。*
和
*。*。rabbit
傳送到
佇列1
,匹配
lazy。#
傳送到
佇列2
;
/**
* Created by macro on 2020/5/19。
*/
@Configuration
public
class
TopicRabbitConfig
{
@Bean
public
TopicExchange
topic
()
{
return
new
TopicExchange
(
“exchange。topic”
);
}
@Bean
public
Queue
topicQueue1
()
{
return
new
AnonymousQueue
();
}
@Bean
public
Queue
topicQueue2
()
{
return
new
AnonymousQueue
();
}
@Bean
public
Binding
topicBinding1a
(
TopicExchange
topic
,
Queue
topicQueue1
)
{
return
BindingBuilder
。
bind
(
topicQueue1
)。
to
(
topic
)。
with
(
“*。orange。*”
);
}
@Bean
public
Binding
topicBinding1b
(
TopicExchange
topic
,
Queue
topicQueue1
)
{
return
BindingBuilder
。
bind
(
topicQueue1
)。
to
(
topic
)。
with
(
“*。*。rabbit”
);
}
@Bean
public
Binding
topicBinding2a
(
TopicExchange
topic
,
Queue
topicQueue2
)
{
return
BindingBuilder
。
bind
(
topicQueue2
)。
to
(
topic
)。
with
(
“lazy。#”
);
}
@Bean
public
TopicReceiver
topicReceiver
()
{
return
new
TopicReceiver
();
}
@Bean
public
TopicSender
topicSender
()
{
return
new
TopicSender
();
}
}
生產者透過
send方法
向交換機
exchange。topic
中傳送訊息,訊息中包含不同的
路由鍵
;
/**
* Created by macro on 2020/5/19。
*/
public
class
TopicSender
{
@Autowired
private
RabbitTemplate
template
;
private
static
final
String
exchangeName
=
“exchange。topic”
;
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
TopicSender
。
class
);
private
final
String
[]
keys
=
{
“quick。orange。rabbit”
,
“lazy。orange。elephant”
,
“quick。orange。fox”
,
“lazy。brown。fox”
,
“lazy。pink。rabbit”
,
“quick。brown。fox”
};
public
void
send
(
int
index
)
{
StringBuilder
builder
=
new
StringBuilder
(
“Hello to ”
);
int
limitIndex
=
index
%
keys
。
length
;
String
key
=
keys
[
limitIndex
];
builder
。
append
(
key
)。
append
(
‘ ’
);
builder
。
append
(
index
+
1
);
String
message
=
builder
。
toString
();
template
。
convertAndSend
(
exchangeName
,
key
,
message
);
LOGGER
。
info
(
“ [x] Sent ‘{}’”
,
message
);
System
。
out
。
println
(
“ [x] Sent ‘”
+
message
+
“’”
);
}
}
消費者從自己繫結的匿名佇列中獲取訊息,由於該消費者可以從兩個佇列中獲取並消費訊息,可以看做兩個消費者,名稱分別為
instance 1
和
instance 2
;
/**
* Created by macro on 2020/5/19。
*/
public
class
TopicReceiver
{
private
static
final
Logger
LOGGER
=
LoggerFactory
。
getLogger
(
TopicReceiver
。
class
);
@RabbitListener
(
queues
=
“#{topicQueue1。name}”
)
public
void
receive1
(
String
in
){
receive
(
in
,
1
);
}
@RabbitListener
(
queues
=
“#{topicQueue2。name}”
)
public
void
receive2
(
String
in
){
receive
(
in
,
2
);
}
public
void
receive
(
String
in
,
int
receiver
){
StopWatch
watch
=
new
StopWatch
();
watch
。
start
();
LOGGER
。
info
(
“instance {} [x] Received ‘{}’”
,
receiver
,
in
);
doWork
(
in
);
watch
。
stop
();
LOGGER
。
info
(
“instance {} [x] Done in {}s”
,
receiver
,
watch
。
getTotalTimeSeconds
());
}
private
void
doWork
(
String
in
){
for
(
char
ch
:
in
。
toCharArray
())
{
if
(
ch
==
‘。’
)
{
ThreadUtil
。
sleep
(
1000
);
}
}
}
}
在controller中新增測試介面,呼叫該介面開始傳送訊息;
/**
* Created by macro on 2020/5/19。
*/
@Api
(
tags
=
“RabbitController”
,
description
=
“RabbitMQ功能測試”
)
@Controller
@RequestMapping
(
“/rabbit”
)
public
class
RabbitController
{
@Autowired
private
TopicSender
topicSender
;
@ApiOperation
(
“萬用字元模式”
)
@RequestMapping
(
value
=
“/topic”
,
method
=
RequestMethod
。
GET
)
@ResponseBody
public
CommonResult
topicTest
()
{
for
(
int
i
=
0
;
i
<
10
;
i
++){
topicSender
。
send
(
i
);
ThreadUtil
。
sleep
(
1000
);
}
return
CommonResult
。
success
(
null
);
}
}
執行後結果如下,可以發現生產者往佇列中傳送包含不同
路由鍵
的訊息,
instance 1
和
instance 2
分別獲取到了匹配的訊息。
參考資料
RabbitMQ Tutorials:
https://www。
rabbitmq。com/getstarted
。html
專案原始碼地址
https://
github。com/macrozheng/m
all-learning/tree/master/mall-tiny-rabbit
本文 GitHub
https://
github。com/macrozheng/m
all-learning
已經收錄,歡迎大家Star。
上一篇:魚凍凍不住的那些年