您當前的位置:首頁 > 書法

SpringBoot RocketMQ 整合使用和監控

作者:由 zhisheng 發表于 書法時間:2018-02-11

SpringBoot RocketMQ 整合使用和監控

前提

透過前面兩篇文章可以簡單的瞭解 RocketMQ 和 安裝 RocketMQ ,今天就將 SpringBoot 和 RocketMQ 整合起來使用。

相關文章

1、

SpringBoot Kafka 整合使用

2、

SpringBoot RabbitMQ 整合使用

3、

SpringBoot ActiveMQ 整合使用

4、

Kafka 安裝及快速入門

5、

SpringBoot RabbitMQ 整合進階版

6、

RocketMQ 初探

7、

RocketMQ 安裝及快速入門

關注我

微信公眾號:zhisheng

轉載請務必註明原創地址為:

http://www。

54tianzhisheng。cn/2018/

02/07/SpringBoot-RocketMQ/

建立專案

在 IDEA 建立一個 SpringBoot 專案,專案結構如下:

SpringBoot RocketMQ 整合使用和監控

pom 檔案

引入 RocketMQ 的一些相關依賴,最後的 pom 檔案如下:

<?xml version=“1。0” encoding=“UTF-8”?>

xmlns=

“http://maven。apache。org/POM/4。0。0”

xmlns:xsi=

“http://www。w3。org/2001/XMLSchema-instance”

xsi:schemaLocation=

“http://maven。apache。org/POM/4。0。0 http://maven。apache。org/xsd/maven-4。0。0。xsd”

>

4。0。0

com。zhisheng

rocketmq

0。0。1-SNAPSHOT

jar

rocketmq

Demo project for Spring Boot RocketMQ

org。springframework。boot

spring-boot-starter-parent

1。5。9。RELEASE

<!—— lookup parent from repository ——>

UTF-8

UTF-8

1。8

org。springframework。boot

spring-boot-starter-web

org。springframework。boot

spring-boot-starter-test

test

org。apache。rocketmq

rocketmq-common

4。2。0

org。apache。rocketmq

rocketmq-client

4。2。0

org。springframework。boot

spring-boot-maven-plugin

配置檔案

application。properties 中如下:

# 消費者的組名

apache。rocketmq。consumer。PushConsumer=PushConsumer

# 生產者的組名

apache。rocketmq。producer。producerGroup=Producer

# NameServer地址

apache。rocketmq。namesrvAddr=localhost:9876

生產者

package

com。zhisheng。rocketmq。client

import

org。apache。rocketmq。client。producer。DefaultMQProducer

import

org。apache。rocketmq。common。message。Message

import

org。apache。rocketmq。remoting。common。RemotingHelper

import

org。springframework。beans。factory。annotation。Value

import

org。springframework。stereotype。Component

import

org。springframework。util。StopWatch

import

javax。annotation。PostConstruct

/**

* Created by zhisheng_tian on 2018/2/6

*/

@Component

public

class

RocketMQClient

{

/**

* 生產者的組名

*/

@Value

“${apache。rocketmq。producer。producerGroup}”

private

String

producerGroup

/**

* NameServer 地址

*/

@Value

“${apache。rocketmq。namesrvAddr}”

private

String

namesrvAddr

@PostConstruct

public

void

defaultMQProducer

()

{

//生產者的組名

DefaultMQProducer

producer

=

new

DefaultMQProducer

producerGroup

);

//指定NameServer地址,多個地址以 ; 隔開

producer

setNamesrvAddr

namesrvAddr

);

try

{

/**

* Producer物件在使用之前必須要呼叫start初始化,初始化一次即可

* 注意:切記不可以在每次傳送訊息時,都呼叫start方法

*/

producer

start

();

//建立一個訊息例項,包含 topic、tag 和 訊息體

//如下:topic 為 “TopicTest”,tag 為 “push”

Message

message

=

new

Message

“TopicTest”

“push”

“傳送訊息——zhisheng——-”

getBytes

RemotingHelper

DEFAULT_CHARSET

));

StopWatch

stop

=

new

StopWatch

();

stop

start

();

for

int

i

=

0

i

<

10000

i

++)

{

SendResult

result

=

producer

send

message

);

