MQ概述
MQ全称message queue(消息队列),是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。

总结
MQ,消息队列,存储消息的中间件。
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者
MQ的优势
1.应用解耦
传统的远程调用,如下其中库存系统出现故障会影响到订单系统,添加新系统需要重新修改订单系统的代码
使用MQ后,消息通过中间件转发,消费者从MQ中取消息,如果库存系统出现异常,等库存系统自我修复后再去MQ中取消息,不会影响其他系统。添加新系统也是去MQ中取,不用修改订单系统的代码。
2.异步提速
传统的直接调用,需要耗时920ms
使用MQ,一个订单只需要25ms,提升用户体验和系统的吞吐量(吞吐量:单位时间内处理请求的数目)
3.削峰填谷
传统的远程调用,A系统处理不了直接崩溃了
使用MQ ,将请求交给MQ处理,然后由MQ每秒拉出1000个请求给A系统处理


使用了MQ之后,限制消费信息速度为1000,这样一来,高峰期产生的数据势必会被积压再MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
总结
应用解耦:提高系统容错性和可维护性
异步提速:提升了用户体验和系统吞吐量
削峰填谷:提高系统稳定性
MQ的劣势
1.系统可用性降低
系统引入的外部依赖越多,系统的稳定性越差,一旦MQ宕机,就会对业务造成影响,如何保证MQ的高可用?
2.系统复杂度提高
MQ的加入大大的增加了系统的复杂度,以前系统间是同步的远程调用,限制是通过MQ进行异步调用,如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
3.一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统和C系统处理成功,D系统处理失败,如何保证消息数据处理的一致性?
使用MQ需要满足什么条件?
1.生产者不需要从消费者处获得反馈,引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层动作还未做,上层却当成动作做完了继续往后执行,即所谓的异步成为可能。
2.容许短暂的不一致性。
3.确实使用了有效果,即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ的成本。
常见的MQ产品对比
在Docker容器中部署RabbitMQ
#1.下载RabbitMQ镜像
[root@localhost ~]# docker pull rabbitmq:management
#2.创建RabbitMQ容器
docker run -d --name=tensquare_rabbitmq
-p 5672:5672 -p 5671:5671 -p 4369:4369
-p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management
注意:如果docker pull rabbitmq 后面不带management,启动rabbitmq后无法打开管理界面,所以我们要下载带management插件的rabbitmq
你可能就需要配置web插件
rabbitmq-plugins enable rabbitmq_management
主要端口:
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
网页访问
http://虚拟机ip:15671
rabbitmq有一个默认的用户名和密码,guest和guest,但为了安全考虑,该用户名和密码只允许本地访问,如果是远程操作的话,需要创建新的用户名和密码
docker run -di --name rabbitmq -p 5672:5672 -p 15672:15672
-v `pwd`/rabbitmq:/var/lib/rabbitmq --hostname myRabbit
-e RABBITMQ_DEFAULT_VHOST=myvhost
-e RABBITMQ_DEFAULT_USER=admin
-e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.14-management
#说明:
#-i:表示运行容器
#-d:在run后面加上-d参数,则会创建一个守护式容器在后台运行
#(这样创建容器后不会自动登录容器,如果只加-it两个参数,创建后就会自动进去容器)。
#-di:后台运行容器;
#--name:指定容器名;
#-p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
#-v:映射目录或文件(文件共享),格式 宿主机目录:容器目录
#--hostname:主机名
#(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
#-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:
#默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

快速入门
代码测试
生产者
遇到问题:com.rabbitmq.client.ShutdownSignalException: connection error

