0


JAVA操作RabbitMQ

RabbitMQ安装与JAVA使用

一、 linux环境安装

RabbitMQ介绍:使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

rabbitMQ官网: https://www.rabbitmq.com/download.html
在这里插入图片描述
安装步骤如下:
1. 安装erlang环境, rpm -ivh erlang-22.3.4.12-1.el7.x86_64.rpm
2. 安装rabbitMQ(需要有外网),yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
3. 拷贝配置文件,cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
4.、
修改rabbitmq.config文件
,开启允许来宾用户登录访问页面(用户密码guest/guest)
5启动rabbitMQ插件管理,rabbitmq-plugins enable rabbitmq_management
6启动RabbitMQ服务(关闭、重启、查看状态命令)
systemctl start rabbitmq-server
systemctl stop rabbitmq-server
systemctl restart rabbitmq-server
systemctl status rabbitmq-server
7关闭防火墙
[root@xupeng ~]# systemctl disable firewalld
[root@xupeng ~]# systemctl stop firewalld
8访问页面,端口是15672
在这里插入图片描述
9、页面介绍
在这里插入图片描述
10、错误排查,因为我之前安装过一遍,所以没有问题,初次安装可能会出现如下两个错误
10.1. 提示socat依赖被需要,则执行yum -y install socat
10.2. rabbitMQ启动不起来,可以尝试再次调用重启命令,还是起不来 journalctl -xe 命令查看错误日志。我的日志里有这么一条错误ERROR: epmd error for host 192: badarg (unknown POSIX error) ,大概意思就是主机名称不能有数字,调整命令如下

# 查看下自己主机名称[root@xupeng ~]# hostname
xupeng
# 修改下主机名称[root@xupeng ~]# set-hostname xupeng

.
11、下面是实际操作,

[root@xupeng test]# ll
总用量 32200
drwxr-xr-x. 2 root root      2229月   412:09 bin
-rw-r--r--. 1 root root 199917289月   8 09:39 erlang-22.3.4.12-1.el7.x86_64.rpm
-rw-r--r--. 1 root root 104944319月   8 09:39 rabbitmq-server-3.7.18-1.el7.noarch.rpm
drwxrwxr-x. 7 root root     409610月  42021 redis-6.2.6
-rw-r--r--. 1 root root  24765424月  1222:58 redis-6.2.6.tar.gz

[root@xupeng test]# rpm -ivh erlang-22.3.4.12-1.el7.x86_64.rpm 
警告:erlang-22.3.4.12-1.el7.x86_64.rpm: 头V4 RSA/SHA1 Signature, 密钥 ID 6026dfca: NOKEY
准备中...                          ################################# [100%]
正在升级/安装...
   1:erlang-22.3.4.12-1.el7           ################################# [100%][root@xupeng test]# yum list | grep erlang
erlang.x86_64                             22.3.4.12-1.el7              installed
[root@xupeng test]# yum -y remove erlang-*
已加载插件:fastestmirror, langpacks
参数 erlang-22.3.4.12-1.el7.x86_64.rpm 没有匹配
不删除任何软件包

[root@xupeng test]# yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
已加载插件:fastestmirror, langpacks
正在检查 rabbitmq-server-3.7.18-1.el7.noarch.rpm: rabbitmq-server-3.7.18-1.el7.noarch
rabbitmq-server-3.7.18-1.el7.noarch.rpm 将被安装
正在解决依赖关系
There are unfinished transactions remaining. You might consider running yum-complete-transaction, or "yum-complete-transaction --cleanup-only" and "yum history redo last", first to finish them. If those don't work you'll have to try removing/installing packages by hand (maybe package-cleanup can help).
--> 正在检查事务
---> 软件包 rabbitmq-server.noarch.0.3.7.18-1.el7 将被 安装
--> 解决依赖关系完成

依赖关系解决

===============================================================================================================================================================================================================================
 Package                                            架构                                      版本                                               源                                                                       大小
===============================================================================================================================================================================================================================
正在安装:
 rabbitmq-server                                    noarch                                    3.7.18-1.el7                                       /rabbitmq-server-3.7.18-1.el7.noarch                                     11 M