System

out

println

“傳送響應:MsgId:”

+

result

getMsgId

()

+

“,傳送狀態:”

+

result

getSendStatus

());

}

stop

stop

();

System

out

println

“————————傳送一萬條訊息耗時:”

+

stop

getTotalTimeMillis

());

}

catch

Exception

e

{

e

printStackTrace

();

}

finally

{

producer

shutdown

();

}

}

}

消費者

package

com。zhisheng。rocketmq。server

import

org。apache。rocketmq。client。consumer。DefaultMQPushConsumer

import

org。apache。rocketmq。client。consumer。listener。ConsumeConcurrentlyStatus

import

org。apache。rocketmq。client。consumer。listener。MessageListenerConcurrently

import

org。apache。rocketmq。common。consumer。ConsumeFromWhere

import

org。apache。rocketmq。common。message。MessageExt

import

org。apache。rocketmq。remoting。common。RemotingHelper

import

org。springframework。beans。factory。annotation。Value

import

org。springframework。stereotype。Component

import

javax。annotation。PostConstruct

/**

* Created by zhisheng_tian on 2018/2/6

*/

@Component

public

class

RocketMQServer

{

/**

* 消費者的組名

*/

@Value

“${apache。rocketmq。consumer。PushConsumer}”

private

String

consumerGroup

/**

* NameServer 地址

*/

@Value

“${apache。rocketmq。namesrvAddr}”

private

String

namesrvAddr

@PostConstruct

public

void

defaultMQPushConsumer

()

{

//消費者的組名

DefaultMQPushConsumer

consumer

=

new

DefaultMQPushConsumer

consumerGroup

);

//指定NameServer地址,多個地址以 ; 隔開

consumer

setNamesrvAddr

namesrvAddr

);

try

{

//訂閱PushTopic下Tag為push的訊息

consumer

subscribe

“TopicTest”

“push”

);

//設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費

//如果非第一次啟動,那麼按照上次消費的位置繼續消費

consumer

setConsumeFromWhere

ConsumeFromWhere

CONSUME_FROM_FIRST_OFFSET

);

consumer

registerMessageListener

((

MessageListenerConcurrently

list

context

->

{

try

{

for

MessageExt

messageExt

list

{

System

out

println

“messageExt: ”

+

messageExt

);

//輸出訊息內容

String

messageBody

=

new

String

messageExt

getBody

(),

RemotingHelper

DEFAULT_CHARSET

);

System

out

println

“消費響應:msgId : ”

+

messageExt

getMsgId

()

+

“, msgBody : ”

+

messageBody

);

//輸出訊息內容

}

}

catch

Exception

e

{

e

printStackTrace

();

return

ConsumeConcurrentlyStatus

RECONSUME_LATER

//稍後再試

}

return

ConsumeConcurrentlyStatus

CONSUME_SUCCESS

//消費成功

});

consumer

start

();

}

catch

Exception

e

{

e

printStackTrace

();

}

}

}

啟動類

package

com。zhisheng。rocketmq

import

org。springframework。boot。SpringApplication

import

org。springframework。boot。autoconfigure。SpringBootApplication

@SpringBootApplication

public

class

RocketmqApplication

{

public

static

void

main

String

[]

args

{

SpringApplication

run

RocketmqApplication

class

args

);

}

}

RocketMQ

程式碼已經都寫好了,接下來我們需要將與 RocketMQ 有關的啟動起來。

啟動 Name Server

在前面文章中已經寫過怎麼啟動,

http://www。

54tianzhisheng。cn/2018/

02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer

進入到目錄 :

cd distribution/target/apache-rocketmq

啟動:

nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv。log //透過日誌檢視是否啟動成功

啟動 Broker

nohup sh bin/mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker。log //透過日誌檢視是否啟動成功

然後執行啟動類,執行效果如下:

SpringBoot RocketMQ 整合使用和監控

監控

RocketMQ有一個對其擴充套件的開源專案 ocketmq-console ,如今也提交給了 Apache ,地址在:

https://github。com/apache/rocketmq-externals/tree/master/rocketmq-console

,官方也給出了其支援的功能的中文文件:

https://github。com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN。md

, 那麼該如何安裝?

Docker 安裝

