什么是RabbitMQ
Q全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行**异步处理**,而这种异步处理 的方式大大的节省了服务器的请求响应时间,从而**提高**了**系统**的**吞吐量**。
开发中消息队列通常有如下应用场景:
1、任务**异步**处理 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应 用程序的响应时间。 2、应用程序**解耦合** MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。 3、**削峰填谷** 如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发 写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并量 会突然激增到5000以上,这个时候数据库肯定卡死了。 ** 解决办法**: 消息被MQ保存起来,然后系统就可以按照自己的消费能力来消费,比如每1000 个数据,这样慢慢写入数据库,这样就不会卡死数据库了。 但是使用了MQ之后,限制消费的消息速度为1000,但是这样一来,高峰期产生的数据势会 被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费 的消息速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”
RabbitMQ各组件功能
Broker:标识消息队列服务器实体.
Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
创建项目
可以在windows中安装RabbiMQ 也可以在Linux 中安装RabbiMQ 可以到百度上查怎么安装的 我写的文章中也有具体怎么安装的
安装后 在IDEA中创建一个 Maven项目
添加依赖 Maven
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
** 在编写代码前 要开启RabbitMQ的服务 否则是无法使用的**
什么是生产者什么是消费者 .就好比微信发消息.发消息的一方是生产者(生产消息),而消费者就是获取消息的一方
学过Socket通信的就很好掌握在Socket中客户端就好比是生产者, 而服务端就好比是消费者但是不一样的是,服务端必须保证在线.否则客户端是无法发送消息的,也就是连接失败,而在RabbitMQ ,利用队列来存储客户端(生产者)发送的消息(就好比数据库) 而服务端(消费者)开启后会自动获取队列中的全部消息 ,而队列就好比是中间商 来帮助 客户端和服务端 交互 不用 服务端一直开启
我们在来说说微信 : 我们在微信中发生一条信息给其他用户,但是那个用户不在线,信息发过去了没???
答案是没有, 那么存储在哪里了? , 存储在你和他的队列里了, 当他上线后自动会从队列中获取,你发送的全部消息
下面我们就来实现,上面讲述的功能
编写RabbitMQ 连接
package com.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
//队列名称
public static final String QUEUE_NAME = "simple_queue";
public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
connectionFactory.setHost("192.168.216.128");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称 就是和你用户绑定的虚拟机 在创建用户时候就指定了
connectionFactory.setVirtualHost("/itcast");
//连接用户名
connectionFactory.setUsername("admin");
//连接密码
connectionFactory.setPassword("admin");
//创建连接
return connectionFactory.newConnection();
}
}
编写生产者
Producer
package com.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
//生产者
public class Producer {
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
// 要发送的信息
String message = "你好;小兔子111!";
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
在执行上述的消息发送之后;可以用发送消息的账户,登录RabbitMQ的管理控制台,可以发现队列和其消息:
我是在linux上搭建的启动服务器后在游览器上输入http://192.168.216.128:15672就能进入RabbitMQ ,ip是linux 的ip地址windows也一样
查看发送的消息信息
编写消费者
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
//消费者
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
当启动消费者后 自动获取 生产者 发送到队列里的消息 而且持续保持监听
只要消费者接受到消息 那么 就会将队列里对应的消息删除
你也可以手动收到消息
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(), true);
那么就要将 是否自动确认 关闭
channel.basicConsume(ConnectionUtil.QUEUE_NAME, false, consumer);
具体怎么使用 看下面的案例
Work Queues(工作队列模式)
在上面的入门案例中我们 完成了 一个消费者对应一个生产者
这个案例中我们完成一个生产者对应多个消费者 来加快消息的处理
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
比如 抢购活动 下了1000个订单 我可以使用多个消费者来 分别处理这1000个订单 Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多个消费者, 进行并行消费。只是确认消息 需要手动确定
结构图:
RabbitMQ连接上面有这里就不写了
生产者
Producer
package com.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
//生产者
public class Producer {
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 30; i++) {
// 要发送的信息
String message = "你好;小兔子!---- "+i;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
}
// 关闭资源
channel.close();
connection.close();
}
}
一次生产 30条 "你好;小兔子!---- "+i;
可以到RabbitMQ的管理控制台 查看
消费者 1
Consumer1
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
//消费者
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
代码Consumer1 和Consumer2 一模一样 复制一下 改下文件名 ,启动消费者 1 和 消费者2 然后 运行 生产者
部分截图:
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。 简单来说就是谁抢到就是谁的
Publish/Subscribe(订阅模式)
在之前的 模式中都是 一次只能将消息发送给一个队里 那么我们可以将消息发送给所有指定的队列里吗??
当前可以 我们可以使用交换机 来代替我,们发送 就和中间商一样 我们把消息给中间商,而中间商 帮助我们来发送消息给各个用户 这就是订阅模式
订阅模式是: 将某一个消费者 的消息发给多个队里 而队列里的所有消费者都能共享到此消息
就拿微信群来说: 我在群中发送一条消息 只要是 群里的用户 都能接收到我的消息,这原理就是 发送者将消息交给交换机而交换机在发送给 此群所有人的队列里因为此群所有人的队列 都和此交换机绑定了如果还不懂 那么 微信公众号 知道吧如果你不关注他的公众号那么他发送的信息你是不会收到的 , 如果你关注了他的公众号那么就相当于和此公众号的交换机绑定了 那么公众号 发生一条消息 给交换机,此交换机就会将 消息发生给所有绑定此交换机的微信用户的队列里 当微信在线时候就会自动接收到消息
package com.itheima.rabbitmq.simple.ps;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
//交换机名称
static final String FANOUT_EXCHAGE = "fanout_exchange";
//队列名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//队列名称
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称 就是和你用户绑定的虚拟机
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接密码
connectionFactory.setPassword("guest");
//创建连接
return connectionFactory.newConnection();
}
}
生产者
package com.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 发布与订阅使用的交换机类型为:fanout
*/
public class Producer { //生产者
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
/**
* 创建交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_2, true, false, false, null);
//队列绑定交换机 也就是每次消息要推送的队列
channel.queueBind(ConnectionUtil.FANOUT_QUEUE_1, ConnectionUtil.FANOUT_EXCHAGE, "");
channel.queueBind(ConnectionUtil.FANOUT_QUEUE_2, ConnectionUtil.FANOUT_EXCHAGE, "");
for (int i = 1; i <= 10; i++) {
// 发送信息
String message = "你好;小兔子!发布订阅模式--" + i;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(ConnectionUtil.FANOUT_EXCHAGE, "", null, message.getBytes());
System.out.println("已发送消息:" + message);
}
// 关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer{ //消费者1
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.FANOUT_QUEUE_1, true, false, false, null);
//队列绑定交换机
channel.queueBind(ConnectionUtil.FANOUT_QUEUE_1, ConnectionUtil.FANOUT_EXCHAGE, "");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.FANOUT_QUEUE_1, true, consumer);
}
}
消费者2 将消费者1代码复制一份,然后将 FANOUT_QUEUE_1 都换成FANOUT_QUEUE_2就行 ,先运行 所有消费者然后在运行生产者
控制台结果:
可以看出来 交换机成功的把 消息 发送到两个队列里了
然后我们在来看看 RabbiMQ 控制台里
这个是我们创建的交换机
点击fanout_exchange 进入里面
这就是我们绑定的队列 看看就行了别乱点
我们在看看队列
这就是我们创建的队列 我们点击fanout_queue_1进入
这个就是此队列绑定的交换机
RoutingKEY(路由模式)
什么是路由模式:
就是在订阅模式的基础上 加个RoutingKEY标记这个标记的作用是来区分消息的发送, 就和送快递一样根据地址将快递送给对应的人也可以说是分流我们还拿微信来说:
就比如一个企业群:普通消息是所有人都能接收到 而有些消息只能管理层才能接收到 ,比如:通知经理以上员工来开会 这就需要管理层关联管理队列和普通队列 ,普通员工关联普通队列
在比如商品发布: 商品分为会员商品和普通商品 ,会员能接受到普通商品的发布和会员商品的发布信息而普通用户只能接受到普通商品的发布信息
商品发布案例:
ConnectionUtil(连接)
package com.itheima.rabbitmq.simple.direct;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
//交换机名称
static final String DIRECT_EXCHAGE = "direct_exchange";
//会员队列名称
static final String DIRECT_QUEUE_MEMBER = "direct_queue_member";
//会员路由
static final String DIRECT_ROUTING_MEMBER = "member";
//普通队列名称
static final String DIRECT_QUEUE_COMMON = "direct_queue_common";
//普通路由
static final String DIRECT_ROUTING_COMMON = "common";
public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称 就是和你用户绑定的虚拟机
connectionFactory.setVirtualHost("/");
//连接用户名
connectionFactory.setUsername("guest");
//连接密码
connectionFactory.setPassword("guest");
//创建连接
return connectionFactory.newConnection();
}
}
生产者
package com.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer { //生产者
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
/**
* 创建交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);
//队列绑定交换机 和指定 Routingkey
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);
// 发送信息
String message = "新增了会员商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_MEMBER ;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息
message = "新增了普通商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_COMMON ;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
会员消费者
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {//会员消费者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
//队列绑定交换机 和指定 Routingkey
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);//会员Routing
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, consumer);
}
}
普通消费者
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {//普通消费者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);
//队列绑定交换机 和指定 Routingkey
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_COMMON , true, consumer);
}
}
先运行所有消费者然后在运行生产者 控制台结果:
会员
普通
大家可能会发现 在Producer类中和Consumer以及Consumer1 中都存在重复创建队列和交换机代码
channel.exchangeDeclare(xxx) 创建交换机
channel.queueBind(xxx) 创建队列
那么能不能省略呢答案是能 但是这样的话你必须保证发送消息的目标队列必须存在否则消息将丢失,所以还是不要省略为好这样在发送消息之前就将需要的队列创建完成了, 保证了消息不会丢失
Topics(通配符模式)
Topics通配符模式是在RoutingKey路由模式的基础上升级的 增加了通配符功能 ,在RoutingKey路由模式中我们要分流发送信息,需要指定每一个队列的RoutingKey,如果是100个队列呢? 那不给累死…
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: group.member1
通配符规则:
#:匹配一个或多个词 (最常用)
*:匹配不多不少恰好1个词
举例:
group.#:能够匹配group.xxx 或者 group.xxx.xx
group.*:只能匹配group.xxx
我们将RoutingKey路由模式的代码改动下:
将ConnectionUtil类中的 路由变量换成以下代码
//会员路由
static final String DIRECT_ROUTING_MEMBER = "item.member";
//普通路由
static final String DIRECT_ROUTING_COMMON = "item.common";
Consumer
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {//会员消费者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.TOPIC);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
//队列绑定交换机 和指定Routing
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, "item.*");//会员Routing
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, consumer);
}
}
Consumer1
package com.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Consumer1 {//普通消费者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.TOPIC);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);
//队列绑定交换机 和指定 Routingkey
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);//普通Routing
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2-接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
}
};
//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(ConnectionUtil.DIRECT_QUEUE_COMMON , true, consumer);
}
}
Producer
package com.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer { //生产者
public static void main(String[] args) throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
/**
* 创建交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHAGE, BuiltinExchangeType.TOPIC);
// 声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_MEMBER, true, false, false, null);
channel.queueDeclare(ConnectionUtil.DIRECT_QUEUE_COMMON , true, false, false, null);
//队列绑定交换机 和指定 Routingkey
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_MEMBER, ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER);
channel.queueBind(ConnectionUtil.DIRECT_QUEUE_COMMON , ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON);
// 发送信息
String message = "新增了会员商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_MEMBER ;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_MEMBER, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息
message = "新增了普通商品。路由模式;routing key 为 "+ConnectionUtil.DIRECT_ROUTING_COMMON ;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(ConnectionUtil.DIRECT_EXCHAGE, ConnectionUtil.DIRECT_ROUTING_COMMON, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
消息手动确认
什么是消息确认ACK。如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。
ACK的消息确认机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的
false只确认当前一个消息收到,true确认所有consumer获得的消息 ,一般使用false 就行了
channel.basicAck(envelope.getDeliveryTag(),false); //确认消息被消费了
返回false(消息没有被消费成功),消息重新回到队列,(类似数据库的回滚)
channel.basicNack(envelope.getDeliveryTag(), false, true);
版权归原作者 西门吹雪@132 所有, 如有侵权,请联系我们删除。