0


Linux 下 RocketMQ 安装、配置与运维(详细讲解)

一 RocketMQ 下载安装

1 下载 RocketMQ

** 下载当前最新版本RocketMQ**

官网下载: https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip

wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-al                                                                                                            l-5.3.0-bin-release.zip

执行下载图:

下载成功图:

2 安装RocketMQ

安装过程非常简单,解压RocketMQ压缩包即可

unzip rocketmq-all-5.3.0-bin-release.zip
解压过程中: 

3 验证安装

1 启动NameServer

以后台启动NameServer服务:

nohup sh bin/mqnamesrv &

执行后看到创建了个后台进程,但此时并无法看到日记

打开日记查看执行效果:

tail -f nohup.out

2 启动Broker

启动Broker 可以加上--enable-proxy 方式启动代理,也可正常启动不使用代理,如下:

# 开启代理方式启动
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

# 默认不使用代理方式启动
nohup sh bin/mqbroker -n localhost:9876

启动成功如下图:

启动成功, 看一下brokerIP xx.xx.xx.xx10911 如果是内网IP外网是无法访问的,需要配置外网IP,云服务器如果使用默认配置一般是内网IP

** **
配置broker外网IP

** rocketMQ主目录\conf\broker.conf**

brokerIP1=外网IP地址

增加后如下图:

3 测试连通

使用自带工具验证本地环境:

生产端发送测试

#先设置工具依赖变量
export NAMESRV_ADDR=localhost:9876

#测试生产端发送
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

执行如下:

从上图中可以看到send_ok,说明生产端已正常发送信息到队列。

测试接收端:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

执行后则到队列的信息如下图:

按ctrl+c结束接收测试。

从上面看,本地生产发送与接收数据正常,基本可以判断本地安装正常。

注意事项

broket 启动时默认启动脚本内存参数是使用8G内存。如果您的内存足够可以继续增加,如果内存有限则要缩小, 如果内存小于8G可能存在报错:

修改默认内存,文件位置:bin/runbroker.sh,如下图:

用vi打开脚本,找到配置内存参数如下图:

配置项:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g",根据实际内存业务情况,变更-Xms8g -Xmx8g参数大小即可。

二 JAVA客户端连接

1 mvn 项目引入依赖如下:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.5.2</version>
        </dependency>

2 生产发送端java代码:

java生产端发送消息代码:


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

@Service
public class MQRocketServiceImpl {

    private DefaultMQProducer producer;

    private static final Logger logger = LogManager.getLogger(MQRocketServiceImpl.class);

    @PostConstruct
    public void initProducer() throws MQClientException {
        producer = new DefaultMQProducer("CONSUMER_GROUP");
        producer.setNamesrvAddr(xx.xx.xx.xx:9876);
        producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
        producer.start();
    }

    @PreDestroy
    public void shutdownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }

    public boolean sendMsg(String text, String key) {
        try {
            Message msg = new Message(
                    MqCfg.TOPIC,
                    MqCfg.SUB_EXPRESSION,
                    key,
                    text.getBytes(StandardCharsets.UTF_8) // 使用标准字符集
            );
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("Message sent successfully: {}", sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    logger.error("Failed to send message", e);
                }
            });
            return true;
        } catch (UnsupportedEncodingException | MQClientException | RemotingException | InterruptedException e) {
            logger.error("Error sending message", e);
            return false;
        }
    }
}

执行后发送成功打印结果:

3 服务端接收代码:

java服务端接收代码


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class MsgReceiveServiceImpl implements ApplicationRunner {

    @Autowired
    private PackageHandlerImpl packageHandler;

    private static final Logger logger = LogManager.getLogger(MsgReceiveServiceImpl.class);

    @Override
    public void run(ApplicationArguments args) {
        receiveQueue();
    }

    private void receiveQueue() {
       

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqCfg.CONSUMER_GROUP);
        consumer.setNamesrvAddr("xx.xx.xx.xx:9076");

        try {
            consumer.subscribe(MqCfg.TOPIC, MqCfg.SUB_EXPRESSION);
            consumer.registerMessageListener((MessageListenerOrderly) this::processMessages);
            consumer.start();
            logger.info("MQ消费者启动成功。");
        } catch (MQClientException e) {
            logger.error("MQ消费者启动失败!", e);
            throw new RuntimeException("连接MQ错误,启动失败!", e);
        }
    }

    private ConsumeOrderlyStatus processMessages(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            try {
                String text = new String(msg.getBody());
                //消息处理……
            } catch (Exception e) {
                logger.error("处理消息出错,key={},错误信息:", msg.getKeys(), e);
                // 可以在此处根据业务需求返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,或者选择其他处理方式
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

以上代码实现ApplicationRunner接口,使其作为后台一个线程单独处理消息。

三 RocketMQ 运维

1 RocketMQ面板,可视化管理

RocketMQ与apacheMQ不同,本身没有自带面板查看状态工具,查看队列的状态要依赖命令行这对运维或开发都非常不方便,需要另外安装Rocket MQ 面板工具rocketmq-dashboard:

开源地址:https://github.com/apache/rocketmq-dashboard

下载回来后直接用IDE开发环境运行,也可以打包后放在服务器一起运行,如下图:

开发环境运行视图如下

除了一般的统计信息,还可以进行管理,功能丰富,如下图:

有此神器作观察,相信运维不再什么难事。

2 RocketMQ变更默认端口

1 修改方式一(4.x ,以前的版本是可以的):

要更改 RocketMQ 的本地部署中的端口,您需要修改 RocketMQ 的配置文件。RocketMQ 的配置 主要包括

broker.conf

namesrv.conf

这两个文件。

  1. 找到配置文件:- 找到 RocketMQ 的安装目录下的 conf 文件夹。- 在 conf 文件夹中,你会看到 broker.confnamesrv.conf
  2. 修改 Broker 端口:- 打开 broker.conf 文件。- 查找 listen_port 这一行,这是 Broker 的监听端口,默认通常是 10911。- 更改 listen_port 的值为所需的端口号。
  3. 修改 NameServer 端口:- 打开 namesrv.conf 文件。- 查找 NAMESRV_PORT 这一行,这是 NameServer 的监听端口,默认通常是 9876。- 更改 NAMESRV_PORT 的值为所需的端口号。
2 端口修改方式二(当前最新的版本5.x)

在使用上面的方式修改端口后发现失效,只能查看源代码:

看NameSrv模块源码发现服务固定是9876,后面通加载参数c 判断配置文件路径加载,如下图:

跟进MixAll.properties2Object,发现只是根据类的参数与类型匹配加载,如下图:

直接看配置参数类:

这次我们要改端口,所以新建个配置文件,只需增加端口配置即可:

文件内容就一个字段: listenPort = xxxx 端口号,如下图:

启动测试,修改成功如下图:

# 启动名称服务  xxx.conf是配置文件路径,可以使用相对路径
nohup sh bin/mqnamesrv -c xxx.conf &

查看了 broker 模块源码发现启动也是一样,所以端口修改方式也是同上 ,修改后启动成功如下图:

3 启停脚本

1 启动RocketMQ脚本:

启动脚本,只需执行脚本就可以快速启动MQ,以下是启动脚本代码如下:

#!/bin/bash

# 启动 Nameserver
echo 'Starting MQ NameServer...'
nohup sh bin/mqnamesrv > mq.log 2>&1 &
sleep 5

# 检查 Nameserver 是否启动成功
if ps aux | grep -v grep | grep -q 'mqnamesrv'; then
    echo 'MQ NameServer started successfully.'
else
    echo 'Failed to start MQ NameServer.'
    exit 1
fi

# 启动 Broker
echo 'Starting MQ Broker...'
nohup sh bin/mqbroker -n 0.0.0.0:9876 >> mq.log 2>&1 &
sleep 5

# 检查 Broker 是否启动成功
if ps aux | grep -v grep | grep -q 'mqbroker'; then
    echo 'MQ Broker started successfully.'
else
    echo 'Failed to start MQ Broker.'
    exit 1
fi

# 显示日志
tail -f mq.log

执行后同时打印日记,退出只需按ctrl+c即可。执行启动脚本成功如下图:

2 停止脚本:

停止RocketMQ脚本:

#!/bin/bash

# 关闭 Nameserver
echo 'Closing MQ NameServer...'
sh bin/mqshutdown namesrv

# 检查 Nameserver 是否成功关闭
sleep 5
if ! ps aux | grep -v grep | grep -q 'mqnamesrv'; then
    echo 'MQ NameServer closed successfully.'
else
    echo 'Failed to close MQ NameServer.'
    exit 1
fi

# 关闭 Broker
echo 'Closing MQ Broker...'
sh bin/mqshutdown broker

# 检查 Broker 是否成功关闭
sleep 20
if ! ps aux | grep -v grep | grep -q 'mqbroker'; then
    echo 'MQ Broker closed successfully.'
else
    echo 'Failed to close MQ Broker.'
    exit 1
fi

执行停止脚本成功停止,如下图:

注意:停止broker服务花的时间通常比较长,如果显示停止失败可以多次调用或者在脚本延长等待时间即可。

标签: rocketmq

本文转载自: https://blog.csdn.net/qyhua/article/details/141280389
版权归原作者 qyhua 所有, 如有侵权,请联系我们删除。

“Linux 下 RocketMQ 安装、配置与运维(详细讲解)”的评论:

还没有评论