SpringBoot集成RabbitMq
一、RabbitMq的用途及作用
一、削峰、异步、解耦
经常开发的人都知道、RabbitMq常用于并发、流量大的场景,因为RabbitMq属于中间件需要维护,所以一般小项目几乎不会使用。而在于大型的并发环境下,大量的流量积压到接口中,使Mysql连接分配出现不够使用的情况,此时就可以使用RabbitMq来解决。
削峰:
当流量洪峰到达接口时,可以用现实中来举例子,mq就相当于一个独木桥,mysq就相当于河对岸,使大量的人从容有序的排队过河,而不会出现所有人全部淌水过河到河对岸,大大减少MySQL的压力。
异步:
通常采用异步通知的方式,就好比我们在抢票的时候,点击提交,系统会返回一个提示正在努力抢票中,而实际上你的订单正在mq队列中排队处理,处理结果会在后续异步通知结果。
解耦:
解耦主要两方面:
1.生产消息的应用 和 消费消息的应用不是同一种语言可以解耦
2.生产消息的应用 宕机,不会影响到消费者消费消息
二、RabbitMQ介绍
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
- 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
- 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
- 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
- 学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
消息队列之间的对比?
支持多语言:RabbitMQ,Kafaka ;ActiveMQ,RocketMQ只支持java
传输速度:RabbitMQ微秒级,其他毫秒级别
吞吐量:kafka 吞吐量和磁盘性能* 集群数量相关 次之RocketMQ
消息高可靠:每个一个都可以保证不丢失,不重复
三、RabbitMQ安装
我本机采用docker安装,比较简洁方便
第一步docker命令创建文件夹:mkdir rabbit
切换到刚刚创建的文件夹中:cd rabbit/
创建配置文件:vim docker-compose.yaml
将下面配置信息粘贴进去:
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/var/lib/rabbitmq
按esc按键然后按下:输入wq保存并退出
启动rabbitmq:docker-compose up
游览器输入虚拟机地址+15672端口号访问RabbitMq可视化界面 默认用户名密码都是guest
四、Springboot整合RabbitMq
一 、分类
- Publisher - 生产者:发布消息到RabbitMQ中的Exchange
- Consumer - 消费者:监听RabbitMQ中的Queue中的消息
- Exchange - 交换机:和生产者建立连接并接收生产者的消息
- Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
- Routes - 路由:交换机以什么样的策略将消息发布到Queue主要模式:Simple Work Queue (简单工作队列):也就是常说的点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。 Work Queues (工作队列):也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。 Publish/Subscribe (发布订阅模式):一条消息被多个消费者消费。 Routing(路由模式):有选择的接收消息。 Topics (主题模式):通过一定的规则来选择性的接收消息 RPC 模式:发布者发布消息,并且通过 RPC 方式等待结果。目前这个应该场景少,而且代码也较为复杂交换机类型:direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。 fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。 topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。 headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。
导入依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
这里有两种方式整合RabbitMq
第一种采用其本身的框架 获取连接
最简模式
package com.wwy.config;
/**
* @author 王伟羽
* @date 2024/3/13 9:39
*/
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 配置获取RabbitMq的静态方法
*/
public class RabbitMqUtils {
public static Connection getConnection() {
ConnectionFactory factory = new ConnectionFactory(); //创建连接工厂
//设置相关属性
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("192.168.60.139");
factory.setPort(5672);
try {
//获取连接
Connection conn = factory.newConnection();
return conn;
} catch (IOException e) {
e.printStackTrace();
return null;
} catch (TimeoutException e) {
e.printStackTrace();
return null;
}
}
}
创建生产者
package com.wwy.producter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwy.config.RabbitMqUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 9:52
*/
@RestController
@RequestMapping(value = "/test")
public class producer {
private final static String QUERE_NAME = "quere_name";
@GetMapping(value = "/sendMessage")
public String sendMessage(String message) {
System.out.println(message);
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
try {
Channel channel = connection.createChannel();
// 参数1:指定exchange,使用""。 最简模式(helloword) 使用默认交换机
// 参数2:指定路由的规则,
// 使用具体的队列名称。
// 参数2可以是队列名 也可以是路由规则
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("", QUERE_NAME, null, "马上下课".getBytes("utf-8"));
return "发送消息成功!";
} catch (IOException e) {
e.printStackTrace();
return "发送消息失败!";
}
}
return "mq初始化失败!";
}
}
消费者消费消息
package com.wwy.consumer;
import com.rabbitmq.client.*;
import com.wwy.config.RabbitMqUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 10:33
*/
@Service
public class ConsumerTest {
private final static String QUERE_NAME = "quere_name";
@Bean
public void consumeMessage() {
Connection connection = RabbitMqUtils.getConnection();
if(Objects.nonNull(connection)){
try {
Channel channel = connection.createChannel();
channel.queueDeclare(QUERE_NAME,true,false,false,null);
// 第二步创建消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// byte[] body 就是消费者得到的数据
System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
}
};
channel.basicConsume(QUERE_NAME,true,consumer);
// 让当前程序卡在 这里
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
消费者获取消息的另一种方式(官网)
package com.wwy.consumer;
import com.rabbitmq.client.*;
import com.wwy.config.RabbitMqUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 10:33
*/
@Service
public class ConsumerTest {
private final static String QUERE_NAME = "quere_name";
@Bean
public void consumeMessage() {
Connection connection = RabbitMqUtils.getConnection();
if(Objects.nonNull(connection)){
try {
Channel channel = connection.createChannel();
channel.queueDeclare(QUERE_NAME,true,false,false,null);
// // 第二步创建消费者
// Consumer consumer = new DefaultConsumer(channel){
// @Override
// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//
// // byte[] body 就是消费者得到的数据
//
// System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
//
// }
// };
// channel.basicConsume(QUERE_NAME,true,consumer);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUERE_NAME, true, deliverCallback, consumerTag -> { });
// 让当前程序卡在 这里
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
启动程序访问接口
生产者生产成功
消费者收到生产者的消息
使用封装好的RabbitTemplate进行操作,比较方便快捷
第一步配置信息
server:
port: 8083
spring:
rabbitmq: # 单机版配置
host: 192.168.60.139
port: 5672
username: guest #账户名密码默认都是guest
password: guest
publisher-confirm-type: simple
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
生产者:
package com.wwy.producter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwy.config.RabbitMqUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author 王伟羽
* @date 2024/3/13 9:52
*/
@RestController
@RequestMapping(value = "/test")
public class producer {
private final static String QUERE_NAME = "quere_name";
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/sendMessage")
public String sendMessage(String message) {
System.out.println(message);
rabbitTemplate.convertAndSend(QUERE_NAME,message);
return "发送成功!";
}
}
消费者:
package com.wwy.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 王伟羽
* @date 2024/3/13 10:33
*/
@Component
public class ConsumerTest {
private final static String QUERE_NAME = "quere_name";
@RabbitListener(queues = QUERE_NAME)
public void handleMessage(String msg) {
System.out.println("listener 收到消息4 " + msg);
}
}
运行
工作队列模式
生产者:
package com.wwy.producter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.wwy.config.RabbitMqUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 13:53
*/
@RestController
@RequestMapping(value = "/workQueues")
public class WorkQueuesProducer {
private final static String WORK_QUEUES = "work_queues";
@Resource
private RabbitTemplate rabbitTemplate;
//第一种
@GetMapping(value = "/workQueuesSendMessage")
public String workQueuesSendMessage(String message) {
System.out.println("接收消息");
//第一种生产消息方法
//获取连接
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
try {
Channel channel = connection.createChannel();
//发送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("", WORK_QUEUES, null, (message + i).getBytes("utf-8"));
}
return "发送成功!";
} catch (IOException e) {
e.printStackTrace();
return "发送失败!";
}
}
return "初始mq失败!";
}
//第二种
@GetMapping(value = "/workQueuesSendMessage01")
public String workQueuesSendMessage01(String message) {
System.out.println("接收消息");
//发送消息
for (int i = 0; i < 10; i++) {
System.out.println(i);
rabbitTemplate.convertAndSend(WORK_QUEUES, message);
}
return "发送成功!";
}
}
消费者消费消息
第一种:
package com.wwy.consumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.wwy.config.RabbitMqUtils;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 14:13
*/
@Service
public class WorkQueuesConsumerOne {
private final static String WORK_QUEUES = "work_queues";
@Bean
public void getMessageInfoOne() {
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
try {
Channel channel = connection.createChannel();
channel.queueDeclare(WORK_QUEUES, true, false, false, null);
//设置每次消费消息的数量
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 队列1消息内容 '" + message + "'");
System.out.println("队列1获取到消息");
//手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
System.out.println("队列1消息消费被中断");
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Bean
public void getMessageInfoTwo() {
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
try {
Channel channel = connection.createChannel();
channel.queueDeclare(WORK_QUEUES, true, false, false, null);
//设置每次消费消息的数量
channel.basicQos(1);
System.out.println("队列2获取到消息");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 队列2消息内容 '" + message + "'");
//手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
System.out.println("队列2消息消费被中断");
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
第二种注解方式:
package com.wwy.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author 王伟羽
* @date 2024/3/13 15:10
*/
@Component
public class WorkQueuesConsumersOne {
public final static String WORK_QUEUES = "work_queues";
@RabbitListener(queues = WORK_QUEUES)
public void consumerOne(String message) {
System.out.println("队列1收到消息"+message);
}
@RabbitListener(queues = WORK_QUEUES)
public void consumerTwo(String message) {
System.out.println("队列2收到消息"+message);
}
}
发布/订阅模式
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
第一种生产者方式:
package com.wwy.producter;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwy.config.RabbitMqUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 16:54
*/
@RestController
@RequestMapping(value = "/publishProducer")
public class PublishProducer {
private final static String PUBLISH_PRODUCER1 = "publish_name_one";
private final static String PUBLISH_PRODUCER2 = "publish_name_two";
//交换机名称
private final static String EXCHANGE_NAME = "publish_exchange";
@GetMapping(value = "/publishSendMessage")
public String publishSendMessage(String message) {
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
Channel channel = null;
try {
channel = connection.createChannel();
// 绑定交换机
//参数1: exchange的名称
//参数2: 指定exchange的类型
// FANOUT - pubsub , 发布订阅
// DIRECT - Routing , 路由模式
// TOPIC - Topics topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//给交换机绑定对应的队列
//将队列和交换机绑定
//String var1, 队列名
//String var2, 交换机名
//String var3, 对应绑定队列 路由规则 "" 没有规则所有的队列消息一样
channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
// 参数1:指定exchange,使用""。 最简模式(helloword) 使用默认交换机
// 参数2:指定路由的规则,
// 使用具体的队列名称。
// 参数2可以是队列名 也可以是路由规则
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
for (int i = 1; i < 11; i++) {
// 消息向交换机发送,没有匹配路由规则
channel.basicPublish(EXCHANGE_NAME, PUBLISH_PRODUCER1, null, (message + i).getBytes("utf-8"));
}
return "发送成功!";
} catch (IOException e) {
e.printStackTrace();
}
}
return "mq出错!";
}
}
消费者消费信息:
package com.wwy.consumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.wwy.config.RabbitMqUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
/**
* @author 王伟羽
* @date 2024/3/13 17:06
*/
@Component
public class PublishConsumerOne {
private final static String PUBLISH_PRODUCER1 = "publish_name_one";
private final static String PUBLISH_PRODUCER2 = "publish_name_two";
//交换机名称
private final static String EXCHANGE_NAME = "publish_exchange";
@Bean
public void publishGetInfo() {
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
try {
Channel channel = connection.createChannel();
//保证消费者 每次只消费一条消息
channel.basicQos(1);
// 第一步声明 队列
channel.queueDeclare(PUBLISH_PRODUCER1,true,false,false,null);
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 队列一消费消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(PUBLISH_PRODUCER1, true, deliverCallback, consumerTag -> {
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Bean
public void publishGetInfoTwo() {
Connection connection = RabbitMqUtils.getConnection();
if (Objects.nonNull(connection)) {
try {
Channel channel = connection.createChannel();
//保证消费者 每次只消费一条消息
channel.basicQos(1);
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueDeclare(PUBLISH_PRODUCER2,true,false,false,null);
channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 队列二消费消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(PUBLISH_PRODUCER2, true, deliverCallback, consumerTag -> {
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
运行结果可知,两个消费者都能获取到信息,此种情况适合用户注册业务,一个队列接收短信发送,一个队列接收邮件发送
第二种简便方式 生产者
@GetMapping(value = "/PublicSubscribe")
public void PublicSubscribe() {
rabbitTemplate.convertAndSend("publish_exchange_one", "", "发布订阅模式", new CorrelationData("我是大帅逼"));
}
消费者:
package com.wwy.consumer;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.stereotype.Component;
/**
* @author 王伟羽
* @date 2024/3/13 17:49
*/
@Component
public class PublishConsumerTwo {
@RabbitListener(queues = "publish_queue_one")
public void publishOne(String message) {
System.out.println("队列1收到的消息" + message);
}
@RabbitListener(queues = "publish_queue_two")
public void publishTwo(String message) {
System.out.println("队列2收到的消息" + message);
}
}
主题模式
由于上述第一种方法太过于繁琐,所以主题模式只采用第二种方法,第一种在后续代码里展示
配置主题模式下的队列、交换机并将其绑定起来
package com.wwy.config;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
/**
* @author 王伟羽
* @date 2024/3/14 9:29
*/
@SpringBootConfiguration
public class TopicConfiguration {
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("topic_exchange_one");
}
@Bean
public Queue getTopicQueueOne(){
return new Queue("topic_queue_one");
}
@Bean
public Queue getTopicQueueTwo(){
return new Queue("topic_queue_two");
}
@Bean
public Queue getTopicQueueThree(){
return new Queue("topic_queue_three");
}
//* 代表一个词
//# 代表零个或者多个词
@Bean
public Binding getTopicBindingOne(){
return BindingBuilder.bind(getTopicQueueOne()).to(getTopicExchange()).with("a.*");
}
@Bean
public Binding getTopicBindingThree(){
return BindingBuilder.bind(getTopicQueueThree()).to(getTopicExchange()).with("a.#");
}
@Bean
public Binding getTopicBindingTwo(){
return BindingBuilder.bind(getTopicQueueTwo()).to(getTopicExchange()).with("a.111");
}
}
创建生产者:
package com.wwy.producter;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
* @author 王伟羽
* @date 2024/3/14 9:36
*/
@RestController
@RequestMapping(value = "/topic")
public class TopicProducer {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/sendTopicMessage")
public String sendTopicMessage(String message){
try {
rabbitTemplate.convertAndSend("topic_exchange_one","a.123",message.getBytes("utf-8"));
return "生产者发送消息成功";
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return "发送消息失败!";
}
}
}
创建消费者:
package com.wwy.consumer;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 王伟羽
* @date 2024/3/14 9:45
*/
@Component
public class TopicConsumerOne {
@RabbitListener(queues = "topic_queue_one")
public void getTopicMessageOne(String message) {
System.out.println("队列一收到消息:" + message);
}
@RabbitListener(queues = "topic_queue_two")
public void getTopicMessageTwo(String message) {
System.out.println("队列二收到消息:" + message);
}
@RabbitListener(queues = "topic_queue_three")
public void getTopicMessageThree(String message) {
System.out.println("队列三收到消息:" + message);
}
}
这里有一个问题,在每次消费端重启的时候会继续消费队列里的数据,为了防止这种情况,可以消费者在消费到数据的时候进行手动ack
package com.wwy.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 王伟羽
* @date 2024/3/14 9:45
*/
@Component
public class TopicConsumerOne {
@RabbitListener(queues = "topic_queue_one")
public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
try {
// 获取消息内容
String messageBody = new String(message.getBody());
System.out.println("队列一收到消息:"+messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("队列一收到ack");
} catch (Exception e) {
// 如果处理消息时出现异常,可以拒绝消息
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
if (!channel.isOpen()) {
// 如果channel已经关闭,则无法执行basicNack或basicReject
return;
}
try {
channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
} catch (IOException ex) {
ex.printStackTrace();
}
// 或者可以选择 basicReject 如果不需要重新放回队列
// channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
}
}
@RabbitListener(queues = "topic_queue_two")
public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
try {
// 获取消息内容
String messageBody = new String(message.getBody());
System.out.println("队列二收到消息:"+messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("队列二收到ack");
} catch (Exception e) {
// 如果处理消息时出现异常,可以拒绝消息
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
if (!channel.isOpen()) {
// 如果channel已经关闭,则无法执行basicNack或basicReject
return;
}
try {
channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
} catch (IOException ex) {
ex.printStackTrace();
}
// 或者可以选择 basicReject 如果不需要重新放回队列
// channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
}
}
@RabbitListener(queues = "topic_queue_three")
public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
try {
// 获取消息内容
String messageBody = new String(message.getBody());
System.out.println("队列三收到消息:"+messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("队列三收到ack");
} catch (Exception e) {
// 如果处理消息时出现异常,可以拒绝消息
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
if (!channel.isOpen()) {
// 如果channel已经关闭,则无法执行basicNack或basicReject
return;
}
try {
channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
} catch (IOException ex) {
ex.printStackTrace();
}
// 或者可以选择 basicReject 如果不需要重新放回队列
// channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
}
}
}
路由模式
路由模式几乎与主题模式相同,也是通过key去发送到对应的消费者中去
配置队列,交换机并将其绑定到一起
package com.wwy.config;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
/**
* @author 王伟羽
* @date 2024/3/14 10:49
*/
@SpringBootConfiguration
public class RouterConfiguration {
@Bean
public DirectExchange getRouterExchange(){
return new DirectExchange("router_exchange_one");
}
@Bean
public Queue getRouterQueueOne(){
return new Queue("router_queue_one");
}
@Bean
public Queue getRouterQueueTwo(){
return new Queue("router_queue_two");
}
@Bean
public Queue getRouterQueueThree(){
return new Queue("router_queue_three");
}
@Bean
public Binding getRouterBindingOne(){
return BindingBuilder.bind(getRouterQueueOne()).to(getRouterExchange()).with("aaa");
}
@Bean
public Binding getRouterBindingThree(){
return BindingBuilder.bind(getRouterQueueThree()).to(getRouterExchange()).with("bbb");
}
@Bean
public Binding getRouterBindingTwo(){
return BindingBuilder.bind(getRouterQueueTwo()).to(getRouterExchange()).with("ccc");
}
}
生产者:
package com.wwy.producter;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
* @author 王伟羽
* @date 2024/3/14 10:48
*/
@RestController
@RequestMapping(value = "/router")
public class RouterProducer {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/sendRouterMessage")
public String sendTopicMessage(String message){
try {
rabbitTemplate.convertAndSend("router_exchange_one","aaa",message.getBytes("utf-8"));
return "生产者发送消息成功";
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return "发送消息失败!";
}
}
}
消费者:
package com.wwy.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 王伟羽
* @date 2024/3/14 10:57
*/
@Component
public class RouterConsumer {
@RabbitListener(queues = "router_queue_one")
public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
try {
// 获取消息内容
String messageBody = new String(message.getBody());
System.out.println("队列一收到消息:"+messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("队列一收到ack");
} catch (Exception e) {
// 如果处理消息时出现异常,可以拒绝消息
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
if (!channel.isOpen()) {
// 如果channel已经关闭,则无法执行basicNack或basicReject
return;
}
try {
channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
} catch (IOException ex) {
ex.printStackTrace();
}
// 或者可以选择 basicReject 如果不需要重新放回队列
// channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
}
}
@RabbitListener(queues = "router_queue_two")
public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
try {
// 获取消息内容
String messageBody = new String(message.getBody());
System.out.println("队列二收到消息:"+messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("队列二收到ack");
} catch (Exception e) {
// 如果处理消息时出现异常,可以拒绝消息
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
if (!channel.isOpen()) {
// 如果channel已经关闭,则无法执行basicNack或basicReject
return;
}
try {
channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
} catch (IOException ex) {
ex.printStackTrace();
}
// 或者可以选择 basicReject 如果不需要重新放回队列
// channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
}
}
@RabbitListener(queues = "router_queue_three")
public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
try {
// 获取消息内容
String messageBody = new String(message.getBody());
System.out.println("队列三收到消息:"+messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("队列三收到ack");
} catch (Exception e) {
// 如果处理消息时出现异常,可以拒绝消息
Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
if (!channel.isOpen()) {
// 如果channel已经关闭,则无法执行basicNack或basicReject
return;
}
try {
channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
} catch (IOException ex) {
ex.printStackTrace();
}
// 或者可以选择 basicReject 如果不需要重新放回队列
// channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
}
}
}
这里生产者发送消息指定了key为aaa的,所以只有消费者一匹配并接收到消息
总结
以上就是本次测试的所以队列名,大家可以在测试的时候进入可视化界面查看消息、队列、交换机状态。本文只是简单的对RabbitMq的各种模式进行简单了解,后续的如何在项目中实现、死信队列等在下章博客分享,对于本篇如有错误的地方欢迎大家指正。
代码
wangweiyuyu/rabbitmq - 码云 - 开源中国 (gitee.com)https://gitee.com/wangweiyuyu/rabbitmq
版权归原作者 沐 白 所有, 如有侵权,请联系我们删除。