事务概要
===============================================================================================================================================================================================================================
安装  1 软件包

总计:11 M
安装大小:11 M
Downloading packages:
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
警告:RPM 数据库已被非 yum 程序修改。
  正在安装    : rabbitmq-server-3.7.18-1.el7.noarch                                                                                                                                                                        1/1 
  验证中      : rabbitmq-server-3.7.18-1.el7.noarch                                                                                                                                                                        1/1 

已安装:
  rabbitmq-server.noarch 0:3.7.18-1.el7                                                                                                                                                                                        
完毕!

[root@xupeng test]# ll /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq*
-rw-r--r--. 1 root root 332359月  162019 /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example
[root@xupeng test]# cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

修改配置文件
在这里插入图片描述
继续回到linux

[root@xupeng rabbitmq]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@xupeng:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@xupeng...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
set3 plugins.
Offline change; changes will take effect at broker restart.

[root@xupeng ~]# systemctl disable firewalld[root@xupeng ~]# systemctl stop firewalld[root@xupeng ~]# systemctl start rabbitmq-server

二、模型演示

最左侧圆点是生产者,最右侧圆点是消费者,红色的是队列,深蓝的圆点是路由
大致使用就是生产者的消息可以通过路由或直接向队列里添加,消费者从队列里获取
在这里插入图片描述

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.2</version></dependency></dependencies>

2.1.HelloWord模型(直连)

消费者直接放消息到队列,生产者取走。只允许一个消费者对一生产者。
首先在管理页面创建虚拟主机,配置一个用户访问虚拟主机
图一↓
在这里插入图片描述
图二↓在这里插入图片描述
图三↓
在这里插入图片描述
代码测试↓
1.启动Customer类main方法
2.启动Provider类main方法
3.管理页面可以看到当前队列信息
在这里插入图片描述

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();// 从连接中获取通道Channel channel = connection.createChannel();// 参数一:让通道与消息队列绑定,这样才知道发送给哪个消息队列,队列名称(没有自己创建)// 参数二:队列是否要持久化,true开启。// 参数三:是否独占队列,true独占// 参数四:是否消费完成后自动删除队列,true自动删除,// 参数五:额外参数
        channel.queueDeclare("hello",false,false,false,null);// 发布消息// 参数1:交换机名称  参数2:队列名称  参数3:额外参数  参数4:消息
        channel.basicPublish("","hello",null,"我要向队列中传递消息了".getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}importcom.rabbitmq.client.*;importjava.io.IOException;publicclassCustomer{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("hello",false,false,false,null);// 消费消息// 参数1:消费的消息队列名称  参数2:开启消息的自动确认机制 参数3:回调函数
        channel.basicConsume("hello",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body是:"+newString(body));}});// 因为basicConsume函数属于异步消费,正常需要开启线程去监听消息队列,如果关闭会导致主线程退出// 消费者就因该一直阻塞,等待消费// channel.close();// connection.close();}}/**
* 获取连接工具类,防止导错类,类路径也给出来了
*/importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassRabbitMQUtils{// 创建链接MQ的工厂privatestaticConnectionFactory connectionFactory;static{
        connectionFactory =newConnectionFactory();// 设置地址
        connectionFactory.setHost("192.168.10.132");// 端口不是web控制页面的那个端口
        connectionFactory.setPort(5672);// 虚拟主机
        connectionFactory.setVirtualHost("ems");// 用户和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");}publicstaticConnectiongetConnection(){try{// 获取链接对象Connection connection = connectionFactory.newConnection();return connection;}catch(Exception e){
            e.printStackTrace();}returnnull;}publicstaticvoidcloseChannlAndConnection(Channel channel,Connection connection){try{
            channel.close();}catch(Exception e){
            e.printStackTrace();}try{
            connection.close();}catch(Exception e){
            e.printStackTrace();}}}

细节补充
queueDeclare方法的第二个参数是否持久化队列,设置为true的话,在重启MQ时会保存在磁盘,下次启动时恢复队列。开启持久化的话,标志是个D,但是消息队列里的消息不会持久化,依然丢失。
持久化消息通过channel.basicPublish方法配置MessageProperties.PERSISTENT_TEXT_PLAIN
//channel.basicPublish(“”,“hello”, MessageProperties.PERSISTENT_TEXT_PLAIN,“你好呀11”.getBytes());
在这里插入图片描述

