一.准备3台centos7.9服务器


二.下载 rocketmq-5.1.4.tar.gz 分别上传到3台服务器 /usr/local/ 目录

# 解压
tar xf /usr/local/ rocketmq-5.1.4.tar.gz

下载jdk11并解压到 /usr/local/jdk-11

分别设置服务器主机别名

hostnamectl set-hostname normal-node5
hostnamectl set-hostname normal-node6
hostnamectl set-hostname normal-node7

3台服务器分别配置环境变量

vi /etc/profile
# jdk11 config
export JAVA_HOME=/usr/local/jdk-11
export CLASSPATH=$JAVA_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin

# rocketmq config
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq-5.1.4
export NAMESRV_ADDR='normal-node7:9876;normal-node6:9876;normal-node5:9876'
# 刷新环境变量
source /etc/profile


三. 启动mqnamesrv服务

由于RocketMQ默认预设JVM内存是4G,这是RocketMQ的最佳配置,但是如果虚拟机或云服务器内存不足,就需要修改JVM内存

# 进入bin目录
cd /usr/local/rocketmq-5.1.4/bin
# 编辑启动文件
vi ./runserver.sh


启动服务

nohup ./mqnamesrv &

查询运行日志

tail -f nohup.out

日志中显示boot success成功


四. 启动broker服务集群

conf目录下存在三种配置方式

        2m-2s-async:2主2从异步刷盘(吞吐量大,但是消息可能丢失)


        2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)


        2m-2s-notslve:2主无从(单点故障),然后可以直接配置broker.conf,进行单点环境配置


        dleger是用来实现主从切换的,集群中的节点会基于Raft协议随机选举出一个leader,其他的就是follower,

通常正式环境都会采用这种方式搭建集群

搭建2主2从模式,配置2m-2s-async目录Broker文件

连接 normal-node6 服务器

# 进入配置文件目录
cd /usr/local/rocketmq-5.1.4/conf/2m-2s-async/
# 修改broker a节点配置
vi broker-a.properties
# 外网ip(需要外网调用是开启,建议线上关闭)
brokerIP1=192.168.56.116
# 所属集群名字
brokerClusterName=DefaultCluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示 Master,>0 表示 Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=normal-node7:9876;normal-node6:9876;normal-node5:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=600000
# 删除的文件被引用时,不会马上被删除,最大的存活时间
destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 存储使用率阀值,当使用率超过阀值时,将拒绝发送消息请求
diskMaxUsedSpaceRatio=88
# 磁盘空间警戒阈值,超过这个值则停止接受消息,默认值90
diskSpaceWarningLevelRatio=90
# 强制删除文件阈值,默认85
diskSpaceCleanForciblyRatio=85
# 存储路径
storePathRootDir=/usr/app/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/app/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/app/rocketmq/store/comsumequeue
# 消息索引存储路径
storePathIndex=/usr/app/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/app/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/app/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=4194304
# commitLog最少刷盘page数
flushCommitLogLeastPages=4
# consumeQueue最少刷盘page数
flushConsumeQueueLeastPages=2
# commitLog刷盘间隔时间
flushCommitLogThoroughInterval=10000
# consumeQueue刷盘间隔时间
flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量,默认1
sendMessageThreadPoolNums=8
# 服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍,默认32
pullMessageThreadPoolNums=32
# 修改broker b-s节点配置
vi broker-b-s.properties
# 外网ip(需要外网调用是开启,建议线上关闭)
brokerIP1=192.168.56.116
# 所属集群名字
brokerClusterName=DefaultCluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b-s
# 0 表示 Master,>0 表示 Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=normal-node7:9876;normal-node6:9876;normal-node5:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=600000
# 删除的文件被引用时,不会马上被删除,最大的存活时间
destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 存储使用率阀值,当使用率超过阀值时,将拒绝发送消息请求
diskMaxUsedSpaceRatio=88
# 磁盘空间警戒阈值,超过这个值则停止接受消息,默认值90
diskSpaceWarningLevelRatio=90
# 强制删除文件阈值,默认85
diskSpaceCleanForciblyRatio=85
# 存储路径
storePathRootDir=/usr/app/rocketmq/storeSlave
# commitLog 存储路径
storePathCommitLog=/usr/app/rocketmq/storeSlave/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/app/rocketmq/storeSlave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/app/rocketmq/storeSlave/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/app/rocketmq/storeSlave/checkpoint
# abort 文件存储路径
abortFile=/usr/app/rocketmq/storeSlave/abort
# 限制的消息大小
maxMessageSize=4194304
# commitLog最少刷盘page数
flushCommitLogLeastPages=4
# consumeQueue最少刷盘page数
flushConsumeQueueLeastPages=2
# commitLog刷盘间隔时间
flushCommitLogThoroughInterval=10000
# consumeQueue刷盘间隔时间
flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量,默认1
sendMessageThreadPoolNums=8
# 服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍,默认32
pullMessageThreadPoolNums=32

连接 normal-node5 服务器

