0


Linux 下 RocketMQ 安装、配置与运维(详细讲解)

一 RocketMQ 下载安装

1 下载 RocketMQ

** 下载当前最新版本RocketMQ**

官网下载: https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip

  1. wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-al l-5.3.0-bin-release.zip

执行下载图:

下载成功图:

2 安装RocketMQ

安装过程非常简单,解压RocketMQ压缩包即可

  1. unzip rocketmq-all-5.3.0-bin-release.zip
  1. 解压过程中:

3 验证安装

1 启动NameServer

以后台启动NameServer服务:

  1. nohup sh bin/mqnamesrv &

执行后看到创建了个后台进程,但此时并无法看到日记

打开日记查看执行效果:

  1. tail -f nohup.out

2 启动Broker

启动Broker 可以加上--enable-proxy 方式启动代理,也可正常启动不使用代理,如下:

  1. # 开启代理方式启动
  2. nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
  3. # 默认不使用代理方式启动
  4. nohup sh bin/mqbroker -n localhost:9876

启动成功如下图:

启动成功, 看一下brokerIP xx.xx.xx.xx10911 如果是内网IP外网是无法访问的,需要配置外网IP,云服务器如果使用默认配置一般是内网IP

** **
配置broker外网IP

** rocketMQ主目录\conf\broker.conf**

  1. brokerIP1=外网IP地址

增加后如下图:

3 测试连通

使用自带工具验证本地环境:

生产端发送测试

  1. #先设置工具依赖变量
  2. export NAMESRV_ADDR=localhost:9876
  3. #测试生产端发送
  4. sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

执行如下:

从上图中可以看到send_ok,说明生产端已正常发送信息到队列。

测试接收端:

  1. sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

执行后则到队列的信息如下图:

按ctrl+c结束接收测试。

从上面看,本地生产发送与接收数据正常,基本可以判断本地安装正常。

注意事项

broket 启动时默认启动脚本内存参数是使用8G内存。如果您的内存足够可以继续增加,如果内存有限则要缩小, 如果内存小于8G可能存在报错:

修改默认内存,文件位置:bin/runbroker.sh,如下图:

用vi打开脚本,找到配置内存参数如下图:

配置项:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g",根据实际内存业务情况,变更-Xms8g -Xmx8g参数大小即可。

二 JAVA客户端连接

1 mvn 项目引入依赖如下:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.3.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-acl</artifactId>
  9. <version>4.5.2</version>
  10. </dependency>

2 生产发送端java代码:

java生产端发送消息代码:

  1. import org.apache.logging.log4j.LogManager;
  2. import org.apache.logging.log4j.Logger;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendCallback;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.remoting.common.RemotingHelper;
  9. import org.apache.rocketmq.remoting.exception.RemotingException;
  10. import org.springframework.stereotype.Service;
  11. import javax.annotation.PostConstruct;
  12. import javax.annotation.PreDestroy;
  13. import java.io.UnsupportedEncodingException;
  14. import java.nio.charset.StandardCharsets;
  15. @Service
  16. public class MQRocketServiceImpl {
  17. private DefaultMQProducer producer;
  18. private static final Logger logger = LogManager.getLogger(MQRocketServiceImpl.class);
  19. @PostConstruct
  20. public void initProducer() throws MQClientException {
  21. producer = new DefaultMQProducer("CONSUMER_GROUP");
  22. producer.setNamesrvAddr(xx.xx.xx.xx:9876);
  23. producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
  24. producer.start();
  25. }
  26. @PreDestroy
  27. public void shutdownProducer() {
  28. if (producer != null) {
  29. producer.shutdown();
  30. }
  31. }
  32. public boolean sendMsg(String text, String key) {
  33. try {
  34. Message msg = new Message(
  35. MqCfg.TOPIC,
  36. MqCfg.SUB_EXPRESSION,
  37. key,
  38. text.getBytes(StandardCharsets.UTF_8) // 使用标准字符集
  39. );
  40. producer.send(msg, new SendCallback() {
  41. @Override
  42. public void onSuccess(SendResult sendResult) {
  43. logger.info("Message sent successfully: {}", sendResult);
  44. }
  45. @Override
  46. public void onException(Throwable e) {
  47. logger.error("Failed to send message", e);
  48. }
  49. });
  50. return true;
  51. } catch (UnsupportedEncodingException | MQClientException | RemotingException | InterruptedException e) {
  52. logger.error("Error sending message", e);
  53. return false;
  54. }
  55. }
  56. }

