一 RocketMQ 下载安装
1 下载 RocketMQ:
** 下载当前最新版本RocketMQ**
官网下载: https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-al l-5.3.0-bin-release.zip
执行下载图:
下载成功图:
2 安装RocketMQ:
安装过程非常简单,解压RocketMQ压缩包即可
unzip rocketmq-all-5.3.0-bin-release.zip
解压过程中:
3 验证安装
1 启动NameServer
以后台启动NameServer服务:
nohup sh bin/mqnamesrv &
执行后看到创建了个后台进程,但此时并无法看到日记
打开日记查看执行效果:
tail -f nohup.out
2 启动Broker
启动Broker 可以加上--enable-proxy 方式启动代理,也可正常启动不使用代理,如下:
# 开启代理方式启动
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
# 默认不使用代理方式启动
nohup sh bin/mqbroker -n localhost:9876
启动成功如下图:
启动成功, 看一下brokerIP xx.xx.xx.xx10911 如果是内网IP外网是无法访问的,需要配置外网IP,云服务器如果使用默认配置一般是内网IP
** **
配置broker外网IP
** rocketMQ主目录\conf\broker.conf**
brokerIP1=外网IP地址
增加后如下图:
3 测试连通
使用自带工具验证本地环境:
生产端发送测试
#先设置工具依赖变量
export NAMESRV_ADDR=localhost:9876
#测试生产端发送
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
执行如下:
从上图中可以看到send_ok,说明生产端已正常发送信息到队列。
测试接收端:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
执行后则到队列的信息如下图:
按ctrl+c结束接收测试。
从上面看,本地生产发送与接收数据正常,基本可以判断本地安装正常。
注意事项
broket 启动时默认启动脚本内存参数是使用8G内存。如果您的内存足够可以继续增加,如果内存有限则要缩小, 如果内存小于8G可能存在报错:
修改默认内存,文件位置:bin/runbroker.sh,如下图:
用vi打开脚本,找到配置内存参数如下图:
配置项:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g",根据实际内存业务情况,变更-Xms8g -Xmx8g参数大小即可。
二 JAVA客户端连接
1 mvn 项目引入依赖如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.5.2</version>
</dependency>
2 生产发送端java代码:
java生产端发送消息代码:
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
@Service
public class MQRocketServiceImpl {
private DefaultMQProducer producer;
private static final Logger logger = LogManager.getLogger(MQRocketServiceImpl.class);
@PostConstruct
public void initProducer() throws MQClientException {
producer = new DefaultMQProducer("CONSUMER_GROUP");
producer.setNamesrvAddr(xx.xx.xx.xx:9876);
producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
producer.start();
}
@PreDestroy
public void shutdownProducer() {
if (producer != null) {
producer.shutdown();
}
}
public boolean sendMsg(String text, String key) {
try {
Message msg = new Message(
MqCfg.TOPIC,
MqCfg.SUB_EXPRESSION,
key,
text.getBytes(StandardCharsets.UTF_8) // 使用标准字符集
);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
logger.info("Message sent successfully: {}", sendResult);
}
@Override
public void onException(Throwable e) {
logger.error("Failed to send message", e);
}
});
return true;
} catch (UnsupportedEncodingException | MQClientException | RemotingException | InterruptedException e) {
logger.error("Error sending message", e);
return false;
}
}
}
执行后发送成功打印结果:
3 服务端接收代码:
java服务端接收代码
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class MsgReceiveServiceImpl implements ApplicationRunner {
@Autowired
private PackageHandlerImpl packageHandler;
private static final Logger logger = LogManager.getLogger(MsgReceiveServiceImpl.class);
@Override
public void run(ApplicationArguments args) {
receiveQueue();
}
private void receiveQueue() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqCfg.CONSUMER_GROUP);
consumer.setNamesrvAddr("xx.xx.xx.xx:9076");
try {
consumer.subscribe(MqCfg.TOPIC, MqCfg.SUB_EXPRESSION);
consumer.registerMessageListener((MessageListenerOrderly) this::processMessages);
consumer.start();
logger.info("MQ消费者启动成功。");
} catch (MQClientException e) {
logger.error("MQ消费者启动失败!", e);
throw new RuntimeException("连接MQ错误,启动失败!", e);
}
}
private ConsumeOrderlyStatus processMessages(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
try {
String text = new String(msg.getBody());
//消息处理……
} catch (Exception e) {
logger.error("处理消息出错,key={},错误信息:", msg.getKeys(), e);
// 可以在此处根据业务需求返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,或者选择其他处理方式
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
以上代码实现ApplicationRunner接口,使其作为后台一个线程单独处理消息。
三 RocketMQ 运维
1 RocketMQ面板,可视化管理
RocketMQ与apacheMQ不同,本身没有自带面板查看状态工具,查看队列的状态要依赖命令行这对运维或开发都非常不方便,需要另外安装Rocket MQ 面板工具rocketmq-dashboard:
开源地址:https://github.com/apache/rocketmq-dashboard
下载回来后直接用IDE开发环境运行,也可以打包后放在服务器一起运行,如下图:
开发环境运行视图如下
除了一般的统计信息,还可以进行管理,功能丰富,如下图:
有此神器作观察,相信运维不再什么难事。
2 RocketMQ变更默认端口
1 修改方式一(4.x ,以前的版本是可以的):
要更改 RocketMQ 的本地部署中的端口,您需要修改 RocketMQ 的配置文件。RocketMQ 的配置 主要包括
broker.conf
和
namesrv.conf
这两个文件。
- 找到配置文件:- 找到 RocketMQ 的安装目录下的
conf
文件夹。- 在conf
文件夹中,你会看到broker.conf
和namesrv.conf
。 - 修改 Broker 端口:- 打开
broker.conf
文件。- 查找listen_port
这一行,这是 Broker 的监听端口,默认通常是10911
。- 更改listen_port
的值为所需的端口号。 - 修改 NameServer 端口:- 打开
namesrv.conf
文件。- 查找NAMESRV_PORT
这一行,这是 NameServer 的监听端口,默认通常是9876
。- 更改NAMESRV_PORT
的值为所需的端口号。
2 端口修改方式二(当前最新的版本5.x)
在使用上面的方式修改端口后发现失效,只能查看源代码:
看NameSrv模块源码发现服务固定是9876,后面通加载参数c 判断配置文件路径加载,如下图:
跟进MixAll.properties2Object,发现只是根据类的参数与类型匹配加载,如下图:
直接看配置参数类:
这次我们要改端口,所以新建个配置文件,只需增加端口配置即可:
文件内容就一个字段: listenPort = xxxx 端口号,如下图:
启动测试,修改成功如下图:
# 启动名称服务 xxx.conf是配置文件路径,可以使用相对路径
nohup sh bin/mqnamesrv -c xxx.conf &
查看了 broker 模块源码发现启动也是一样,所以端口修改方式也是同上 ,修改后启动成功如下图:
3 启停脚本
1 启动RocketMQ脚本:
启动脚本,只需执行脚本就可以快速启动MQ,以下是启动脚本代码如下:
#!/bin/bash
# 启动 Nameserver
echo 'Starting MQ NameServer...'
nohup sh bin/mqnamesrv > mq.log 2>&1 &
sleep 5
# 检查 Nameserver 是否启动成功
if ps aux | grep -v grep | grep -q 'mqnamesrv'; then
echo 'MQ NameServer started successfully.'
else
echo 'Failed to start MQ NameServer.'
exit 1
fi
# 启动 Broker
echo 'Starting MQ Broker...'
nohup sh bin/mqbroker -n 0.0.0.0:9876 >> mq.log 2>&1 &
sleep 5
# 检查 Broker 是否启动成功
if ps aux | grep -v grep | grep -q 'mqbroker'; then
echo 'MQ Broker started successfully.'
else
echo 'Failed to start MQ Broker.'
exit 1
fi
# 显示日志
tail -f mq.log
执行后同时打印日记,退出只需按ctrl+c即可。执行启动脚本成功如下图:
2 停止脚本:
停止RocketMQ脚本:
#!/bin/bash
# 关闭 Nameserver
echo 'Closing MQ NameServer...'
sh bin/mqshutdown namesrv
# 检查 Nameserver 是否成功关闭
sleep 5
if ! ps aux | grep -v grep | grep -q 'mqnamesrv'; then
echo 'MQ NameServer closed successfully.'
else
echo 'Failed to close MQ NameServer.'
exit 1
fi
# 关闭 Broker
echo 'Closing MQ Broker...'
sh bin/mqshutdown broker
# 检查 Broker 是否成功关闭
sleep 20
if ! ps aux | grep -v grep | grep -q 'mqbroker'; then
echo 'MQ Broker closed successfully.'
else
echo 'Failed to close MQ Broker.'
exit 1
fi
执行停止脚本成功停止,如下图:
注意:停止broker服务花的时间通常比较长,如果显示停止失败可以多次调用或者在脚本延长等待时间即可。
版权归原作者 qyhua 所有, 如有侵权,请联系我们删除。