RocketMQ介绍
RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年底贡献给 Apache,成为了 Apache 的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转。
RocketMQ 特点
- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
- Producer、Consumer、队列都可以分布式
- Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
- 能够保证严格的消息顺序
- 支持拉(pull)和推(push)两种消息模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持多种消息协议,如 JMS、OpenMessaging 等
- 较少的依赖 #kafka 、RocketMQ 、RabbitMQ 对比
RocketMQ安装
RocketMQ下载: rocketmq-all-4.8.0-bin-release.zip
1.RocketMQ zip包传入linux服务器
[root@localhost ]# cd usr/local/[root@localhost local]# rz
2.解压缩
[root@localhost local]# unzip rocketmq-all-4.8.0-bin-release.zip
3.调整启动参数(修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败)
[root@localhost local]# cd rocketmq-all-4.8.0-bin-release/bin
[root@localhost bin]# vim runserver.sh
- -Xms4g -Xmx4g -Xmn2g 改为 -Xms256m -Xmx256m -Xmn128m
4.调整broker
[root@localhost bin]# vim runbroker.sh
- -Xms8g -Xmx8g -Xmn4g 改为 -Xms256m -Xmx256m -Xmn128m
5.启动namesrv
[root@localhost bin]# nohup sh mqnamesrv &
6.启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876
[root@localhost bin]# nohup ./mqbroker -n localhost:9876&
7.检查是否启动成功
[root@localhost bin]# jps -l
- 如果发现报错bash: jps: 未找到命令… 请更新以下命令
[root@localhost bin]# sudo yum install java-1.8.0-openjdk-devel.x86_64
- 输入命令 jps -l
- 关闭 RocketMQ 命令 (此处无需关闭,只用于了解)
./mqshutdown broker
./mqshutdown namesrv
#RocketMQ 控制台安装
1.克隆rocketmq项目
[root@localhost local]# cd /usr/local/[root@localhost local]# git clone https://github.com/apache/rocketmq-externals.git
- 进入\rocketmq-externals\rocketmq-console\src\main\resources\ 下修改 application.properties 配置文件
- 配置文件修改如下图
github提供了 Docker 和 非Docker 两种安装方法供其选择,这里使用非Docker方式进行安装
- 在 \rocketmq-externals\rocketmq-console\ 文件夹下打开控制台,输入以下命令进行maven打包
mvn clean package -Dmaven.test.skip=true
- 进入 \rocketmq-externals\rocketmq-console\target\ 文件夹下打开控制台,输入以下命令进行 jar包启动
java -jar rocketmq-console-ng-2.0.0.jar
- 打开浏览器访问 localhost:9877,如果报错
- 开放 10909 01911 9876 端口
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload
- 验证RocketMQ功能是够正常
1.验证生产消息正常,输入命令
[root@localhost rocketmq-all-4.8.0-bin-release]# exportNAMESRV_ADDR=localhost:9876[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:
SendResult [sendStatus=SEND_OK, msgId=7F000001372329453F44466341350068, offsetMsgId=C0A8017600002A9F000000000000674E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=33]
2.验证消费消息正常,执行如下命令:
[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:
ConsumeMessageThread_1 Receive New Messages:[MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1618387294736, bornHost=/192.168.1.118:34722, storeTimestamp=1618387294743, storeHost=/192.168.1.118:10911, msgId=C0A8017600002A9F0000000000000192, commitLogOffset=402, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0,toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0,MAX_OFFSET=34,CONSUME_START_TIME=1618387666005,UNIQ_KEY=7F00000136FE29453F44466306100001,CLUSTER=DefaultCluster,WAIT=true,TAGS=TagA}, body=[72,101,108,108,111,32,82,111,99,107,101,116,77,81,32,50], transactionId='null'}]]
- 访问页面 http://localhost:9877 ,出现下图界面,安装成功
RocketMQ 集成 - 生产者
- gateway下pom.xml文件添加依赖
<!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
- nacos 配置 RocketMQ
rocketmq:
name-server:192.168.190.129:9876producer:
# 小坑:必须指定group
group: test-group
- common 下创建实体类 MyMessage.class
package com.bi.cloud.pojo;import lombok.Data;import java.io.Serializable;import java.util.Date;
@Data
publicclassMyMessageimplementsSerializable{private Integer id;private String name;private String status;private Date createTime;}
- gateway下创建 TestProducerController.class
package com.bi.cloud.controller;import com.bi.cloud.pojo.MyMessage;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.util.Date;/**
* 生产者
**/
@RestController
@RequestMapping("/api/testRocketMQ")publicclassTestProducerController{/**
* 用于发送消息到 RocketMQ 的api
*/
@Resource
public RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendMsg")public String testSendMsg(){
String topic ="test-topic";
MyMessage message =newMyMessage();
message.setId(1);
message.setName("王霄");
message.setStatus("default");
message.setCreateTime(newDate());// 发送消息
rocketMQTemplate.convertAndSend(topic, message);return"send message success";}}
- Postman 调用接口
- 如果报错 请关闭linux防火墙
systemctl stop firewalld
- 消息发送成功后,可以到RocketMQ的控制台中进行查看:
RocketMQ 集成 - 消费者
- engine下pom.xml文件添加依赖
<!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
- nacos 配置 RocketMQ
rocketmq:
name-server:192.168.190.129:9876producer:
# 小坑:必须指定group
group: test-group
- engine 下创建消费者监听器 TestConsumerListener.class
package com.bi.cloud.service.Impl;import com.alibaba.fastjson.JSON;import com.bi.cloud.pojo.MyMessage;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/**
* 消费者监听器
**/
@Slf4j
@Component
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
@RocketMQMessageListener(topic ="test-topic", consumerGroup ="consumer-group")publicclassTestConsumerListenerimplementsRocketMQListener<MyMessage>{/**
* 监听到消息的时候就会调用该方法
*/
@Override
publicvoidonMessage(MyMessage message){
log.info("从test-topic中监听到消息");
log.info(JSON.toJSONString(message));}}
- 编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:
前往:第八章 Oauth2.0 安全认证子模块集成
参考文献:
https://github.com/apache/rocketmq-externals.git
https://blog.csdn.net/qq_40280582/article/details/111785355
https://zhuhuix.blog.csdn.net/article/details/108866638
https://blog.51cto.com/zero01/2426303
版权归原作者 王疏蔬 所有, 如有侵权,请联系我们删除。