2.2.WorkQueue(任务模型)

直连模型如果消费速度比生产速度慢,可能会导致任务队列的任务越来越多,并且得不到消费。任务模型可以让多个消费者绑定到一个队列。共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

2.2.1 平均分配 - 假设有两个消费者,十个任务,每个线程会各分配5个,并不会因为某一线程处理速度快而多消费
启动下图的Customer1、Customer2,然后启动Provider向队列中添加消息查看效果。
因为开启了自动确认机制,如果消息消费过程中出现异常,该消息并不会放回至队列中,导致消息消失。自动确认机制是预先从队列获取好任务并删除队列中的任务,实际上此时还并为消费。

publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);for(int i =0; i <10; i++){
            channel.basicPublish("","work",null,("你好呀"+ i).getBytes());}RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}publicclassCustomer2{// 与消费者1代码一至}

2.2.2 能者多劳 - 处理速度快的线程会多消费任务
修改消费者代码,关闭消息自动确认机制,由于消费者2方法必然会报错导致消息消费失败,会将消息重新放回队列。如果不调用channel.basicAck或channel.basicNack,方法线程无法判断消息是否处理完成导致线程一直阻塞。从而无法重新去消息队列获取数据。

publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);// 表示一次只消费一个消息
        channel.basicQos(1);// 关闭任务自动确认机制
        channel.basicConsume("work",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));try{Thread.sleep(10);// 此处需要手动确认任务完成,否则不会接取下一个任务,队列任务中会出现未确认的数量
                    channel.basicAck(envelope.getDeliveryTag(),false);}catch(InterruptedException e){
                    e.printStackTrace();}}});}}publicclassCustomer2{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);// 表示一次只消费一个消息
        channel.basicQos(1);// 关闭自动确认机制
        channel.basicConsume("work",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{System.out.println("消费者2:"+newString(body));Thread.sleep(40);int x =10/0;// 此处需要手动确认任务完成,否则不会接取下一个任务,队列任务中会出现未确认的数量
                    channel.basicAck(envelope.getDeliveryTag(),false);}catch(Exception e){
                    e.printStackTrace();
                    channel.basicNack(envelope.getDeliveryTag(),false,true);}}});}}

2.3.fanout模型(广播)

Publish/Subscribe:广播模型,可以有多个消费者,并且每个消费者有自己的队列,生产者将消息发送给交换机,交换机来决定放入哪个队列当中
下图代码:消费者启动时创建了两个临时队列,生产者将信息传入到logs交换机中,交换机将消息放入到了两个队列中,所以消费者1与消费者2均能消费同样的信息

publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 指定交换机,交换机名称,交换机类型, fanout = 广播类型
        channel.exchangeDeclare("logs","fanout");// 交换机指向logs,第二个参数是路由key参数,此时还没有意义,可以为空
        channel.basicPublish("logs","work",null,"你好呀".getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机
        channel.exchangeDeclare("logs","fanout");// 获取临时队列,并绑定临时队列String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"logs","");// 消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}publicclassCustomer2{//....一样的代码....}

在这里插入图片描述
在这里插入图片描述

2.4.Direct模型(路由直连)

广播模式增强版,队列与交换机的绑定时,要指定一个RoutingKey (路由key),Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与队列的Routing key完全一致,才会接收到消息。

publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 指定交换机,交换机类型
        channel.exchangeDeclare("logs2","direct");// 交换机指向logs,指定路由参数
        channel.basicPublish("logs2","info",null,"我是info信息".getBytes());
        channel.basicPublish("logs2","error",null,"我是error信息".getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机,指定模式为路由直连
        channel.exchangeDeclare("logs2","direct");// 获取临时队列,并绑定临时队列, routingKeyString queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"logs2","error");// 消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}publicclassCustomer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机,指定模式为路由直连
        channel.exchangeDeclare("logs2","direct");// 获取临时队列,并绑定临时队列, routingKeyString queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"logs2","info");// 消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}

2.5.Topic模型(订阅/动态路由模型)

路由模式的路由key是一个固定值,而Topic模型的路由值可以用通配符
通配符*:一个或多个词语,比如log.*,可以匹配log.a.b,log.aaaa.bbb
通配符#:只能匹配一个词,比如log.#能陪陪log.a,log.b
PS: 也可以俩通配符一起用,比如: *.log.#

publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 指定交换机,交换机名称,交换机类型, topic= 动态路由模式
        channel.exchangeDeclare("topics","topic");// 发送消息
        channel.basicPublish("topics","user.save",null,("我是AAAAA").getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机
        channel.exchangeDeclare("topics","topic");// 获取临时队列,并绑定临时队列, routingKeyString queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"topics","user.#");
        channel.queueBind(queueName,"topics","*.user.#");// 消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}

三、springboot整合

依赖与配置文件

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
server.port=80
spring.application.name=rabbitMQboot
spring.rabbitmq.host=192.168.14.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/ems

3.1 直连模型

/**
*    用于向消息队列添加数据
*/@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/hello")publicvoidhelloWord(){
        rabbitTemplate.convertAndSend("hello","我是消息");}}// 表示监听的队列,declare 表示是否持久化队列,是否自动删除// 默认持久化 非独占 自动删除@Component@RabbitListener(queuesToDeclare =@Queue(name ="hello",declare ="false",autoDelete ="true"))publicclassCustomerController{// 表示这个是处理消息的回调函数@RabbitHandlerpublicvoidreceivel(String msg){System.out.println("收到消息:"+ msg);}}

3.2 任务模型

@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/work")publicvoidwork(){for(int i =0; i <10; i++){
            rabbitTemplate.convertAndSend("work","work模型"+ i);}}}@ComponentpublicclassWorkController{@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceivel1(String msg){System.out.println("消费者1"+ msg);}@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceivel2(String msg){System.out.println("消费者2"+ msg);}}

3.3 广播模型

@Controller@RequestMapping("/test")publicclassTestController{@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/fanout")publicvoidfanout(){// 路由key不写
        rabbitTemplate.convertAndSend("logs","","广播数据");}}@ComponentpublicclassFanoutController{@RabbitListener(bindings =@QueueBinding(// 绑定一个临时队列,绑定交换机
            value =@Queue,exchange =@Exchange(value ="logs",type ="fanout")))publicvoidreceivel1(String msg){System.out.println("消费者1"+ msg);}@RabbitListener(bindings =@QueueBinding(// 绑定一个临时队列,绑定交换机
            value =@Queue,exchange =@Exchange(value ="logs",type ="fanout")))publicvoidreceivel2(String msg){System.out.println("消费者2"+ msg);}}

3.4 路由模型

@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/route")publicvoidroute(){
        rabbitTemplate.convertAndSend("directs","info","发送info级别的路由信息");}}@ComponentpublicclassRouteController{@RabbitListener(bindings =@QueueBinding(
            value =@Queue,//临时队列
            exchange =@Exchange(value ="directs",type ="direct"),
            key ={"info","error"}//路由key))publicvoidreceivel1(String msg){System.out.println("消费者1"+ msg);}@RabbitListener(bindings =@QueueBinding(
            value =@Queue,//临时队列
            exchange =@Exchange(value ="directs",type ="direct"),
            key ={"info"}//路由key))publicvoidreceive2(String msg){System.out.println("消费者2"+ msg);}}

3.4 、订阅(动态路由)

@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/topic")publicvoidtopic(){
        rabbitTemplate.convertAndSend("topics","rizhi.info","动态路由");}}@ComponentpublicclassTopicController{@RabbitListener(bindings =@QueueBinding(
            value =@Queue,
            exchange =@Exchange(type ="topic",name ="topics"),
            key ={"user.#","rizhi.info"}))publicvoidrecevicel(String msg){System.out.println("消费者1"+ msg);}}

本文转载自: https://blog.csdn.net/weixin_44080194/article/details/126758201
版权归原作者 问君何为变化 所有, 如有侵权,请联系我们删除。

“JAVA操作RabbitMQ”的评论:

还没有评论