# 进入配置文件目录
cd /usr/local/rocketmq-5.1.4/conf/2m-2s-async/
# 修改broker b节点配置
vi broker-b.properties
# 外网ip(需要外网调用是开启,建议线上关闭)
brokerIP1=192.168.56.115
# 所属集群名字
brokerClusterName=DefaultCluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0 表示 Master,>0 表示 Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=normal-node7:9876;normal-node6:9876;normal-node5:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=600000
# 删除的文件被引用时,不会马上被删除,最大的存活时间
destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 存储使用率阀值,当使用率超过阀值时,将拒绝发送消息请求
diskMaxUsedSpaceRatio=88
# 磁盘空间警戒阈值,超过这个值则停止接受消息,默认值90
diskSpaceWarningLevelRatio=90
# 强制删除文件阈值,默认85
diskSpaceCleanForciblyRatio=85
# 存储路径
storePathRootDir=/usr/app/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/app/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/app/rocketmq/store/comsumequeue
# 消息索引存储路径
storePathIndex=/usr/app/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/app/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/app/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=4194304
# commitLog最少刷盘page数
flushCommitLogLeastPages=4
# consumeQueue最少刷盘page数
flushConsumeQueueLeastPages=2
# commitLog刷盘间隔时间
flushCommitLogThoroughInterval=10000
# consumeQueue刷盘间隔时间
flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量,默认1
sendMessageThreadPoolNums=8
# 服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍,默认32
pullMessageThreadPoolNums=32

# 修改broker a-s节点配置
vi broker-a-s.properties
# 外网ip(需要外网调用是开启,建议线上关闭)
brokerIP1=192.168.56.115
# 所属集群名字
brokerClusterName=DefaultCluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a-s
# 0 表示 Master,>0 表示 Slave
brokerId=1
# nameServer地址;分号分割
namesrvAddr=normal-node7:9876;normal-node6:9876;normal-node5:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=600000
# 删除的文件被引用时,不会马上被删除,最大的存活时间
destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 存储使用率阀值,当使用率超过阀值时,将拒绝发送消息请求
diskMaxUsedSpaceRatio=88
# 磁盘空间警戒阈值,超过这个值则停止接受消息,默认值90
diskSpaceWarningLevelRatio=90
# 强制删除文件阈值,默认85
diskSpaceCleanForciblyRatio=85
# 存储路径
storePathRootDir=/usr/app/rocketmq/storeSlave
# commitLog 存储路径
storePathCommitLog=/usr/app/rocketmq/storeSlave/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/app/rocketmq/storeSlave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/app/rocketmq/storeSlave/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/app/rocketmq/storeSlave/checkpoint
# abort 文件存储路径
abortFile=/usr/app/rocketmq/storeSlave/abort
# 限制的消息大小
maxMessageSize=4194304
# commitLog最少刷盘page数
flushCommitLogLeastPages=4
# consumeQueue最少刷盘page数
flushConsumeQueueLeastPages=2
# commitLog刷盘间隔时间
flushCommitLogThoroughInterval=10000
# consumeQueue刷盘间隔时间
flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量,默认1
sendMessageThreadPoolNums=8
# 服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍,默认32
pullMessageThreadPoolNums=32

注意:同一机器上的两个实例的store目录不能相同,否则报错Lock failed,MQ already started

同一机器上两个实例的listenPort也不能相同,否则报错端口被占用

如果是多网卡的机器,比如云服务器,需要在broker.conf中增加brokerIP1属性,指定所在机器的外网网卡地址


启动broker的脚本是runbroker.sh broker的默认预设内存是8G,如果内存不足同样需要调整JVM

vi runbroker.sh

普通模式启动broker集群

连接normal-node6服务器

# 启动broker-a
nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-a.properties &
# 启动broker-b-s
nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-b-s.properties &

连接normal-node5服务器

# 启动broker-b
nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-b.properties &
# 启动broker-a-s
nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-a-s.properties &

查看启动日志

tail -300f nohup.out
# 启动成功日志输出
The broker[broker-b-s, 192.168.56.116:11011] boot success. serializeType=JSON and name server is
 normal-node7:9876;normal-node6:9876;normal-node5:9876

查询服务其他日志,日志目录 /root/logs/rocketmqlogs

tail -300 /root/logs/rocketmqlogs/proxy.log

tail -300 /root/logs/rocketmqlogs/broker.log

tail -300 /root/logs/rocketmqlogs/namesrv.log


测试 

# 进入目录
cd /usr/local/rocketmq-5.1.4/bin
# 在normal-node6上用工具发送数据
./tools.sh org.apache.rocketmq.example.quickstart.Producer

# 在normal-node5上用工具接收数据
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

服务搭建成功


五.使用5.x版本proxy代理启动broker

其他步骤和普通模式一样,默认rmq-proxy的监听端口是8080,如果端口有冲突的话可以通过conf/rmq-proxy.json配置文件中的参数

# 进入目录
cd /usr/local/rocketmq-5.1.4/conf/rmq-proxy.json

配置broker-a和broker-b启动参数:

{
  "rocketMQClusterName": "DefaultCluster",
  "grpcServerPort": 8081,
  "remotingListenPort": 8080
}

代理启动broker-a:

nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-a.properties \
-pc ../conf/rmq-proxy.json --enable-proxy &

代理启动broker-b:

nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-b.properties \
-pc ../conf/rmq-proxy.json --enable-proxy &

配置broker-a-s和broker-b-s启动参数:{

  "rocketMQClusterName": "DefaultCluster",
  "grpcServerPort": 9081,
  "remotingListenPort": 9080
}

代理启动broker-a-s:

nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-a-s.properties \
-pc ../conf/rmq-proxy.json --enable-proxy &

代理启动broker-b-s:

nohup ./mqbroker -c /usr/local/rocketmq-5.1.4/conf/2m-2s-async/broker-b-s.properties \
-pc ../conf/rmq-proxy.json --enable-proxy &

查看启动日志

tail -300f nohup.out
# 启动成功日志输出
Wed Jan 03 18:34:56 UTC 2024 rocketmq-proxy startup successfully

测试...