什么是rabbitMQ
** RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,能够实现异步消息处理
RabbitMQ是一个消息代理:它接受和转发消息。
你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块优点:异步消息处理
业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),将下单操作主流程:扣减库存、生成订单然后通过MQ消息队列完成通知,发红包、发短信
错峰流控 (通知量 消息量 订单量大的情况实现MQ消息队列机制,淡季情况下访问量会少)灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 RabbitMQ网站端口号:15672 程序里面实现的端口为:5672**
使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837
1.拉取RabbitMQ镜像
docker pull rabbitmq:management
2.运行RabbitMQ镜像
docker run -itd --name rabbit01 --hostname myrabbit -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management
注意:RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian
这里设置的是(RABBITMQ_DEFAULT_USER)登录的账号和( RABBITMQ_DEFAULT_PASS)密码,根据自身来修改
这里看到容器已经开启成功了,然后就可以使用了
3.通过浏览器打开
如果你使用的是本地虚拟机,那么你直接使用虚拟机显示的ipv4地址加端口号就可以访问了;
如果你使用的是云服务器,那么你需要在对应服务器(阿里云,腾讯云等)的安全组中开放
15672
端口,并且在防火墙中也开放15672端口
显示如上图那么就可以开始使用了
然后通过命令进入rabbitmq容器
docker exec -it rabbit01 /bin/bash
授权账号和密码
rabbitmqctl add_user admin admin
设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
- administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
- monitoring:监控者 登录控制台,查看所有信息
- policymaker:策略制定者 登录控制台,指定策略
- managment 普通管理员 登录控制台
**为用户添加资源权限 **
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
也可以在界面操作进行添加用户
RabbitMQ支持的消息模型
1.简单模式 Simple
2.工作模式 Work
![](https://img-blog.csdnimg.cn/20210904163134927.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBA5Ya36KGA772e5aSa5aW9,size_11,color_FFFFFF,t_70,g_se,x_16)
3.发布订阅模式
4.路由模式
5.主题 Topic模式
6.参数模式
7.出版商确认模式
1.入门案例
1. RabbitMQ入门案例 - Simple 简单模式
- jdk1.8
- 构建一个 maven工程
- 定义生产者
- 定义消费者
- 观察消息的在 rabbitmq-server服务中的进程
01 构建一个maven工程
02 导入依赖
<dependencies>
<!--导入rabbitmq的依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
**3.代码编写 **
在上图的模型中,有以下概念:
- 生产者,也就是要发送消息的程序
- 消费者:消息的接受者,会一直等待消息到来。
- 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
**生产者 **
package com.chen.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* @description: 简单模式Simple
*/
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1: 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("128.197.157.151");
connectionFactory.setPort(5672);
connectionFactory.setUsername("chenjinxian");//rabbitmq登录的账号
connectionFactory.setPassword("chenjinxian");//rabbitmq登录的密码
connectionFactory.setVirtualHost("/");
//springboot ---rabbitmq
Connection connection = null;
Channel channel = null;
try {
// 2: 创建连接Connection Rabbitmq为什么是基于channel去处理而不是链接? 长连接----信道channel
connection = connectionFactory.newConnection("生成者");
// 3: 通过连接获取通道Channel
channel = connection.createChannel();
// 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
String queueName = "queue1";
/*
* @params1 队列的名称
* @params2 是否要持久化durable=false 所谓持久化消息是否存盘,如果false 非持久化 true是持久化? 非持久化会存盘吗? 会存盘,但是会随从重启服务会丢失。
* @params3 排他性,是否是独占独立
* @params4 是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
* @params5 携带附属参数
*/
channel.queueDeclare(queueName, true, false, false, null);
// 5: 准备消息内容
String message = "Hello chenjinxian!!!";
// 6: 发送消息给队列queue
// @params1: 交换机 @params2 队列、路由key @params 消息的状态控制 @params4 消息主题
// 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功!!!");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
// 7: 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 8: 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
// ip port
// 1: 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("128.197.157.151");//服务器IP
connectionFactory.setPort(5672);
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2: 创建连接Connection
connection = connectionFactory.newConnection("消费者");
// 3: 通过连接获取通道Channel
channel = connection.createChannel();
// 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
// true = ack 正常的逻辑是没问题 死循环 rabbit 重发策略
// false = nack 消息这在消费消息的时候可能会异常和故障
final Channel channel2 = channel;
channel2.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
channel2.basicAck(message.getEnvelope().getDeliveryTag(),false);
}catch (Exception ex){
ex.printStackTrace();
// 三次确认 -- reject + sixin
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接受失败了...");
}
});
System.out.println("开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
// 7: 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 8: 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
2. 什么是AMQP
01 什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计
02 AMQP生产者流转过程
03 AMQP消费者流转过程
3. RabbitMQ的核心组成部分
01 RabbitMQ的核心组成部分
核心概念: 核心概念:
Server :又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-serverConnection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手 服务器:又称Broker,接受客户端的连接,实现AMQP实体服务。安装Rabbitmq-serverConnection:连接,应用程序与Broker的网络连接tcp/ip/三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进 息读写的通道,客户端可以建立对恪Channel,每个Channel代表一个会话任务。 频道:网络信道,几乎所有的操作都在频道中进行频道,是进息读写的通道,客户端可以建立对恪频道频道,每个频道代表一个会话任务频道。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。 消息:消息:服务与应用程序之间传送的数据,由Properties和Body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange 虚拟主机虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange :交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)Bindings : Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key. 交换:交换机,接受消息,根据路由键发送消息到绑定的队列.(=不具备消息存储的能力==)绑定:Exchange和Queue之间的虚拟连接,Binding中可以保护多个路由密钥。
Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特疋消恳.bttos:/bloq.csdn.net/qg _4485823(Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费苦。"gwa" 路由密钥:是一个路由规则,虚拟机可以用它来确定如何路由一个特征消息(队列:队列:也成为消息队列,消息队列,保存消息并将它们转发给消费者.
02 RabbitMQ整体架构是什么样子的?
03 RabbitMQ的运行流程
4. RabbitMQ入门案例 - fanout 模式
01 RabbitMQ的模式之发布订阅模式
发布订阅模式的具体实现
- 类型:fanout
- 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
package com.chen.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
发布订阅模式的具体实现
类型:fanout
特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("128.156.157.161");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 准备发送消息的内容
String message = "hello xuexi!!!";
// 6:准备交换机
String exchangeName = "fanout_change";
// 8: 指定交换机的类型
String type = "fanout";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
// #.course.* queue3
// *.order.# queue2 ta
// com.order.course.xxx collecion
channel.basicPublish(exchangeName,"", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
发布订阅模式的具体实现
类型:fanout
特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
*/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("128.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
new Thread(runnable, "queue4").start();
//new Thread(runnable, "queue5").start();
}
}
5. RabbitMQ入门案例 - Direct 模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
package com.chen.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
Direct 模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("128.176.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 准备发送消息的内容
String message = "hello direct_exchange!!!";
// 6:准备交换机
String exchangeName = "direct_exchange";
// 7: 定义路由key
String routeKey = "email";
// 8: 指定交换机的类型
String type = "direct";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
// #.course.* queue3
// *.order.# queue2 ta
// com.order.course.xxx collecion
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
Direct 模式
*/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.147.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
new Thread(runnable, "queue4").start();
// new Thread(runnable, "queue5").start();
}
}
6. RabbitMQ入门案例 - Topic 模式
(注意这里已经在可视化界面让队列绑定了交换机)
** 生产者**
package com.chen.rabbitmq.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
Topic模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("125.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 准备发送消息的内容
String message = "hello topic_exchange!!!";
// 6:准备交换机
String exchangeName = "topic_exchange";
// 7: 定义路由key
String routeKey = "com.order.user";
// 8: 指定交换机的类型
String type = "topic";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
// #.course.* queue3
// *.order.# queue2 ta
// com.order.course.xxx collecion
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者不变
完整案例(创建交换机,创建队列,交换机与队列绑定)
package com.chen.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
完整案例
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("151.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
String message = " 你好,小白";
// 交换机
String exchangeName = "direct_message_exchange";
// 交换机的类型 direct/topic/fanout/headers
String exchangeType = "direct";
// 如果你用界面把queueu 和 exchange的关系先绑定话,你代码就不需要在编写这些声明代码可以让代码变得更加简洁,但是不容读懂
// 如果用代码的方式去声明,我们要学习一下
// 7: 声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失,如果是true代表不丢失,false重启就会丢失
channel.exchangeDeclare(exchangeName,exchangeType,true);
// 8: 声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
// 9:绑定队列和交换机的关系
channel.queueBind("queue5",exchangeName,"order");
channel.queueBind("queue6",exchangeName,"order");
channel.queueBind("queue7",exchangeName,"course");
channel.basicPublish(exchangeName, "course", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
** 执行完后生成队列和交换机**
7. RabbitMQ入门案例 - Work模式
01 Work模式轮询模式(Round-Robin)
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
- 轮询模式的分发:一个消费者一条,按均分配
- 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
01轮询模式
生产者
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
轮询模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.147.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "学相伴:" + i;
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
轮询模式
*/
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.147.155");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
轮询模式
*/
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.195.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(100);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
02 Work模式公平分发模式
生产者
package com.chen.rabbitmq.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
公平分发模式
*/
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("125.156.157.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "学相伴:" + i;
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.chen.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
公平分发模式
*/
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("123.156.146.151");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
final Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(1000);
// 改成手动应答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
package com.chen.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
公平分发模式
*/
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("121.156.157.131");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("chenjinxian");
connectionFactory.setPassword("chenjinxian");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
final Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
// 一定使用我们的手动应答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
8. RabbitMQ使用场景
01 解耦、削峰、异步
同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
并行方式 异步线程池
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
存在问题
- 耦合度高
- 需要自己写线程池自己维护成本太高
- 出现了消息可能会丢失,需要你自己做消息补偿
- 如何保证消息的可靠性你自己写
- 如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
02 高内聚,低耦合
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
四、Springboot案例
1. Fanout 模式
生产者
导入依赖
<!--rabbitmq starter 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1
port: 5672
目录结构
创建配置类RabbitMqConfiguration.java
package com.chen.springbootorderrabbitmqproducer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*/
@Configuration
public class RabbitMqConfiguration {
// 1: 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_ex", true, false);
}
// 2: 声明队列 duanxin.fanout.queue
@Bean
public Queue duanxinqueue() {
return new Queue("duanxin.fanout.queue", true);
}
// 2: 声明队列 duanxin.fanout.queue
@Bean
public Queue smsqueue() {
return new Queue("sms.fanout.queue", true);
}
// 2: 声明队列 duanxin.fanout.queue
@Bean
public Queue emailqueue() {
return new Queue("email.fanout.queue", true);
}
// 3: 确定绑定关系
@Bean
public Binding bindduanxin(){
return BindingBuilder.bind(duanxinqueue()).to(fanoutExchange());
}
// 3: 确定绑定关系
@Bean
public Binding bindsms(){
return BindingBuilder.bind(smsqueue()).to(fanoutExchange());
}
// 3: 确定绑定关系
@Bean
public Binding bindemail(){
return BindingBuilder.bind(emailqueue()).to(fanoutExchange());
}
}
编写实现类OrderService.java
package com.chen.springbootorderrabbitmqproducer.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 交换机
private String exchangeName = "fanout_order_ex";
// 路由key
private String routingKey = "";
/**
* @Author xuke
* @Description 模拟用户购买商品下单的业务
* @Date 22:26 2021/3/5
* @Param [userId, productId, num]
* @return void
**/
public void makeOrder(String userId,String productId,int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("保存订单成功:id是:" + orderId);
// 3: 发送消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
}
编写测试类
package com.chen.springbootorderrabbitmqproducer.rabbitmq.springbootorderrabbitmqproducer;
import com.chen.springbootorderrabbitmqproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
public void contextLoads() {
orderService.makeOrder("100","100",10);
}
@Test
public void testDirect() {
orderService.makeOrderDirect("100","100",10);
}
@Test
public void testDirectTTl() {
orderService.makeOrderDirectTtl("100","100",10);
}
// @Test
// public void testTopic() {
// orderService.makeOrderTopic("100","100",10);
// }
}
消费者
application.yml
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1
port: 5672
接受消息
package com.chen.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
*/
@Service
@RabbitListener(queues ={"duanxin.direct.queue"})
public class DirectDuanxinConsumber {
// 告诉你的接收服务器的消息,没有返回值
@RabbitHandler
public void reviceMessage(String message){
System.out.println("duanxin--direct--->接收到订单消息,订单id是: " + message);
}
}
package com.chen.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
*/
@Service
@RabbitListener(queues ={"email.direct.queue"})
public class DirectEmailConsumber {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("email---direct-->接收到订单消息,订单id是: " + message);
}
}
package com.chen.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
*/
@Service
@RabbitListener(queues ={"sms.direct.queue"})
public class DirectSmsConsumber {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("sms--direct--->接收到订单消息,订单id是: " + message);
}
}
2. Direct 模式
生产者
配置类
package com.chen.springbootorderrabbitmqproducer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
*/
@Configuration
public class RabbitMqConfiguration2 {
// 1: 声明交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_order_ex", true, false);
}
// 2: 声明队列 duanxin.direct.queue
@Bean
public Queue duanxinqueue() {
return new Queue("duanxin.direct.queue", true);
}
// 2: 声明队列 duanxin.direct.queue
@Bean
public Queue smsqueue() {
return new Queue("sms.direct.queue", true);
}
// 2: 声明队列 duanxin.direct.queue
@Bean
public Queue emailqueue() {
return new Queue("email.direct.queue", true);
}
// 3: 确定绑定关系
@Bean
public Binding bindduanxin(){
return BindingBuilder.bind(duanxinqueue()).to(directExchange()).with("msg");
}
// 3: 确定绑定关系
@Bean
public Binding bindsms(){
return BindingBuilder.bind(smsqueue()).to(directExchange()).with("sms");
}
// 3: 确定绑定关系
@Bean
public Binding bindemail(){
return BindingBuilder.bind(emailqueue()).to(directExchange()).with("email");
}
}
实现类:
package com.chen.springbootorderrabbitmqproducer.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 交换机
private String exchangeName = "fanout_order_ex";
// 路由key
private String routingKey = "";
/**
* @Description 模拟用户购买商品下单的业务
* @Param [userId, productId, num]
* @return void
**/
public void makeOrderDirect(String userId,String productId,int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("保存订单成功:id是:" + orderId);
// 3: 发送消息
rabbitTemplate.convertAndSend("direct_order_ex","email",orderId);
rabbitTemplate.convertAndSend("direct_order_ex","sms",orderId);
}
}
测试:
package com.chen.springbootorderrabbitmqproducer.rabbitmq.springbootorderrabbitmqproducer;
import com.chen.springbootorderrabbitmqproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
public void testDirect() {
orderService.makeOrderDirect("100","100",10);
}
}
**消费者 **
package com.chen.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @description:
*/
@Service
@RabbitListener(queues ={"duanxin.direct.queue"})
public class DirectDuanxinConsumber {
// 告诉你的接收服务器的消息,没有返回值
@RabbitHandler
public void reviceMessage(String message){
System.out.println("duanxin--direct--->接收到订单消息,订单id是: " + message);
}
}
package com.chen.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
*/
@Service
@RabbitListener(queues ={"email.direct.queue"})
public class DirectEmailConsumber {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("email---direct-->接收到订单消息,订单id是: " + message);
}
}
package com.chen.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
@Service
@RabbitListener(queues ={"sms.direct.queue"})
public class DirectSmsConsumber {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("sms--direct--->接收到订单消息,订单id是: " + message);
}
}
3. Topic 模式
生产者
public class OrderService{
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟用户下单
public void makeOrder(String userid,String productid,int num){
public void makeOrderTopic(String userId,String productId,int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("保存订单成功:id是:" + orderId);
// 3: 发送消息
//com.# duanxin
//#.email.* email
//#.sms.# sms
// 设置消息确认机制
rabbitTemplate.convertAndSend("topic_order_ex","com.email.sms.xxx",orderId);
}
}
@Test
public void testTopic() {
orderService.makeOrderTopic("100","100",10);
}
消费者(采用注解)
FanoutSmsConsumer.java
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true",antoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
key = "#.sms.#"
))
public class TopicSmsConsumer{
@RabbitHandler
public void reviceMessage(String message){
sout("sms接收到了的订单信息是:"+message);
}
}
FanoutDuanxinConsumer.java
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "duanxin.topic.queue",durable = "true",antoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
key = "#.duanxin.#"
))
public classTopicDuanxinConsumer{
@RabbitHandler
public void reviceMessage(String message){
sout("duanxin接收到了的订单信息是:"+message);
}
}
FanoutEmailConsumer.java
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue",durable = "true",antoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
key = "#.email.#"
))
public class TopicEmailConsumer{
@RabbitHandler
public void reviceMessage(String message){
sout("email接收到了的订单信息是:"+message);
}
}
五、RabbitMQ高级
- 过期时间TTL
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
- 第二种方法是对消息进行单独设置,每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息
设置队列TTL
RabbitMqConfiguration.java
@Configuration
public class TTLRabbitMQConfiguration{
//1.声明注册direct模式的交换机
@Bean
public DirectExchange ttldirectExchange(){
return new DirectExchange("ttl_direct_exchange",true,false);}
//2.队列的过期时间
@Bean
public Queue directttlQueue(){
//设置过期时间
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",5000);//这里一定是int类型
return new Queue("ttl.direct.queue",true,false,false,args);}
@Bean
public Binding ttlBingding(){
return BindingBuilder.bin(directttlQueue()).to(ttldirectExchange()).with("ttl");
}
}
设置消息TTL
public class OrderService{
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟用户下单
public void makeOrder(String userid,String productid,int num){
//1.根据商品id查询库存是否足够
//2.保存订单
String orderId = UUID.randomUUID().toString();
sout("订单生产成功:"+orderId);
//3.通过MQ来完成消息的分发
//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
String exchangeName = "ttl_order_exchange";
String routingKey = "ttlmessage";
//给消息设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
public Message postProcessMessage(Message message){
//这里就是字符串
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
}
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
}
}
RabbitMqConfiguration.java
@Configuration
public class TTLRabbitMQConfiguration{
//1.声明注册direct模式的交换机
@Bean
public DirectExchange ttldirectExchange(){
return new DirectExchange("ttl_direct_exchange",true,false);}
//2.队列的过期时间
@Bean
public Queue directttlQueue(){
//设置过期时间
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",5000);//这里一定是int类型
return new Queue("ttl.direct.queue",true,false,false,args);}
@Bean
public Queue directttlMessageQueue(){
return new Queue("ttlMessage.direct.queue",true,false,false,args);}
@Bean
public Binding ttlBingding(){
return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");
}
}
2. 死信队列
概述
DLX,全称
Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息再一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数
x-dead-letter-exchange
指定交换机即可
代码
DeadRabbitMqConfiguration.java
@Configuration
public class DeadRabbitMqConfiguration{
//1.声明注册direct模式的交换机
@Bean
public DirectExchange deadDirect(){
return new DirectExchange("dead_direct_exchange",true,false);}
//2.队列的过期时间
@Bean
public Queue deadQueue(){
return new Queue("dead.direct.queue",true);}
@Bean
public Binding deadbinds(){
return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
}
}
RabbitMqConfiguration.java
@Configuration
public class TTLRabbitMQConfiguration{
//1.声明注册direct模式的交换机
@Bean
public DirectExchange ttldirectExchange(){
return new DirectExchange("ttl_direct_exchange",true,false);}
//2.队列的过期时间
@Bean
public Queue directttlQueue(){
//设置过期时间
Map<String,Object> args = new HashMap<>();
//args.put("x-max-length",5);
args.put("x-message-ttl",5000);//这里一定是int类型
args.put("x-dead-letter-exchange","dead_direct_exchange");
args.put("x-dead-letter-routing-key","dead");//fanout不需要配置
return new Queue("ttl.direct.queue",true,false,false,args);}
@Bean
public Binding ttlBingding(){
return BindingBuilder.bin(directttlQueue()).to(ttldirectExchange()).with("ttlmessage");
}
}
3. 内存磁盘的监控
01 RabbitMQ内存警告
02 RabbitMQ的内存控制
参考帮助文档:
http://www.rabbbitmq.com/configure.html
当出现警告的时候,可以通过配置去修改和调整
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效
**配置文件方式 rabbitmq.conf **
03 RabbitMQ的内存换页
04 RabbitMQ的磁盘预警
4. 集群(docker集群rabbitmq)
1.先创建三个rabbitmq容器
docker run -itd --name rabbit01 --hostname myrabbit01 -v /home/software/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management
docker run -itd --name rabbit02 --hostname myrabbit02 -v /home/software/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbit01:myrabbit01 -p 15673:15672 -p 5673:5672 -p 25673:25672 rabbitmq:management
docker run -itd --name rabbit03 --hostname myrabbit03 -v /home/software/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbit01:myrabbit01 --link rabbit02:myrabbit02 -p 15674:15672 -p 5674:5672 -p 25674:25672 rabbitmq:management
启动容器成功后,读者可以访问
分别通过浏览器访问:ip(自己的IP地址):15672;ip:15673;ip:15674都可以访问
2.容器节点加入集群
执行如下命令,进入第一个rabbitmq节点容器:
docker exec -it rabbit01 /bin/bash
进入容器后,操作rabbitmq,执行如下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
执行如下命令,进入第二个rabbitmq节点容器:
docker exec -it rabbit02 /bin/bash
进入第二个rabbitmq节点容器,执行如下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@myrabbit01
rabbitmqctl start_app
exit
,进入第三个rabbitmq节点容器,执行如下命令:
docker exec -it rabbit03 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@myrabbit01
rabbitmqctl start_app
exit
最后可以看到节点信息
版权归原作者 冷血~多好 所有, 如有侵权,请联系我们删除。