SpringBoot RocketMQ 整合使用和監控
前提
透過前面兩篇文章可以簡單的瞭解 RocketMQ 和 安裝 RocketMQ ,今天就將 SpringBoot 和 RocketMQ 整合起來使用。
相關文章
1、
SpringBoot Kafka 整合使用
2、
SpringBoot RabbitMQ 整合使用
3、
SpringBoot ActiveMQ 整合使用
4、
Kafka 安裝及快速入門
5、
SpringBoot RabbitMQ 整合進階版
6、
RocketMQ 初探
7、
RocketMQ 安裝及快速入門
關注我
微信公眾號:zhisheng
轉載請務必註明原創地址為:
http://www。
54tianzhisheng。cn/2018/
02/07/SpringBoot-RocketMQ/
建立專案
在 IDEA 建立一個 SpringBoot 專案,專案結構如下:
pom 檔案
引入 RocketMQ 的一些相關依賴,最後的 pom 檔案如下:
<?xml version=“1。0” encoding=“UTF-8”?>
xmlns= “http://maven。apache。org/POM/4。0。0” xmlns:xsi= “http://www。w3。org/2001/XMLSchema-instance” xsi:schemaLocation= “http://maven。apache。org/POM/4。0。0 http://maven。apache。org/xsd/maven-4。0。0。xsd” > 4。0。0 com。zhisheng rocketmq 0。0。1-SNAPSHOT jar rocketmq Demo project for Spring Boot RocketMQ org。springframework。boot spring-boot-starter-parent 1。5。9。RELEASE <!—— lookup parent from repository ——> UTF-8
UTF-8
1。8
org。springframework。boot
spring-boot-starter-web
org。springframework。boot
spring-boot-starter-test
test
org。apache。rocketmq
rocketmq-common
4。2。0
org。apache。rocketmq
rocketmq-client
4。2。0
org。springframework。boot
spring-boot-maven-plugin
配置檔案
application。properties 中如下:
# 消費者的組名
apache。rocketmq。consumer。PushConsumer=PushConsumer
# 生產者的組名
apache。rocketmq。producer。producerGroup=Producer
# NameServer地址
apache。rocketmq。namesrvAddr=localhost:9876
生產者
package
com。zhisheng。rocketmq。client
;
import
org。apache。rocketmq。client。producer。DefaultMQProducer
;
import
org。apache。rocketmq。common。message。Message
;
import
org。apache。rocketmq。remoting。common。RemotingHelper
;
import
org。springframework。beans。factory。annotation。Value
;
import
org。springframework。stereotype。Component
;
import
org。springframework。util。StopWatch
;
import
javax。annotation。PostConstruct
;
/**
* Created by zhisheng_tian on 2018/2/6
*/
@Component
public
class
RocketMQClient
{
/**
* 生產者的組名
*/
@Value
(
“${apache。rocketmq。producer。producerGroup}”
)
private
String
producerGroup
;
/**
* NameServer 地址
*/
@Value
(
“${apache。rocketmq。namesrvAddr}”
)
private
String
namesrvAddr
;
@PostConstruct
public
void
defaultMQProducer
()
{
//生產者的組名
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
producerGroup
);
//指定NameServer地址,多個地址以 ; 隔開
producer
。
setNamesrvAddr
(
namesrvAddr
);
try
{
/**
* Producer物件在使用之前必須要呼叫start初始化,初始化一次即可
* 注意:切記不可以在每次傳送訊息時,都呼叫start方法
*/
producer
。
start
();
//建立一個訊息例項,包含 topic、tag 和 訊息體
//如下:topic 為 “TopicTest”,tag 為 “push”
Message
message
=
new
Message
(
“TopicTest”
,
“push”
,
“傳送訊息——zhisheng——-”
。
getBytes
(
RemotingHelper
。
DEFAULT_CHARSET
));
StopWatch
stop
=
new
StopWatch
();
stop
。
start
();
for
(
int
i
=
0
;
i
<
10000
;
i
++)
{
SendResult
result
=
producer
。
send
(
message
);
System
。
out
。
println
(
“傳送響應:MsgId:”
+
result
。
getMsgId
()
+
“,傳送狀態:”
+
result
。
getSendStatus
());
}
stop
。
stop
();
System
。
out
。
println
(
“————————傳送一萬條訊息耗時:”
+
stop
。
getTotalTimeMillis
());
}
catch
(
Exception
e
)
{
e
。
printStackTrace
();
}
finally
{
producer
。
shutdown
();
}
}
}
消費者
package
com。zhisheng。rocketmq。server
;
import
org。apache。rocketmq。client。consumer。DefaultMQPushConsumer
;
import
org。apache。rocketmq。client。consumer。listener。ConsumeConcurrentlyStatus
;
import
org。apache。rocketmq。client。consumer。listener。MessageListenerConcurrently
;
import
org。apache。rocketmq。common。consumer。ConsumeFromWhere
;
import
org。apache。rocketmq。common。message。MessageExt
;
import
org。apache。rocketmq。remoting。common。RemotingHelper
;
import
org。springframework。beans。factory。annotation。Value
;
import
org。springframework。stereotype。Component
;
import
javax。annotation。PostConstruct
;
/**
* Created by zhisheng_tian on 2018/2/6
*/
@Component
public
class
RocketMQServer
{
/**
* 消費者的組名
*/
@Value
(
“${apache。rocketmq。consumer。PushConsumer}”
)
private
String
consumerGroup
;
/**
* NameServer 地址
*/
@Value
(
“${apache。rocketmq。namesrvAddr}”
)
private
String
namesrvAddr
;
@PostConstruct
public
void
defaultMQPushConsumer
()
{
//消費者的組名
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
consumerGroup
);
//指定NameServer地址,多個地址以 ; 隔開
consumer
。
setNamesrvAddr
(
namesrvAddr
);
try
{
//訂閱PushTopic下Tag為push的訊息
consumer
。
subscribe
(
“TopicTest”
,
“push”
);
//設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
//如果非第一次啟動,那麼按照上次消費的位置繼續消費
consumer
。
setConsumeFromWhere
(
ConsumeFromWhere
。
CONSUME_FROM_FIRST_OFFSET
);
consumer
。
registerMessageListener
((
MessageListenerConcurrently
)
(
list
,
context
)
->
{
try
{
for
(
MessageExt
messageExt
:
list
)
{
System
。
out
。
println
(
“messageExt: ”
+
messageExt
);
//輸出訊息內容
String
messageBody
=
new
String
(
messageExt
。
getBody
(),
RemotingHelper
。
DEFAULT_CHARSET
);
System
。
out
。
println
(
“消費響應:msgId : ”
+
messageExt
。
getMsgId
()
+
“, msgBody : ”
+
messageBody
);
//輸出訊息內容
}
}
catch
(
Exception
e
)
{
e
。
printStackTrace
();
return
ConsumeConcurrentlyStatus
。
RECONSUME_LATER
;
//稍後再試
}
return
ConsumeConcurrentlyStatus
。
CONSUME_SUCCESS
;
//消費成功
});
consumer
。
start
();
}
catch
(
Exception
e
)
{
e
。
printStackTrace
();
}
}
}
啟動類
package
com。zhisheng。rocketmq
;
import
org。springframework。boot。SpringApplication
;
import
org。springframework。boot。autoconfigure。SpringBootApplication
;
@SpringBootApplication
public
class
RocketmqApplication
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
。
run
(
RocketmqApplication
。
class
,
args
);
}
}
RocketMQ
程式碼已經都寫好了,接下來我們需要將與 RocketMQ 有關的啟動起來。
啟動 Name Server
在前面文章中已經寫過怎麼啟動,
http://www。
54tianzhisheng。cn/2018/
02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer
進入到目錄 :
cd distribution/target/apache-rocketmq
啟動:
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv。log //透過日誌檢視是否啟動成功
啟動 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker。log //透過日誌檢視是否啟動成功
然後執行啟動類,執行效果如下:
監控
RocketMQ有一個對其擴充套件的開源專案 ocketmq-console ,如今也提交給了 Apache ,地址在:
https://github。com/apache/rocketmq-externals/tree/master/rocketmq-console
,官方也給出了其支援的功能的中文文件:
https://github。com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN。md
, 那麼該如何安裝?
Docker 安裝
1、獲取 Docker 映象
docker pull styletang/rocketmq-console-ng
2、執行,注意將你自己的 NameServer 地址替換下面的 127。0。0。1
docker run -e “JAVA_OPTS=-Drocketmq。namesrv。addr=127。0。0。1:9876 -Dcom。rocketmq。sendMessageWithVIPChannel=false” -p 8080:8080 -t styletang/rocketmq-console-ng
非 Docker 安裝
我們 git clone 一份程式碼到本地:
git clone https://github。com/apache/rocketmq-externals。git
cd rocketmq-externals/rocketmq-console/
需要 jdk 1。7 以上。 執行以下命令:
mvn spring-boot:run
或者
mvn clean package -Dmaven。test。skip=true
java -jar target/rocketmq-console-ng-1。0。0。jar
注意:
1、如果你下載依賴緩慢,你可以重新設定 maven 的 mirror 為阿里雲的映象
alimaven
aliyun maven
http://maven。aliyun。com/nexus/content/groups/public/
central
2、如果你使用的 RocketMQ 版本小於 3。5。8,如果您使用 rocketmq < 3。5。8,請在啟動 rocketmq-console-ng 時新增
-Dcom。rocketmq。sendMessageWithVIPChannel = false
(或者您可以在 ops 頁面中更改它)
3、更改 resource / application。properties 中的 rocketmq。config。namesrvAddr(或者可以在ops頁面中更改它)
錯誤解決方法
1、Docker 啟動專案報錯
org。apache。rocketmq。remoting。exception。RemotingConnectException: connect to
將 Docker 啟動命令改成如下以後:
docker run -e “JAVA_OPTS=-Drocketmq。config。namesrvAddr=127。0。0。1:9876 -Drocketmq。config。isVIPChannel=false” -p 8080:8080 -t styletang/rocketmq-console-ng
報錯資訊改變了,新的報錯資訊如下:
ERROR op=global_exception_handler_print_error
org。apache。rocketmq。console。exception。ServiceException: This date have‘t data!
看到網上有人也遇到這個問題,他們都透過自己的方式解決了,但是方法我都試了,不適合我。不得不說,阿里,你能再用心點嗎?既然把 RocketMQ 捐給 Apache 了,這些文件啥的都必須更新啊,不要還滯後著呢,不然少不了被吐槽!
搞了很久這種方法沒成功,暫時放棄!mmp
2、非 Docker 安裝,只好把原始碼編譯打包了。
1) 注意需要修改如下圖中的配置:
rocketmq。config。namesrvAddr=localhost:9876 //注意替換你自己的ip
#如果你 rocketmq 版本小於 3。5。8 才需設定 `rocketmq。config。isVIPChannel` 為 false,預設是 true, 這個可以在原始碼中可以看到的
rocketmq。config。isVIPChannel=
2) 執行以下命令:
mvn clean package -Dmaven。test。skip=true
編譯成功:
可以看到已經打好了 jar 包:
執行:
java -jar rocketmq-console-ng-1。0。0。jar
成功,不報錯了,開心?,訪
問 http://localhost:808
0/
整個監控大概就是這些了。
然後我執行之前的 SpringBoot 整合專案,檢視監控資訊如下:
總結
整篇文章講述了 SpringBoot 與 RocketMQ 整合和 RocketMQ 監控平臺的搭建。
參考文章
1、
http://www。ymq。io/2018/02/02/spring-boot-rocketmq-example/#%E6%96%B0%E5%8A%A0%E9%A1%B9%E7%9B%AE
2、GitHub 官方 README