1、獲取 Docker 映象

docker pull styletang/rocketmq-console-ng

2、執行,注意將你自己的 NameServer 地址替換下面的 127。0。0。1

docker run -e “JAVA_OPTS=-Drocketmq。namesrv。addr=127。0。0。1:9876 -Dcom。rocketmq。sendMessageWithVIPChannel=false” -p 8080:8080 -t styletang/rocketmq-console-ng

非 Docker 安裝

我們 git clone 一份程式碼到本地:

git clone https://github。com/apache/rocketmq-externals。git

cd rocketmq-externals/rocketmq-console/

需要 jdk 1。7 以上。 執行以下命令:

mvn spring-boot:run

或者

mvn clean package -Dmaven。test。skip=true

java -jar target/rocketmq-console-ng-1。0。0。jar

注意:

1、如果你下載依賴緩慢,你可以重新設定 maven 的 mirror 為阿里雲的映象

alimaven

aliyun maven

http://maven。aliyun。com/nexus/content/groups/public/

central

2、如果你使用的 RocketMQ 版本小於 3。5。8,如果您使用 rocketmq < 3。5。8,請在啟動 rocketmq-console-ng 時新增

-Dcom。rocketmq。sendMessageWithVIPChannel = false

(或者您可以在 ops 頁面中更改它)

3、更改 resource / application。properties 中的 rocketmq。config。namesrvAddr(或者可以在ops頁面中更改它)

錯誤解決方法

1、Docker 啟動專案報錯

org。apache。rocketmq。remoting。exception。RemotingConnectException: connect to failed

SpringBoot RocketMQ 整合使用和監控

將 Docker 啟動命令改成如下以後:

docker run -e “JAVA_OPTS=-Drocketmq。config。namesrvAddr=127。0。0。1:9876 -Drocketmq。config。isVIPChannel=false” -p 8080:8080 -t styletang/rocketmq-console-ng

報錯資訊改變了,新的報錯資訊如下:

ERROR op=global_exception_handler_print_error

org。apache。rocketmq。console。exception。ServiceException: This date have‘t data!

SpringBoot RocketMQ 整合使用和監控

看到網上有人也遇到這個問題,他們都透過自己的方式解決了,但是方法我都試了,不適合我。不得不說,阿里,你能再用心點嗎?既然把 RocketMQ 捐給 Apache 了,這些文件啥的都必須更新啊,不要還滯後著呢,不然少不了被吐槽!

搞了很久這種方法沒成功,暫時放棄!mmp

2、非 Docker 安裝,只好把原始碼編譯打包了。

1) 注意需要修改如下圖中的配置:

SpringBoot RocketMQ 整合使用和監控

rocketmq。config。namesrvAddr=localhost:9876 //注意替換你自己的ip

#如果你 rocketmq 版本小於 3。5。8 才需設定 `rocketmq。config。isVIPChannel` 為 false,預設是 true, 這個可以在原始碼中可以看到的

rocketmq。config。isVIPChannel=

2) 執行以下命令:

mvn clean package -Dmaven。test。skip=true

編譯成功:

SpringBoot RocketMQ 整合使用和監控

可以看到已經打好了 jar 包:

執行:

java -jar rocketmq-console-ng-1。0。0。jar

成功,不報錯了,開心?,訪

問 http://localhost:808

0/

SpringBoot RocketMQ 整合使用和監控

SpringBoot RocketMQ 整合使用和監控

SpringBoot RocketMQ 整合使用和監控

SpringBoot RocketMQ 整合使用和監控

SpringBoot RocketMQ 整合使用和監控

SpringBoot RocketMQ 整合使用和監控

SpringBoot RocketMQ 整合使用和監控

整個監控大概就是這些了。

然後我執行之前的 SpringBoot 整合專案,檢視監控資訊如下:

SpringBoot RocketMQ 整合使用和監控

總結

整篇文章講述了 SpringBoot 與 RocketMQ 整合和 RocketMQ 監控平臺的搭建。

參考文章

1、

http://www。ymq。io/2018/02/02/spring-boot-rocketmq-example/#%E6%96%B0%E5%8A%A0%E9%A1%B9%E7%9B%AE

2、GitHub 官方 README