MQ架构设计原理
什么是消息中间件
消息中间件基于队列模型实现异步/同步传输数据
作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
传统的http请求存在那些缺点
1.Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到
服务器端有可能会导致我们服务器端处理请求堆积。
2.Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
3.http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。
注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。
Mq应用场景有那些
消息队列(Message Queue,简称MQ)在分布式系统中有着广泛的应用,以下是MQ的一些主要应用场景:
1.异步处理:
- 当某个操作不需要立即返回结果,或者该操作非常耗时且影响主业务流程的性能时,可以使用MQ将操作放入后台异步执行。例如,用户注册成功后发送欢迎邮件或短信,这些操作不需要实时同步完成,可以放入MQ中异步处理。
2.应用解耦:
- MQ允许应用间进行低耦合的通信。通过将消息发送到一个或多个目的地址(队列或主题),应用程序可以独立地执行和扩展,而不需要知道彼此的实现细节。例如,订单系统产生订单后,将订单数据发送到MQ,库存系统、物流系统等多个系统监听MQ中的订单数据,根据订单数据执行相应的业务逻辑。
3.流量削峰:
- 在高并发场景下,如果某个应用或系统突然接收到大量的请求,可能导致系统压力过大甚至崩溃。通过使用MQ,可以将这些请求暂存起来,然后按照系统能够处理的速率匀速地将请求分发给后台服务进行处理。这样可以有效地缓解系统压力,避免系统崩溃。
4.数据同步:
- MQ可以实现不同系统之间的数据同步。例如,在分布式数据库中,可以使用MQ将数据的变更事件(如增删改操作)发布到MQ中,其他系统监听这些事件并据此更新自己的数据副本。
5.日志处理:
- MQ可以用于收集、处理和存储日志信息。通过将日志信息发送到MQ中,可以异步地将日志信息写入到存储系统(如HDFS、Elasticsearch等)中,并进行后续的分析和处理。
-6.任务调度:
- MQ可以用于实现任务的调度和执行。通过将任务信息发送到MQ中,并指定任务的执行时间和执行条件,可以实现定时任务、延迟任务等功能的实现。
7.微服务架构中的通信:
- 在微服务架构中,服务与服务之间的通信是常见的需求。使用MQ可以实现服务之间的松耦合通信,使得服务可以独立地扩展和升级。同时,MQ还可以提供消息持久化、消息确认等机制,保证服务之间通信的可靠性和稳定性。
8.分布式事务:
- 在分布式系统中,实现跨多个服务的事务一致性是一个挑战。通过使用MQ的分布式事务功能(如RabbitMQ的RabbitMQ-delayed-message-exchange插件或Kafka的事务API),可以实现跨多个服务的分布式事务处理。
9.消息广播与订阅:
- MQ可以实现消息的广播与订阅功能。发布者将消息发送到特定的主题(Topic),所有订阅了该主题的消费者都可以接收到这些消息。这种模式常用于实时消息系统、新闻推送等场景。
10.跨语言通信:
- MQ支持多种编程语言和平台,可以实现跨语言、跨平台的通信。这使得不同语言开发的应用或服务可以方便地进行通信和数据交换。
为什么使用MQ
消息中间件(如RabbitMQ、Kafka、ActiveMQ等)在分布式系统中扮演着重要的角色,它们基于队列模型(Queue Model)实现数据的异步或同步传输。下面我将详细解释这些中间件如何支持高并发、异步解耦、流量削峰以及降低耦合度的作用。
1.支撑高并发:
- 当系统面临高并发请求时,如果每个请求都直接由系统处理,可能会导致系统资源迅速耗尽,响应速度下降,甚至服务崩溃。
- 消息中间件通过队列的方式将请求暂存起来,后台服务可以异步地从队列中获取请求并进行处理。这样,即使在高并发场景下,系统也能保持稳定的响应速度,因为请求被均匀地分散到不同的时间段内处理。
- 消息中间件通常支持多消费者模型,即多个服务实例可以同时从队列中消费消息,从而进一步提高并发处理能力。
2.异步解耦:
- 在传统的同步请求-响应模型中,请求方必须等待响应方处理完请求后才能继续执行后续操作。这种模型下,请求方和响应方之间存在紧密的耦合关系。
- 消息中间件引入了异步通信机制,使得请求方和响应方可以解耦。请求方只需将请求发送到消息队列中,然后立即返回,无需等待响应。响应方可以在后台异步地处理这些请求,并通过其他方式(如回调函数、HTTP响应等)将结果通知给请求方。
- 这种异步解耦的方式可以提高系统的响应速度和吞吐量,同时减少系统间的依赖关系,提高系统的可扩展性和可维护性。
3.流量削峰:
- 在某些场景下,系统可能会在短时间内接收到大量的请求,导致系统负载急剧上升,甚至超过系统的处理能力。
- 消息中间件可以将这些请求暂存到队列中,然后按照系统能够处理的速率匀速地将请求分发给后台服务进行处理。这样,即使系统面临巨大的流量冲击,也能保持稳定的性能和响应速度。
- 流量削峰不仅可以保护系统免受流量冲击的影响,还可以提高系统的可用性和容错能力。
4.降低耦合度:
- 在复杂的分布式系统中,各个服务之间可能存在复杂的依赖关系,这些依赖关系可能导致系统难以维护和扩展。
- 消息中间件通过引入消息队列作为服务之间的通信媒介,降低了服务之间的耦合度。服务之间不再需要直接调用对方的方法或接口,而是通过发送和接收消息来进行通信。
- 这种基于消息的通信方式使得服务可以更加独立地设计和实现,提高了系统的可维护性和可扩展性。同时,由于服务之间的耦合度降低,系统的容错能力也得到了提高,即使某个服务出现故障,也不会影响其他服务的正常运行。
Mq与多线程之间区别
MQ可以实现异步/解耦/流量削峰问题;
多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。
实例:(基于springboot实现异步多线程)
package com.example.rabbitmq.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MemberService {
@Autowired
public MemberServiceAsync memberServiceAsync;
//@Bean
@RequestMapping("/addMember")
public String addMember(){
//1.数据库插入数据 log.info(">01<");
System.out.println(">01<");
// sms();
memberServiceAsync.smsAsync();
System.out.println(">04<");
return "用户注册成功!!";
}
public String sms(){
System.out.println(">02<");
try{
System.out.println(">正在发送短信<");
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}
System.out.println(">03<");
return "短信发送完成!";
}
}
package com.example.rabbitmq.mq;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class MemberServiceAsync {
@Async
public void smsAsync(){ // 注意返回类型为void,因为异步方法通常不返回结果给调用者
System.out.println(">02<");
try{
System.out.println(">正在发送短信<");
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}
System.out.println(">03<");
// 注意:这里不应该返回任何值给调用者,因为调用是异步的
}
}
启动类
package com.example.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@SpringBootApplication
public class RabbitMqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqApplication.class, args);
}
}
效果
基于多线程队列简单实现mq
package com.example.rabbitmq.mq;
import org.json.JSONObject;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 没有网络的情况下实现MQ
* 利用多线程制造生产者和消费者
*/
public class BoyatopThreadMQ {
//MQ服务器 初始化消息的队列
private static LinkedBlockingDeque<JSONObject> msgs=new LinkedBlockingDeque();
//主函数 程序入口
public static void main(String[] args) {
//生产者 生产线程
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
Thread.sleep(1000);
JSONObject data = new JSONObject();
data.put("userId","12345");
//存入消息队列
msgs.offer(data);
}
} catch (Exception e) {
e.printStackTrace();
}
}
},"生产者");
producerThread.start();
//消费端 消费线程
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
JSONObject data = msgs.poll();
if(data != null){
System.out.println(Thread.currentThread().getName() + ",获取到数据:" + data);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
},"消费者");
consumerThread.start();
}
}
效果:
基于netty实现mq
Netty是一个高性能、异步事件驱动的网络应用程序框架,专门用于构建高性能、高可靠性的网络应用程序。它提供了一个快速简便的方法来构建网络服务器和客户端,并且不需要关注复杂的网络编程细节。Netty支持多种网络协议,包括TCP、UDP、HTTP、WebSocket等,使其适用于各种应用场景。Netty在许多大型互联网公司和开源项目中被广泛使用,特别是在需要处理大量并发连接和低延迟的场景中。
消费者netty客户端与nettyServer端MQ服务器端保持长连接,MQ服务器端保存消费者连接。
生产者netty客户端发送请求给nettyServer端MQ服务器端,MQ服务器端在将该消息内容发送给消费者
导入相关依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.4</version>
</dependency>
Netty生产端
package com.example.rabbitmq.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @ClassName BoyatopNettyMQProducer
* @Author
* @Version V1.0
**/
public class NettyProducer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyProducer.NettyClientHandler());
1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyProducer client = new NettyProducer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "producer");
JSONObject msg = new JSONObject();
msg.put("userId", "123456");
msg.put("age", "23");
data.put("msg", msg);
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
Netty服务端
package com.example.rabbitmq.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @ClassName
* @Author
* @Version V1.0
**/
public class NettyMQServer {
public void bind(int port) throws Exception {
/**
* Netty 抽象出两组线程池BossGroup和WorkerGroup
* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
// 设定NioServerSocketChannel 为服务器端
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.option(ChannelOption.SO_BACKLOG, 100)
// 服务器端监听数据回调Handler
.childHandler(new NettyMQServer.ChildChannelHandler());
//绑定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("当前服务器端启动成功...");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅关闭 线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置异步回调监听
ch.pipeline().addLast(new NettyMQServer.MayiktServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 9008;
new NettyMQServer().bind(port);
}
private static final String type_consumer = "consumer";
private static final String type_producer = "producer";
private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
// 生产者投递消息的:topicName
public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
/**
* 服务器接收客户端请求
*
* @param ctx
* @param data
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object data)
throws Exception {
JSONObject clientMsg = getData(data);
String type = clientMsg.getString("type");
switch (type) {
case type_producer:
producer(clientMsg);
break;
case type_consumer:
consumer(ctx);
break;
}
}
private void consumer(ChannelHandlerContext ctx) {
// 保存消费者连接
ctxs.add(ctx);
// 主动拉取mq服务器端缓存中没有被消费的消息
String data = msgs.poll();
if (StringUtils.isEmpty(data)) {
return;
}
// 将该消息发送给消费者
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
private void producer(JSONObject clientMsg) {
// 缓存生产者投递 消息
String msg = clientMsg.getString("msg");
msgs.offer(msg);
//需要将该消息推送消费者
ctxs.forEach((ctx) -> {
// 将该消息发送给消费者
String data = msgs.poll();
if (data == null) {
return;
}
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
});
}
private JSONObject getData(Object data) throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf) data;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
return JSONObject.parseObject(body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
}
Netty消费端
package com.example.rabbitmq.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @ClassName BoyatopNettyMQProducer
* @Author
* @Version V1.0
**/
public class NettyConsumer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyConsumer.NettyClientHandler());
1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyConsumer client = new NettyConsumer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "consumer");
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
netty和rabbitmq的区别
首先,从设计目的和功能上看,Netty是一个高性能、异步事件驱动的网络应用程序框架,主要用于构建高性能、高可靠性的网络应用程序。它提供了一个快速简便的方法来构建网络服务器和客户端,并且不需要关注复杂的网络编程细节。而RabbitMQ是一个开源的消息队列系统,主要用于在不同的应用程序之间进行消息传递和异步通信。它实现了一种分布式消息传递机制,使得应用程序可以独立地工作,并通过消息队列来交换数据。
其次,从使用场景上看,Netty更侧重于底层网络通信的开发,可以处理TCP、UDP等底层网络协议,适用于需要高性能网络通信的场景。而RabbitMQ则更侧重于业务层次的消息传递,它支持多种消息传递协议,如AMQP、MQTT、STOMP等,并提供了可靠的消息传递机制、灵活的消息路由和队列管理功能,适用于需要异步通信、解耦和流量削峰等场景。
此外,Netty和RabbitMQ在性能、可扩展性、可靠性等方面也有所不同。Netty通过异步事件驱动的方式和高效的线程模型,实现了高性能的网络通信。而RabbitMQ则通过分布式架构和持久化机制,实现了高可靠性和可扩展性。同时,RabbitMQ还提供了可视化的管理界面和多种编程语言的客户端库,方便用户进行管理和使用。
Mq消息中间件名词
Producer 生产者:投递消息到MQ服务器端;
Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;
Broker MQ服务器端
Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
Queue 存放消息模型队列 先进先出 后进后出原则 数组/链表
Message 生产者投递消息报文:json
主流mq区别对比
特性
ActiveMQ
RabbitMQ
RocketMQ
kafka
开发语言
java
erlang
java
scala
单机吞吐量
万级
万级
10万级
10万级
时效性
ms级
us级
ms级
ms级以内
可用性
高(主从架构)
高(主从架构)
非常高(分布式架构)
非常高(分布式架构)
功能特性
成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好
基于erlang开发,所以并发能力很强,性能极其好,延时很低管理界面较丰富
MQ功能比较完备,扩展性佳
只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。
centos安装rabbitmq
命令安装
1、更新系统
yum update -y
2、安装依赖包erlang
yum install erlang -y
3、添加rabbitMQ仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
4、安装rabbitMQ
yum install rabbitmq-server -y
erlang安装成功
添加rabbitMQ仓库
安装rabbitMQ成功
手动安装
1.官网地址
Installing RabbitMQ | RabbitMQ
2.文件上传
上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
3.安装文件(分别按照以下顺序安装)
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
常用命令(按照以下顺序执行)
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
systemctl enable rabbitmq-server.service
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest)访问地址http://62.234.175.16:15672/出现权限问题
添加一个新的用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
#set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin "." "." ".*"
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限的
查看当前用户和角色
rabbitmqctl list_users
5.再次利用 admin 用户登录
重置命令
关闭应用的命令为
rabbitmqctl stop_app
清除的命令为
rabbitmqctl reset
重新启动命令为
rabbitmqctl start_app
简单案例
1、导入依赖
<!--rabbitmq 依赖客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!-- 操作文件流的一个依赖 -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
2、消息生产者
package com.example.rabbitmq.demo1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try(Connection connection = factory.newConnection(); Channel channel =
connection.createChannel()) {
/**
* 生成一个队列
* 1. 队列名称
* 2. 队列里面的消息是否持久化 默认消息存储在内存中
* 3. 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4. 是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5. 其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 发送一个消息
* 1. 发送到那个交换机
* 2. 路由的 key 是哪个
* 3. 其他的参数信息
* 4. 发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" 消息发送完毕");
}
}
}
3、消息消费者
package com.example.rabbitmq.demo1;
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println(" 等待接收消息....");
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(" 消息消费被中断");
};
/**
* 消费者消费消息
* 1. 消费哪个队列
* 2. 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3. 消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
RabbitMQ的六种消息模式
1.简单模式(Simple):
- 这是一个一对一的消息模式。生产者将消息发送到队列,一个消费者从队列中获取消息。当多个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息。一旦消息被消费者接收,它就会从队列中删除。
2.工作队列模式(Work Queues):
- 这是一个一对多的消息模式。生产者将消息发送到队列,多个消费者从队列中获取消息。消息会在消费者之间平均分配,但具体的分发机制(轮询或公平分发)可以配置。这种模式适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理。
3.发布订阅模式(Publish/Subscribe 或 Fanout):
- 这是一个广播式的消息传递方式。生产者发送消息到交换机,交换机将消息广播到所有与之绑定的队列,每个队列对应的消费者都可以接收到消息。
4.路由模式(Routing):
- 类似于发布订阅模式,但增加了路由键(Routing Key)的概念。生产者发送消息时,会指定一个路由键。交换机根据路由键将消息发送到匹配的队列,再由队列对应的消费者消费。
5.主题模式(Topics):
- 这是路由模式的扩展。路由键可以是多个单词的字符串,用.分隔。消费者可以通过通配符(* 和 #)来匹配路由键,从而接收消息。* 匹配一个单词,# 匹配一个或多个单词。
6.RPC模式(Remote Procedure Call):
- RPC 是一种通信机制,允许客户端发送请求到远程服务,并等待响应。RabbitMQ 可以用于实现 RPC,客户端发送请求消息到队列,服务器从队列中获取请求并处理,然后将结果发送回客户端。
1、简单模式(点对点)
。在这个模式中,一个生产者(Producer)发送消息到一个队列(Queue),然后一个消费者(Consumer)从该队列中接收并处理这些消息。这个模式没有使用交换机(Exchange),而是直接将消息发送到队列。
案例参照上述简单案例
2、工作队列模式(Work Queues)
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
1)轮询分发消息
案例:一个生产者发送消息,两个消费者接收消息
①抽取工具类
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
// 得到一个连接的 channel
public static Channel getChannel() throws Exception{
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
//设置VirtualHost
// factory.setVirtualHost("/jxHosts");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
②启动两个工作线程
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 演示消息轮询机制
*/
public class Worker01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+" 消费者取消消费接口回调逻辑");
};
System.out.println("C1 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
================================================================================
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 演示消息轮询机制
*/
public class Worker02 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+" 消费者取消消费接口回调逻辑");
};
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
③启动一个发送线程
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 演示消息轮询机制
*/
public class Task01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try(Channel channel=RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("任务一,请输入消息:");
// 从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" 发送消息完成:"+message);
}
}
}
}
④结果展示
2)消息应答
① 概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费 者在接
收到消息并且处理该消息之后,告诉 q rabbitmq 它已经处理了,q rabbitmq 可以把该消息删除了。
② 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在 高吞吐量和数据传输安全性方面做权
衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢
失了,当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制,
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终
使得内存耗尽,最终这些消费者线程被操作系统杀死, 所以这种模式仅适用在消费者可以高效并
以某种速率能够处理这些消息的情况下使用。
RabbitMQ默认采用自动应答
③ 消息应答的方法
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B B.Channel.basicNack(用于否定确认)
C C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
④ 手动应答
手动应答的好处是可以批量应答并且减少网络拥堵
multiple 的 true 和 false 代表不同意思
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比,只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
手动应答的好处
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
案例:手动应答,一个生产者,两个消费者
① 睡眠工具类
package com.example.rabbitmq.demo2;
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
② 生产者
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 确认应答机制
*/
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println("确认应答机制");
Scanner sc = new Scanner(System.in);
System.out.println(" 请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" 生产者发出消息" + message);
}
}
}
}
③ 消费者01
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Work03 {
private static final String ACK_QUEUE_NAME="ack-queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息处理时间较短");
// 消息消费的时候如何处理消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println(" 接收到消息:"+message);
/**
* 1. 消息标记 tag
* 2. 是否批量应答未应答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 采用手动应答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+" 消费者取消消费接口回调逻辑");
});
}
}
④ 消费者02
package com.example.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 确认应答机制
*
*/
public class Work04 {
private static final String ACK_QUEUE_NAME="ack-queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2 等待接收消息处理时间较长");
// 消息消费的时候如何处理消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println(" 接收到消息:"+message);
/**
* 1. 消息标记 tag
* 2. 是否批量应答未应答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 采用手动应答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+" 消费者取消消费接口回 调逻辑");
});
}
}
⑤ 结果展示
在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是
由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,
此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了
RabbitMQ的四种交换机类型
1.Fanout Exchange(扇型交换机):
- 这种交换机将消息广播到与之绑定的所有队列,无论消息的路由键是什么。它也通常用于发布/订阅模式,其中一个消息被广播给所有订阅者。
2.Direct Exchange(直连交换机):
- 这种交换机根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。
3.Topic Exchange(主题交换机):
- 这种交换机根据消息的路由键与队列绑定时指定的路由键模式(通配符)匹配程度,将消息路由到一个或多个队列。它通常用于发布/订阅模式和复杂的消息路由需求。
4.Headers Exchange(头交换机):
- 这种交换机根据消息的标头信息(Headers)来决定消息的路由,而不是使用路由键。它允许开发者在消息头中定义多个属性,并使用这些属性来过滤消息。
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
1、Fanout Exchange(扇型交换机)
将接收到的所有消息广播到它知道的所有队列中
案例:
① 生产者
package com.example.rabbitmq.demo4;
import com.example.rabbitmq.demo3.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerFanout {
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建Connection
Connection connection = RabbitMQConnection.getConnection();
// 创建Channel
Channel channel = connection.createChannel();
// 通道关联交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
String msg = "交换机样例演示";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
② 邮件消费者
package com.example.rabbitmq.demo4;
import com.example.rabbitmq.demo3.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MailConsumer {
/**
* 定义邮件队列
*/
private static final String QUEUE_NAME = "fanout_email_queue";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者...");
// 创建我们的连接
Connection connection = RabbitMQConnection.getConnection();
// 创建我们通道
final Channel channel = connection.createChannel();
// 关联队列消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者获取消息:" + msg);
}
};
// 开始监听消息 自动签收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
③ 短信消费者
package com.example.rabbitmq.demo4;
import com.example.rabbitmq.demo3.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SmsConsumer {
/**
* 定义短信队列
*/
private static final String QUEUE_NAME = "fanout_email_sms";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者...");
// 创建我们的连接
Connection connection = RabbitMQConnection.getConnection();
// 创建我们通道
final Channel channel = connection.createChannel();
// 关联队列消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取消息:" + msg);
}
};
// 开始监听消息 自动签收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
④ 结果展示
2、Direct Exchange(直连交换机)
消息只去到它绑定的routingKey 队列中去
案例:一个生产者,两个消费者
①生产者
package com.example.rabbitmq.demo5;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info"," 普通 info 信息");
bindingKeyMap.put("warning"," 警告 warning 信息");
bindingKeyMap.put("error"," 错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug"," 调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println(" 生产者发出消息:" + message);
}
}
}
}
②消费者01
package com.example.rabbitmq.demo5;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;
import java.io.File;
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message=" 接收绑定键:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message;
File file = new File("D:\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println(" 错误日志已经接收");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
③消费者02
package com.example.rabbitmq.demo5;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 绑 定 键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
④结果展示:
3、Topic Exchange(主题交换机)
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw",
"quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
案例1:使用全匹配
生产者
package com.example.rabbitmq.demo6;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("127.0.0.1"); // wht6.cn
// f.setPort(5672); //默认端口可以省略
f.setUsername("guest");
f.setPassword("guest");
Channel c = f.newConnection().createChannel();
//定义交换机
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//向交换机发消息,并携带路由键关键词
while (true) {
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("输入路由键:");
String key = new Scanner(System.in).nextLine();
c.basicPublish("topic_logs", key, null, s.getBytes());
}
}
}
消费者
package com.example.rabbitmq.demo6;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
public class Consumer {
public static void main(String[] args) throws Exception {
//连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("127.0.0.1"); // wht6.cn
// f.setPort(5672); //默认端口可以省略
f.setUsername("guest");
f.setPassword("guest");
Channel c = f.newConnection().createChannel();
// 定义随机队列,定义交换机,用绑定键绑定
String queue = UUID.randomUUID().toString();
c.queueDeclare(queue, false, true, true, null);
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
System.out.println("输入绑定键,用空格隔开:");
String s = new Scanner(System.in).nextLine();// "aa bb cc"-->["aa", "bb", "cc"]
String[] a = s.split("\\s+");
for (String key : a) {
c.queueBind(queue, "topic_logs", key);
}
// 从随机队列正常消费数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String key = message.getEnvelope().getRoutingKey();
String s = new String(message.getBody());
System.out.println(key+" - 收到: "+s);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
// 消费数据
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
结果:
案例2
生产者
package com.example.rabbitmq.demo6;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1--> 绑定的是
* 中间带 orange 带 3 个单词的字符串 (*.orange.*)
* Q2--> 绑定的是
* 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
* 第一个单词是 lazy 的多个单词 (lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit"," 被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant"," 被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox"," 被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox"," 被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit"," 虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox"," 不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit"," 是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit"," 是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println(" 生产者发出消息" + message);
}
}
}
}
消费者01
package com.example.rabbitmq.demo6;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明 Q1 队列与绑定关系
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
消费者02
package com.example.rabbitmq.demo6;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明 Q2 队列与绑定关系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
结果:
SpringBoot整合RabbitMq
Maven依赖
<!-- springboot-web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
配置文件
application.yml
spring:
rabbitmq:
host: 127.0.0.1 # wht6.cn
port: 5672
username: guest
password: guest
virtual-host: /jxHosts
listener:
simple:
prefetch: 1 # 预抓取消息数,spring默认250
方式一:聚合项目
Main
package cn.tedu.rabbitmqspring.m5;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
import java.util.UUID;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//定义交换机
@Bean
public TopicExchange logsExchange() {
// 参数: 1.名称 2.持久 3.自动删除
// 默认是持久交换机
return new TopicExchange("topic_logs", false, false);
}
/*
默认使用方法名作为对象的属性名放入spring容器
"rndQueue" ---- Queue实例
*/
@Bean
public Queue rndQueue() {
return new Queue(UUID.randomUUID().toString(), false, true, true);
}
//用生产者发送消息
@Autowired
private Producer p;
/*
@PostConstruct
spring在完成扫描创建实例,完成所有的依赖注入后,
会自动地执行 @PostConstruct 方法
spring单线程执行下面的流程:
创建实例 --> 完成注入 --> @PostConstruct --> 后续配置
*/
@PostConstruct
public void test() {
// new Thread(new Runnable() {
// @Override
// public void run() {
// //线程中执行的代码
// }
// }).start();
// lambda 表达式,匿名内部类的简写
new Thread(() -> p.send()).start();
}
}
生产者Producer
package cn.tedu.rabbitmqspring.m5;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Scanner;
@Component
public class Producer {
// RabbitAutoConfiguration 自动配置类中
// 自动创建 AmqpTemplate 实例
@Autowired
private AmqpTemplate t;
//生产者的发送方法,需要手动调用
public void send() {
while (true) {
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("输入路由键:");
String k = new Scanner(System.in).nextLine();
t.convertAndSend("topic_logs", k, s);
}
}
}
消费者Consumer
package cn.tedu.rabbitmqspring.m5;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 启动一个消费者,从队列接收消息,并传递给这个方法进行处理
// 消费者自动启动,不需要手动调用
// spring expression language - SPEL #{} 可以直接访问spring容器中的对象
// ${} OGNL
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "#{rndQueue.name}",declare="false"), //队列,不指定参数,由服务器自动命名,非持久,独占,自动删除
exchange = @Exchange(name = "topic_logs",declare="false"), //交换机,declare="false"使用存在的交换机而不重复定义
key = {"*.orange.*"}
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //队列,不指定参数,由服务器自动命名,非持久,独占,自动删除
exchange = @Exchange(name = "topic_logs",declare="false"), //交换机,declare="false"使用存在的交换机而不重复定义
key = {"*.*.rabbit", "lazy.#"}
))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}
}
方式二:分离项目
分为三个子项目:生产者,短信消费者,邮件消费者
消息实体类
package com.boyatop.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @ClassName MsgEntity
* @Author www.boyatop.com
* @Version V1.0
**/
@Data
public class MsgEntity implements Serializable {
private String msgId;
private String userId;
private String phone;
private String email;
public MsgEntity(String msgId, String userId, String phone, String email) {
this.msgId = msgId;
this.userId = userId;
this.phone = phone;
this.email = email;
}
}
生产者
配置类
package com.boyatop.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName RabbitMQConfig
* @Author www.boyatop.com
* @Version V1.0
**/
@Component
public class RabbitMQConfig {
/**
* 定义交换机
*/
private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_ex";
/**
* 短信队列
*/
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
/**
* 邮件队列
*/
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
// 1.注入队列和交换机注入到spring容器中
// 2.关联交换机 <bean id="smsQueue" class="";>
/**
* 邮件和短信队列注入到spring容器中
* @return
*/
@Bean
public Queue smsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
@Bean
public Queue emailQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
}
/**
* 关联交换机
* 根据参数名称 ioc获取 Queue对象
*/
@Bean
public Binding BindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(smsQueue).to(fanoutExchange);
}
@Bean
public Binding BindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(emailQueue).to(fanoutExchange);
}
}
生产者Controller
package com.boyatop.controller;
import com.boyatop.entity.MsgEntity;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @ClassName ProducerController
* @Author www.boyatop.com
* @Version V1.0
**/
@RestController
public class ProducerController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/sendMsg")
public void sendMsg() {
/**
* 参数1 交换机名称
* 参数2 路由key
* 参数3 发送内容
*/
MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),
"1234", "181111111", "[email protected]");
amqpTemplate.convertAndSend("boyatop_ex", "", msgEntity);
}
}
短信消费者
package com.boyatop;
import com.boyatop.entity.MsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @ClassName FanoutSmsConsumer
* @Author www.boyatop.com
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(MsgEntity msgEntity) {
log.info("sms:msgEntity:" + msgEntity);
}
}
邮件消费者
package com.boyatop;
import com.boyatop.entity.MsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @ClassName FanoutEmailConsumer
* @Author www.boyatop.com
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_email_queue")
public class FanoutEmailConsumer {
@RabbitHandler
public void process(MsgEntity msgEntity) {
log.info("email:msgEntity:" + msgEntity);
}
}
SpringBoot开启消息确认机制+解决幂等性问题+生产者获取消费结果
创建SpringBoot项目,导入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>order-producer-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>order-producer-consumer</name>
<description>order-producer-consumer</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<!-- mysql 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置文件application.yml
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /jxHosts
listener:
simple:
retry:
##开启消费者(程序出现异常的情况下)进行重试
enabled: true
##最大重试次数
max-attempts: 5
##重试间隔时间
initial-interval: 3000
##消费者需要手动发送确认信号
acknowledge-mode: manual
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/first?useUnicode=true&characterEncoding=utf-8
username: root
password: 123456
server:
port: 9090
RabbitMq配置类
package com.example.orderproducerconsumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName RabbitMQConfig
* @Author
* @Version V1.0
**/
@Component
public class RabbitMQConfig {
/**
* 定义交换机
*/
private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_order";
/**
* 订单队列
*/
private String FANOUT_ORDER_QUEUE = "fanout_order_queue";
/**
* 配置orderQueue
*
* @return
*/
@Bean
public Queue orderQueue() {
return new Queue(FANOUT_ORDER_QUEUE);
}
/**
* 配置fanoutExchange
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
}
// 绑定交换机 orderQueue
@Bean
public Binding bindingOrderFanoutExchange(Queue orderQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(orderQueue).to(fanoutExchange);
}
}
消息记录实体类
package com.example.orderproducerconsumer.entity;
import java.io.Serializable;
public class OrderEntity implements Serializable {
private int id;
private String orderName;
private String orderId;
public OrderEntity(String orderName, String orderId) {
this.orderName = orderName;
this.orderId = orderId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getOrderName() {
return orderName;
}
public void setOrderName(String orderName) {
this.orderName = orderName;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public OrderEntity(int id, String orderName, String orderId) {
this.id = id;
this.orderName = orderName;
this.orderId = orderId;
}
public OrderEntity() {
}
}
Mapper
package com.example.orderproducerconsumer.mapper;
import com.example.orderproducerconsumer.entity.OrderEntity;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Component;
@Mapper
public interface OrderMapper {
@Insert("insert order_info values (null,#{orderName},#{orderId})")
int addOrder(OrderEntity orderEntity);
@Select("SELECT * from order_info where orderId=#{orderId} ")
OrderEntity getOrder(String orderId);
}
controller
package com.example.orderproducerconsumer.controller;
import com.example.orderproducerconsumer.entity.OrderEntity;
import com.example.orderproducerconsumer.mapper.OrderMapper;
import com.example.orderproducerconsumer.producer.OrderProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class OrderController {
@Autowired
private OrderProducer orderProducer;
@Autowired
private OrderMapper orderMapper;
@RequestMapping("/sendOrder")
public String sendOrder() {
// 生成全局id
String orderId = System.currentTimeMillis() + "";
log.info("orderId:{}", orderId);
String orderName = " boyatop";
orderProducer.sendMsg(orderName, orderId);
return orderId;
}
/**
* 前端主动根据orderId定时查询
*
* @param orderId
* @return
*/
@RequestMapping("/getOrder")
public Object getOrder(String orderId) {
OrderEntity order = orderMapper.getOrder(orderId);
if (order == null) {
return "该订单没有被消费或者订单号错误!";
}
return order;
}
}
生产者
package com.example.orderproducerconsumer.producer;
import com.alibaba.fastjson.JSONObject;
import com.example.orderproducerconsumer.entity.OrderEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName OrderProducer
* @Author
* @Version V1.0
**/
@Component
@Slf4j
public class OrderProducer implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData.getId();
log.info("id:" + id);
}
/**
* 使用mq发送消息
*
* @param orderName
* @param orderId
*/
public void sendMsg(String orderName, String orderId) {
OrderEntity orderEntity = new OrderEntity(orderName, orderId);
rabbitTemplate.convertAndSend("boyatop_order", "", orderEntity, message -> {
return message;
});
// CorrelationData correlationData = new CorrelationData();
// correlationData.setId(JSONObject.toJSONString(orderEntity));
// rabbitTemplate.convertAndSend("/boyatop_order", "", orderEntity,correlationData);
}
}
消费者
package com.example.orderproducerconsumer.consumer;
import com.example.orderproducerconsumer.entity.OrderEntity;
import com.example.orderproducerconsumer.mapper.OrderMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName fanout_sms_queue
* @Author
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_order_queue")
public class FanoutOrderConsumer {
@Autowired
private OrderMapper orderMapper;
@RabbitHandler
public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
try {
log.info(">>orderEntity:{}<<", orderEntity.toString());
String orderId = orderEntity.getOrderId();
if (StringUtils.isEmpty(orderId)) {
return;
}
OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
if (dbOrderEntity != null) {
log.info("另外消费者已经处理过该业务逻辑");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
int result = orderMapper.addOrder(orderEntity);
int i = 1 / 0;
log.info(">>插入数据库中数据成功<<");
//开启消息确认机制
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 记录该消息日志形式 存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿
//消息处理失败,并将消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//将该消息存放到死信队列中,单独写一个死信消费者实现消费。
}
}
}
死信队列
1、概念
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
2、产生原因
消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
队列达到最大的长度 (队列容器已经满了)
消费者消费多次消息失败,就会转移存放到死信队列中;消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
3、死信队列的架构原理
死信队列和普通队列区别不是很大
普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
区别:
1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到
普通队列中缓存起来,普通队列对应有自己独立普通消费者。
2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费
的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机
对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。
4、死信实战
1)消息TTL过期
1.生产者代码
package com.example.rabbitmq.demo7;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
/**
* TTL时间过期
*/
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置消息的 TTL 时间
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();
// 该信息是用作演示队列个数限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
message.getBytes());
System.out.println(" 生产者发送消息:"+message);
}
}
}
}
2.消费者01代码(启动之后关闭该消费者 模拟其接收不到消息)
package com.example.rabbitmq.demo7;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
/**
* TTL时间过期
*/
public class Consumer01 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
3.消费者02代码( 以上步骤完成后 启动 2 C2 消费者 它消费死信队列里面的消息)
package com.example.rabbitmq.demo7;
import com.example.rabbitmq.demo2.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* TTL时间过期
*/
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println(" 等待接收死信队列消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
2)队列达到最大长度
1.消息生产者代码去掉 TTL 属性
/**
* 队列达到最大长度
*/
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 该信息是用作演示队列个数限制
for (int i = 1; i <11 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println(" 生产者发送消息:"+message);
}
}
}
}
2.C1 消费者修改以下代码 ( 启动之后关闭该消费者 模拟其接收不到消息)
添加 params.put("x-max-length",6);
/**
* 队列达到最大长度
*/
public class Consumer01 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
params.put("x-max-length",6);
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
注意此时需要把原先队列删除 因为参数改变了
3.C2 消费者代码不变( 启动 C2 消费者)
3)消息被拒
1.消息生产者代码同上生产者一致
2.C1 消费者代码( 启动之后关闭该消费者 模拟其接收不到消息)
/**
* 消息被拒
*/
public class Consumer01 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println(" 等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if(message.equals("info5")){
System.out.println("Consumer01 接收到消息" + message + " 并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
}
}
3.C2 消费者代码不变
启动消费者 01 然后再启动消费者 02
注意此时需要把原先队列删除 因为参数改变了
生产者如何获取消费结果
Rocketmq 自带全局消息id,能够根据该全局消息获取消费结果
原理: 生产者投递消息到mq服务器,mq服务器端在这时候返回一个全局的消息id,当我们消费者消费该消息成功之后,消费者会给我们mq服务器端发送通知标记该消息消费成功。
生产者获取到该消息全局id,每隔2s时间调用mq服务器端接口查询该消息是否有被消费成功。
幂等性
1、概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
2、解决幂等性问题
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
主流的有两种解决思路:
a.唯一 ID+指纹码机制,利用数据库主键去重
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
b.利用 redis 的原子性去实现
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
延迟队列
1、概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
2、延迟队列使用场景
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
3、RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
① 消息设置 TTL
一种方式便是针对每条消息设置 TTL
② 队列设置 TTL
一种是在创建队列的时候设置队列的“x-message-ttl”属性
③ 二者区别
第一种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
4、延迟队列案例
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
配置类
package com.example.orderproducerconsumer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明 xExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
// 声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
// 声明死信队列 QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
// 声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
消息生产者
package com.example.orderproducerconsumer.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info(" 当前时间:{}, 发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", " 消息来自 ttl 为 为 10S 的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", " 消息来自 ttl 为 为 40S 的队列: "+message);
}
}
消息消费者
package com.example.orderproducerconsumer.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info(" 当前时间:{}, 收到死信队列信息{}", new Date().toString(), msg);
}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
5、延时队列优化
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间
配置类
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
// 声明队列 C 死信交换机
@Bean("queueC")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
// 声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
消息生产者
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info(" 当前时间:{}, 发送一条时长{} 毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}
发起请求
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
6、Rabbitmq 插件实现延迟队列
上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
安装延时队列插件
在官网上下载 Community Plugins | RabbitMQ,下载
rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
7、插件代码实现
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
配置类
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
// 自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
消息生产者
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData ->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫 秒 的 信 息 给 队 列 delayed.queue:{}", new
Date(),delayTime, message);
}
消息消费者
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info(" 当前时间:{}, 收到延时队列的消息:{}", new Date().toString(), msg);
}
发起请求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二个消息被先消费掉了,符合预期
8、总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
优先级队列
使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
如何添加
a.控制台页面添加
b.队列中代码添加优先级
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
c.消息中代码添加优先级
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build();
d.注意事项
要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
优先级队列案例
消息生产者
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
// 给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++) {
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println(" 发送消息完成:" + message);
}
}
}
}
消息消费者
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println(" 消费者启动等待消费......");
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println(" 消费者无法消费 消息时调用,如队列被删除");
});
}
}
惰性队列
1. 使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
2. 两种模式
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
3. 内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB
版权归原作者 沐暖沐 所有, 如有侵权,请联系我们删除。