RabbitMQ安装与JAVA使用
一、 linux环境安装
RabbitMQ介绍:使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
rabbitMQ官网: https://www.rabbitmq.com/download.html
安装步骤如下:
1. 安装erlang环境, rpm -ivh erlang-22.3.4.12-1.el7.x86_64.rpm
2. 安装rabbitMQ(需要有外网),yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
3. 拷贝配置文件,cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
4.、修改rabbitmq.config文件,开启允许来宾用户登录访问页面(用户密码guest/guest)
5、启动rabbitMQ插件管理,rabbitmq-plugins enable rabbitmq_management
6、启动RabbitMQ服务(关闭、重启、查看状态命令)
systemctl start rabbitmq-server
systemctl stop rabbitmq-server
systemctl restart rabbitmq-server
systemctl status rabbitmq-server
7、关闭防火墙
[root@xupeng ~]# systemctl disable firewalld
[root@xupeng ~]# systemctl stop firewalld
8、访问页面,端口是15672
9、页面介绍
10、错误排查,因为我之前安装过一遍,所以没有问题,初次安装可能会出现如下两个错误
10.1. 提示socat依赖被需要,则执行yum -y install socat
10.2. rabbitMQ启动不起来,可以尝试再次调用重启命令,还是起不来 journalctl -xe 命令查看错误日志。我的日志里有这么一条错误ERROR: epmd error for host 192: badarg (unknown POSIX error) ,大概意思就是主机名称不能有数字,调整命令如下
# 查看下自己主机名称[root@xupeng ~]# hostname
xupeng
# 修改下主机名称[root@xupeng ~]# set-hostname xupeng
.
11、下面是实际操作,
[root@xupeng test]# ll
总用量 32200
drwxr-xr-x. 2 root root 2229月 412:09 bin
-rw-r--r--. 1 root root 199917289月 8 09:39 erlang-22.3.4.12-1.el7.x86_64.rpm
-rw-r--r--. 1 root root 104944319月 8 09:39 rabbitmq-server-3.7.18-1.el7.noarch.rpm
drwxrwxr-x. 7 root root 409610月 42021 redis-6.2.6
-rw-r--r--. 1 root root 24765424月 1222:58 redis-6.2.6.tar.gz
[root@xupeng test]# rpm -ivh erlang-22.3.4.12-1.el7.x86_64.rpm
警告:erlang-22.3.4.12-1.el7.x86_64.rpm: 头V4 RSA/SHA1 Signature, 密钥 ID 6026dfca: NOKEY
准备中... ################################# [100%]
正在升级/安装...
1:erlang-22.3.4.12-1.el7 ################################# [100%][root@xupeng test]# yum list | grep erlang
erlang.x86_64 22.3.4.12-1.el7 installed
[root@xupeng test]# yum -y remove erlang-*
已加载插件:fastestmirror, langpacks
参数 erlang-22.3.4.12-1.el7.x86_64.rpm 没有匹配
不删除任何软件包
[root@xupeng test]# yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
已加载插件:fastestmirror, langpacks
正在检查 rabbitmq-server-3.7.18-1.el7.noarch.rpm: rabbitmq-server-3.7.18-1.el7.noarch
rabbitmq-server-3.7.18-1.el7.noarch.rpm 将被安装
正在解决依赖关系
There are unfinished transactions remaining. You might consider running yum-complete-transaction, or "yum-complete-transaction --cleanup-only" and "yum history redo last", first to finish them. If those don't work you'll have to try removing/installing packages by hand (maybe package-cleanup can help).
--> 正在检查事务
---> 软件包 rabbitmq-server.noarch.0.3.7.18-1.el7 将被 安装
--> 解决依赖关系完成
依赖关系解决
===============================================================================================================================================================================================================================
Package 架构 版本 源 大小
===============================================================================================================================================================================================================================
正在安装:
rabbitmq-server noarch 3.7.18-1.el7 /rabbitmq-server-3.7.18-1.el7.noarch 11 M
事务概要
===============================================================================================================================================================================================================================
安装 1 软件包
总计:11 M
安装大小:11 M
Downloading packages:
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
警告:RPM 数据库已被非 yum 程序修改。
正在安装 : rabbitmq-server-3.7.18-1.el7.noarch 1/1
验证中 : rabbitmq-server-3.7.18-1.el7.noarch 1/1
已安装:
rabbitmq-server.noarch 0:3.7.18-1.el7
完毕!
[root@xupeng test]# ll /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq*
-rw-r--r--. 1 root root 332359月 162019 /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example
[root@xupeng test]# cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改配置文件
继续回到linux
[root@xupeng rabbitmq]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@xupeng:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@xupeng...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set3 plugins.
Offline change; changes will take effect at broker restart.
[root@xupeng ~]# systemctl disable firewalld[root@xupeng ~]# systemctl stop firewalld[root@xupeng ~]# systemctl start rabbitmq-server
二、模型演示
最左侧圆点是生产者,最右侧圆点是消费者,红色的是队列,深蓝的圆点是路由
大致使用就是生产者的消息可以通过路由或直接向队列里添加,消费者从队列里获取
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.2</version></dependency></dependencies>
2.1.HelloWord模型(直连)
消费者直接放消息到队列,生产者取走。只允许一个消费者对一生产者。
首先在管理页面创建虚拟主机,配置一个用户访问虚拟主机
图一↓
图二↓
图三↓
代码测试↓
1.启动Customer类main方法
2.启动Provider类main方法
3.管理页面可以看到当前队列信息
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();// 从连接中获取通道Channel channel = connection.createChannel();// 参数一:让通道与消息队列绑定,这样才知道发送给哪个消息队列,队列名称(没有自己创建)// 参数二:队列是否要持久化,true开启。// 参数三:是否独占队列,true独占// 参数四:是否消费完成后自动删除队列,true自动删除,// 参数五:额外参数
channel.queueDeclare("hello",false,false,false,null);// 发布消息// 参数1:交换机名称 参数2:队列名称 参数3:额外参数 参数4:消息
channel.basicPublish("","hello",null,"我要向队列中传递消息了".getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}importcom.rabbitmq.client.*;importjava.io.IOException;publicclassCustomer{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);// 消费消息// 参数1:消费的消息队列名称 参数2:开启消息的自动确认机制 参数3:回调函数
channel.basicConsume("hello",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body是:"+newString(body));}});// 因为basicConsume函数属于异步消费,正常需要开启线程去监听消息队列,如果关闭会导致主线程退出// 消费者就因该一直阻塞,等待消费// channel.close();// connection.close();}}/**
* 获取连接工具类,防止导错类,类路径也给出来了
*/importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassRabbitMQUtils{// 创建链接MQ的工厂privatestaticConnectionFactory connectionFactory;static{
connectionFactory =newConnectionFactory();// 设置地址
connectionFactory.setHost("192.168.10.132");// 端口不是web控制页面的那个端口
connectionFactory.setPort(5672);// 虚拟主机
connectionFactory.setVirtualHost("ems");// 用户和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");}publicstaticConnectiongetConnection(){try{// 获取链接对象Connection connection = connectionFactory.newConnection();return connection;}catch(Exception e){
e.printStackTrace();}returnnull;}publicstaticvoidcloseChannlAndConnection(Channel channel,Connection connection){try{
channel.close();}catch(Exception e){
e.printStackTrace();}try{
connection.close();}catch(Exception e){
e.printStackTrace();}}}
细节补充
queueDeclare方法的第二个参数是否持久化队列,设置为true的话,在重启MQ时会保存在磁盘,下次启动时恢复队列。开启持久化的话,标志是个D,但是消息队列里的消息不会持久化,依然丢失。
持久化消息通过channel.basicPublish方法配置MessageProperties.PERSISTENT_TEXT_PLAIN
//channel.basicPublish(“”,“hello”, MessageProperties.PERSISTENT_TEXT_PLAIN,“你好呀11”.getBytes());
2.2.WorkQueue(任务模型)
直连模型如果消费速度比生产速度慢,可能会导致任务队列的任务越来越多,并且得不到消费。任务模型可以让多个消费者绑定到一个队列。共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
2.2.1 平均分配 - 假设有两个消费者,十个任务,每个线程会各分配5个,并不会因为某一线程处理速度快而多消费
启动下图的Customer1、Customer2,然后启动Provider向队列中添加消息查看效果。
因为开启了自动确认机制,如果消息消费过程中出现异常,该消息并不会放回至队列中,导致消息消失。自动确认机制是预先从队列获取好任务并删除队列中的任务,实际上此时还并为消费。
publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);for(int i =0; i <10; i++){
channel.basicPublish("","work",null,("你好呀"+ i).getBytes());}RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}publicclassCustomer2{// 与消费者1代码一至}
2.2.2 能者多劳 - 处理速度快的线程会多消费任务
修改消费者代码,关闭消息自动确认机制,由于消费者2方法必然会报错导致消息消费失败,会将消息重新放回队列。如果不调用channel.basicAck或channel.basicNack,方法线程无法判断消息是否处理完成导致线程一直阻塞。从而无法重新去消息队列获取数据。
publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);// 表示一次只消费一个消息
channel.basicQos(1);// 关闭任务自动确认机制
channel.basicConsume("work",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));try{Thread.sleep(10);// 此处需要手动确认任务完成,否则不会接取下一个任务,队列任务中会出现未确认的数量
channel.basicAck(envelope.getDeliveryTag(),false);}catch(InterruptedException e){
e.printStackTrace();}}});}}publicclassCustomer2{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);// 表示一次只消费一个消息
channel.basicQos(1);// 关闭自动确认机制
channel.basicConsume("work",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{System.out.println("消费者2:"+newString(body));Thread.sleep(40);int x =10/0;// 此处需要手动确认任务完成,否则不会接取下一个任务,队列任务中会出现未确认的数量
channel.basicAck(envelope.getDeliveryTag(),false);}catch(Exception e){
e.printStackTrace();
channel.basicNack(envelope.getDeliveryTag(),false,true);}}});}}
2.3.fanout模型(广播)
Publish/Subscribe:广播模型,可以有多个消费者,并且每个消费者有自己的队列,生产者将消息发送给交换机,交换机来决定放入哪个队列当中
下图代码:消费者启动时创建了两个临时队列,生产者将信息传入到logs交换机中,交换机将消息放入到了两个队列中,所以消费者1与消费者2均能消费同样的信息
publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 指定交换机,交换机名称,交换机类型, fanout = 广播类型
channel.exchangeDeclare("logs","fanout");// 交换机指向logs,第二个参数是路由key参数,此时还没有意义,可以为空
channel.basicPublish("logs","work",null,"你好呀".getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机
channel.exchangeDeclare("logs","fanout");// 获取临时队列,并绑定临时队列String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"logs","");// 消费消息
channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}publicclassCustomer2{//....一样的代码....}
2.4.Direct模型(路由直连)
广播模式增强版,队列与交换机的绑定时,要指定一个RoutingKey (路由key),Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与队列的Routing key完全一致,才会接收到消息。
publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 指定交换机,交换机类型
channel.exchangeDeclare("logs2","direct");// 交换机指向logs,指定路由参数
channel.basicPublish("logs2","info",null,"我是info信息".getBytes());
channel.basicPublish("logs2","error",null,"我是error信息".getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机,指定模式为路由直连
channel.exchangeDeclare("logs2","direct");// 获取临时队列,并绑定临时队列, routingKeyString queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"logs2","error");// 消费消息
channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}publicclassCustomer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机,指定模式为路由直连
channel.exchangeDeclare("logs2","direct");// 获取临时队列,并绑定临时队列, routingKeyString queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"logs2","info");// 消费消息
channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}
2.5.Topic模型(订阅/动态路由模型)
路由模式的路由key是一个固定值,而Topic模型的路由值可以用通配符
通配符*:一个或多个词语,比如log.*,可以匹配log.a.b,log.aaaa.bbb
通配符#:只能匹配一个词,比如log.#能陪陪log.a,log.b
PS: 也可以俩通配符一起用,比如: *.log.#
publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 指定交换机,交换机名称,交换机类型, topic= 动态路由模式
channel.exchangeDeclare("topics","topic");// 发送消息
channel.basicPublish("topics","user.save",null,("我是AAAAA").getBytes());RabbitMQUtils.closeChannlAndConnection(channel, connection);}}publicclassCustomer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 绑定交换机
channel.exchangeDeclare("topics","topic");// 获取临时队列,并绑定临时队列, routingKeyString queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"topics","user.#");
channel.queueBind(queueName,"topics","*.user.#");// 消费消息
channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1:"+newString(body));}});}}
三、springboot整合
依赖与配置文件
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
server.port=80
spring.application.name=rabbitMQboot
spring.rabbitmq.host=192.168.14.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/ems
3.1 直连模型
/**
* 用于向消息队列添加数据
*/@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/hello")publicvoidhelloWord(){
rabbitTemplate.convertAndSend("hello","我是消息");}}// 表示监听的队列,declare 表示是否持久化队列,是否自动删除// 默认持久化 非独占 自动删除@Component@RabbitListener(queuesToDeclare =@Queue(name ="hello",declare ="false",autoDelete ="true"))publicclassCustomerController{// 表示这个是处理消息的回调函数@RabbitHandlerpublicvoidreceivel(String msg){System.out.println("收到消息:"+ msg);}}
3.2 任务模型
@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/work")publicvoidwork(){for(int i =0; i <10; i++){
rabbitTemplate.convertAndSend("work","work模型"+ i);}}}@ComponentpublicclassWorkController{@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceivel1(String msg){System.out.println("消费者1"+ msg);}@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceivel2(String msg){System.out.println("消费者2"+ msg);}}
3.3 广播模型
@Controller@RequestMapping("/test")publicclassTestController{@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/fanout")publicvoidfanout(){// 路由key不写
rabbitTemplate.convertAndSend("logs","","广播数据");}}@ComponentpublicclassFanoutController{@RabbitListener(bindings =@QueueBinding(// 绑定一个临时队列,绑定交换机
value =@Queue,exchange =@Exchange(value ="logs",type ="fanout")))publicvoidreceivel1(String msg){System.out.println("消费者1"+ msg);}@RabbitListener(bindings =@QueueBinding(// 绑定一个临时队列,绑定交换机
value =@Queue,exchange =@Exchange(value ="logs",type ="fanout")))publicvoidreceivel2(String msg){System.out.println("消费者2"+ msg);}}
3.4 路由模型
@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/route")publicvoidroute(){
rabbitTemplate.convertAndSend("directs","info","发送info级别的路由信息");}}@ComponentpublicclassRouteController{@RabbitListener(bindings =@QueueBinding(
value =@Queue,//临时队列
exchange =@Exchange(value ="directs",type ="direct"),
key ={"info","error"}//路由key))publicvoidreceivel1(String msg){System.out.println("消费者1"+ msg);}@RabbitListener(bindings =@QueueBinding(
value =@Queue,//临时队列
exchange =@Exchange(value ="directs",type ="direct"),
key ={"info"}//路由key))publicvoidreceive2(String msg){System.out.println("消费者2"+ msg);}}
3.4 、订阅(动态路由)
@Controller@RequestMapping("/test")publicclassTestController{// 注入RabbitTemplate@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/topic")publicvoidtopic(){
rabbitTemplate.convertAndSend("topics","rizhi.info","动态路由");}}@ComponentpublicclassTopicController{@RabbitListener(bindings =@QueueBinding(
value =@Queue,
exchange =@Exchange(type ="topic",name ="topics"),
key ={"user.#","rizhi.info"}))publicvoidrecevicel(String msg){System.out.println("消费者1"+ msg);}}
版权归原作者 问君何为变化 所有, 如有侵权,请联系我们删除。