虚拟机没有配置
package com.xxxx.Producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
/*
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(
String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException {}
参数:
1.queue :队列名称
2.durable :是否持久化,当mq重启之后,还在
3.exclusive:是否独占,两个意思1.只能有一个消费者监听这队列,2.当Connection关闭时,是否删除队列
4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
5.argument:参数
*/
channel.queueDeclare("hello_world",true,false,false,null);
//6.发送消息
String body = "hello rabbitmq";
/*
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body){}
参数:
1.exchange:交换机名称,简单模式下交换机会默认的""(空字符串)
2.routingKey:路由名称
3.props:配置信息
4.body:发送消息数据
*/
channel.basicPublish("","hello_world",null, body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
}
消费者
package com.xxxx.Consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_HelloWorld {
public static void main(String[]args)throws IOException, TimeoutException{
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
/*
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(
String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException {}
参数:
1.queue :队列名称
2.durable :是否持久化,当mq重启之后,还在
3.exclusive:是否独占,两个意思1.只能有一个消费者监听这队列,2.当Connection关闭时,是否删除队列
4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
5.argument:参数
*/
channel.queueDeclare("hello_world",true,false,false,null);
//6.接收消息
/*
public String basicConsume(String queue, boolean autoAck, Consumer callback){};
参数:
1.queue:队列名称
2.autoAck:是否自动确认
2.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法:当收到消息后,会自动执行该方法
1.consumerTag, 标识
2.envelope, 获取一些信息,交换机,路由key
3.properties, 配置信息
4.body,数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//关闭资源?不需要
}
}
RabbitMQ的工作模式
RabbitMQ Tutorials — RabbitMQ
简单模式

工作队列模式

** work queues:**与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列的消息
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
小结
1.在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
2.work Queue对于任务过重或者任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即刻。
Pub/Sub订阅模式

在订阅模型中,多了一个Exchange角色,而且过程略有变化
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息,缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理小i下,例如递交给某个特别队列,递交给所有队列,或是将消息丢弃,到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
1.Fanout:广播,将消息交给所有绑定到交换机的队列
2.Direct:定向,把消息交给符合指定RoutingKey的队列
3.Topic:通配符,把消息交给符合Routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
生产者
package com.xxxx.Producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Produce_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
/*
public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments) throws IOException {}
参数:
1.exchange:交换机名称
2.type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列
TOPIC("topic"),:通配符的方式
HEADERS("headers");:参数匹配
3.durable:是否持久化
4.autoDelete:是否自动删除
5.internal:内部使用,一般false
6.argument:参数,
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,
true,false,false,null);
//6.创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定队列和交换机
/*
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange,
String routingKey) throws IOException {}
1.queue:队列名称
2.exchange:交换机名称
3.routingKey:路由键,绑定规则
如果交换机的类型为fanout,routingKey设置为"";
*/
channel.queueBind(queue1Name,exchangeName , "");
channel.queueBind(queue2Name,exchangeName , "");
//8.发送消息
String body = "日志信息:花笑白调用findAll方法....日志级别:info..";
channel.basicPublish(exchangeName, "", null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者
package com.xxxx.Consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub1 {
public static void main(String[]args)throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//6.接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志的信息打印到控制台");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不需要
}
}
package com.xxxx.Consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub2 {
public static void main(String[]args)throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//6.接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志的信息保存到数据库");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不需要
}
}
Routing 路由模式

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交换给每个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的Routingkey完全一致才会收到消息
生产者
package com.xxxx.Producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Produce_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,
true,false,false,null);
//6.创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定队列和交换机
//队列1的绑定
channel.queueBind(queue1Name,exchangeName , "error");
//队列2的绑定
channel.queueBind(queue2Name,exchangeName , "info");
channel.queueBind(queue2Name,exchangeName , "error");
channel.queueBind(queue2Name,exchangeName , "warning");
//8.发送消息
String body = "日志信息:花笑白调用findAll方法....日志级别:info..";
channel.basicPublish(exchangeName, "info", null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者
只需修该一下队列名称就行
Topics通配模式


** 生产者**
package com.xxxx.Producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Produce_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.126.129");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值 /
factory.setUsername("huaxiaobai");//用户名 默认值 guest
factory.setPassword("huaxiaobai");//密码 默认值 guest
//3.创建连接 connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
/*
public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments) throws IOException {}
参数:
1.exchange:交换机名称
2.type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列
TOPIC("topic"),:通配符的方式
HEADERS("headers");:参数匹配
3.durable:是否持久化
4.autoDelete:是否自动删除
5.internal:内部使用,一般false
6.argument:参数,
*/
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,
true,false,false,null);
//6.创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定队列和交换机
/*
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange,
String routingKey) throws IOException {}
1.queue:队列名称
2.exchange:交换机名称
3.routingKey:路由键,绑定规则
如果交换机的类型为fanout,routingKey设置为"";
*/
//routingkey:系统名称.日志级别
//需求:将error级别的日志和order系统的日志存储到数据库
//将所有日志都打印到控制台
//队列1的绑定
channel.queueBind(queue1Name,exchangeName , "#.error");
channel.queueBind(queue1Name,exchangeName , "order.*");
//队列2的绑定
channel.queueBind(queue2Name,exchangeName , "*.*");
//8.发送消息
String body = "日志信息:花笑白调用findAll方法....日志级别:info..";
channel.basicPublish(exchangeName, "goods.info", null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者
只需修该一下队列名称就行
spring集成rabbit
生产者
导依赖坐标
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.18</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.18</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
rabbitmq.properties
rabbitmq.host=192.168.126.129
rabbitmq.port=5672
rabbitmq.username=huaxiaobai
rabbitmq.password=huaxiaobai
rabbitmq.virtual-host=/itcast
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
<!--加载配置文件-->
<context:property-placeholder location="rabbitmq.properties"/>
<!--定义rabbitmq connectionFactory-->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建;不绑定到交换机绑定到默认交换机,
默认交换机类型为direct,名字为:"",路由键为队列的名称-->
<!--id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除,最后一个消费者和该队列断开连接后,自动删除队列
durable:是否持久化
exclusive:是否独占-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!--~~~~~~~~~~~~~~~~~~~~~~广播,所以队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~-->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue1" name="spring_fanout_queue1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue2" name="spring_fanout_queue2" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue1"></rabbit:binding>
<rabbit:binding queue="spring_fanout_queue2"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--~~~~~~~~~~~~~~~~~~~通配符: *匹配一个单词,#匹配多个单词~~~~~~~~~~~~~~~~~~~-->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_star" name="spring_topic_star" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue1" name="spring_topic_queue1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue2" name="spring_topic_queue2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="order.*" queue="spring_topic_star"></rabbit:binding>
<rabbit:binding pattern="order.#" queue="spring_topic_queue1"></rabbit:binding>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue2"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbit Template对象操作可以在代码中方便发送消息-->
<rabbit:template id="template" connection-factory="connectionFactory"/>
</beans>
<rabbit:direct-exchange name="aa">
<rabbit:bindings>
<!--direct类型的交换机绑定队列 key: 路由key queue:队列名称-->
<rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
测试代码
package com.xxxx.spring_rabbitmq_producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
//1.注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHelloWorld(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_queue","hello world spring rabbitmq");
}
/**
* 发送fanout消息
*/
@Test
public void testFanout(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_fanout_exchange","","hello world spring rabbitmq");
}
/**
* 发送topics消息
*/
@Test
public void testTopics(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_topic_exchange","order.hehe.haha","hello world spring rabbitmq");
}
}
** 消费者**
依赖一样,properties文件一样
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
">
<!--加载配置文件-->
<context:property-placeholder location="rabbitmq.properties"/>
<!--定义rabbitmq connectionFactory-->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--将监听对象交给spring容器管理-->
<bean id="springQueueListener"
class="com.xxxx.rabbitmq.listener.SpringQueueListener"/>
<rabbit:listener-container connection-factory="connectionFactory"
auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
</rabbit:listener-container>
</beans>
监听类
package com.xxxx.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
测试启动类
package com.xxxx.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test1(){
boolean flag = true;
while (true){
}
}
}
SpringBoot整合RabbotMQ
生产者
1.创建生产者SpringBoot工程
2.引入依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.编写yml配置,基本信息配置
rabbitmq.host=192.168.126.129
rabbitmq.port=5672
rabbitmq.username=huaxiaobai
rabbitmq.password=huaxiaobai
rabbitmq.virtual-host=/itcast
4.定义交换机,队列以及绑定关系的配置类
package com.xxxx.rabbitmqproducer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.创建交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.创建队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//队列和交换机绑定关系 Binding
/*
1.知道哪个队列
2.知道哪个交换机
3.routing key
*/
@Bean
public Binding bindQueueExchange(
@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
5.注入RabbitTemplate,调用方法,完成消息发送
测试:
package com.xxxx.rabbitmqproducer;
import com.xxxx.rabbitmqproducer.config.RabbitConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitmqProducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.info","boot mq hello...");
}
}
消费者
1.创建生产者SpringBoot工程
2.引入依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
3.编写yml配置,基本信息配置
4.编写监听类
package com.xxxx.rabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(message);
}
}
5.启动项目
版权归原作者 花笑白 所有, 如有侵权,请联系我们删除。