执行后发送成功打印结果:

3 服务端接收代码:

java服务端接收代码

  1. import org.apache.logging.log4j.LogManager;
  2. import org.apache.logging.log4j.Logger;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  7. import org.apache.rocketmq.client.exception.MQClientException;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.boot.ApplicationArguments;
  11. import org.springframework.boot.ApplicationRunner;
  12. import org.springframework.stereotype.Service;
  13. import java.util.List;
  14. @Service
  15. public class MsgReceiveServiceImpl implements ApplicationRunner {
  16. @Autowired
  17. private PackageHandlerImpl packageHandler;
  18. private static final Logger logger = LogManager.getLogger(MsgReceiveServiceImpl.class);
  19. @Override
  20. public void run(ApplicationArguments args) {
  21. receiveQueue();
  22. }
  23. private void receiveQueue() {
  24. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqCfg.CONSUMER_GROUP);
  25. consumer.setNamesrvAddr("xx.xx.xx.xx:9076");
  26. try {
  27. consumer.subscribe(MqCfg.TOPIC, MqCfg.SUB_EXPRESSION);
  28. consumer.registerMessageListener((MessageListenerOrderly) this::processMessages);
  29. consumer.start();
  30. logger.info("MQ消费者启动成功。");
  31. } catch (MQClientException e) {
  32. logger.error("MQ消费者启动失败!", e);
  33. throw new RuntimeException("连接MQ错误,启动失败!", e);
  34. }
  35. }
  36. private ConsumeOrderlyStatus processMessages(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  37. for (MessageExt msg : msgs) {
  38. try {
  39. String text = new String(msg.getBody());
  40. //消息处理……
  41. } catch (Exception e) {
  42. logger.error("处理消息出错,key={},错误信息:", msg.getKeys(), e);
  43. // 可以在此处根据业务需求返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,或者选择其他处理方式
  44. }
  45. }
  46. return ConsumeOrderlyStatus.SUCCESS;
  47. }
  48. }

以上代码实现ApplicationRunner接口,使其作为后台一个线程单独处理消息。

三 RocketMQ 运维

1 RocketMQ面板,可视化管理

RocketMQ与apacheMQ不同,本身没有自带面板查看状态工具,查看队列的状态要依赖命令行这对运维或开发都非常不方便,需要另外安装Rocket MQ 面板工具rocketmq-dashboard:

开源地址:https://github.com/apache/rocketmq-dashboard

下载回来后直接用IDE开发环境运行,也可以打包后放在服务器一起运行,如下图:

开发环境运行视图如下

除了一般的统计信息,还可以进行管理,功能丰富,如下图:

有此神器作观察,相信运维不再什么难事。

2 RocketMQ变更默认端口

1 修改方式一(4.x ,以前的版本是可以的):

要更改 RocketMQ 的本地部署中的端口,您需要修改 RocketMQ 的配置文件。RocketMQ 的配置 主要包括

  1. broker.conf

  1. namesrv.conf

这两个文件。

  1. 找到配置文件:- 找到 RocketMQ 的安装目录下的 conf 文件夹。- 在 conf 文件夹中,你会看到 broker.confnamesrv.conf
  2. 修改 Broker 端口:- 打开 broker.conf 文件。- 查找 listen_port 这一行,这是 Broker 的监听端口,默认通常是 10911。- 更改 listen_port 的值为所需的端口号。
  3. 修改 NameServer 端口:- 打开 namesrv.conf 文件。- 查找 NAMESRV_PORT 这一行,这是 NameServer 的监听端口,默认通常是 9876。- 更改 NAMESRV_PORT 的值为所需的端口号。
2 端口修改方式二(当前最新的版本5.x)

在使用上面的方式修改端口后发现失效,只能查看源代码:

看NameSrv模块源码发现服务固定是9876,后面通加载参数c 判断配置文件路径加载,如下图:

跟进MixAll.properties2Object,发现只是根据类的参数与类型匹配加载,如下图:

直接看配置参数类:

这次我们要改端口,所以新建个配置文件,只需增加端口配置即可:

文件内容就一个字段: listenPort = xxxx 端口号,如下图:

启动测试,修改成功如下图:

  1. # 启动名称服务 xxx.conf是配置文件路径,可以使用相对路径
  2. nohup sh bin/mqnamesrv -c xxx.conf &

查看了 broker 模块源码发现启动也是一样,所以端口修改方式也是同上 ,修改后启动成功如下图:

3 启停脚本

1 启动RocketMQ脚本:

启动脚本,只需执行脚本就可以快速启动MQ,以下是启动脚本代码如下:

  1. #!/bin/bash
  2. # 启动 Nameserver
  3. echo 'Starting MQ NameServer...'
  4. nohup sh bin/mqnamesrv > mq.log 2>&1 &
  5. sleep 5
  6. # 检查 Nameserver 是否启动成功
  7. if ps aux | grep -v grep | grep -q 'mqnamesrv'; then
  8. echo 'MQ NameServer started successfully.'
  9. else
  10. echo 'Failed to start MQ NameServer.'
  11. exit 1
  12. fi
  13. # 启动 Broker
  14. echo 'Starting MQ Broker...'
  15. nohup sh bin/mqbroker -n 0.0.0.0:9876 >> mq.log 2>&1 &
  16. sleep 5
  17. # 检查 Broker 是否启动成功
  18. if ps aux | grep -v grep | grep -q 'mqbroker'; then
  19. echo 'MQ Broker started successfully.'
  20. else
  21. echo 'Failed to start MQ Broker.'
  22. exit 1
  23. fi
  24. # 显示日志
  25. tail -f mq.log

执行后同时打印日记,退出只需按ctrl+c即可。执行启动脚本成功如下图:

2 停止脚本:

停止RocketMQ脚本:

  1. #!/bin/bash
  2. # 关闭 Nameserver
  3. echo 'Closing MQ NameServer...'
  4. sh bin/mqshutdown namesrv
  5. # 检查 Nameserver 是否成功关闭
  6. sleep 5
  7. if ! ps aux | grep -v grep | grep -q 'mqnamesrv'; then
  8. echo 'MQ NameServer closed successfully.'
  9. else
  10. echo 'Failed to close MQ NameServer.'
  11. exit 1
  12. fi
  13. # 关闭 Broker
  14. echo 'Closing MQ Broker...'
  15. sh bin/mqshutdown broker
  16. # 检查 Broker 是否成功关闭
  17. sleep 20
  18. if ! ps aux | grep -v grep | grep -q 'mqbroker'; then
  19. echo 'MQ Broker closed successfully.'
  20. else
  21. echo 'Failed to close MQ Broker.'
  22. exit 1
  23. fi

执行停止脚本成功停止,如下图:

注意:停止broker服务花的时间通常比较长,如果显示停止失败可以多次调用或者在脚本延长等待时间即可。

标签: rocketmq

本文转载自: https://blog.csdn.net/qyhua/article/details/141280389
版权归原作者 qyhua 所有, 如有侵权,请联系我们删除。

“Linux 下 RocketMQ 安装、配置与运维(详细讲解)”的评论:

还没有评论