相关依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.13</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
1.创建简单的发送模式(famout模式)
1.明确生产者还有消费者端的rabbitmq的配置是一样的
生产者的配置
server:
port: 8082
spring:
rabbitmq:
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
retry:
enabled: true #是否支持重试
max-attempts: 3 #重试最大次数,默认3条
max-interval: 1000ms #重试最大时间间隔
template:
mandatory: true
publisher-confirm-type: correlated
消费者端配置
server:
port: 8081
spring:
rabbitmq:
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual #手动应答
prefetch: 1 #表示消费者端每次从队列拉取多少消息消费,直到手动确认消费完毕后,才会继续拉取下一条
default-requeue-rejected: false #消费被拒绝时true:重回队列,false否
retry:
enabled: true #是否支持重试
max-attempts: 3 #重试最大次数,默认3条
max-interval: 1000ms #重试最大时间间隔
template:
mandatory: true
publisher-confirm-type: correlated
RabbitMq的相关配置(消费端和生产端是共用的)
@Configuration
public class RabbitMQConfiguration {
//创建队列相应的名称
public static final String QUEUE_FAMOUT = "queue_famout";
//创建交换机相应的名称
public static final String EXCHANGE_FAMOUT = "exchange_famout";
//创建队列
@Bean(value = QUEUE_FAMOUT)
public Queue queue_famout(){
/**
* 参数1:队列名称
* 参数2:是否持久化
* 参数3:是否排他,false不排他
* 参数4:是否自动删除
*/
return new Queue(QUEUE_FAMOUT,true,false,true);
}
//创建交换机
@Bean(value = EXCHANGE_FAMOUT)
public Exchange exchange_famout(){
return ExchangeBuilder.fanoutExchange(EXCHANGE_FAMOUT).durable(true).build();
}
//绑定交换机
@Bean
public Binding bind_famout(@Qualifier(EXCHANGE_FAMOUT) Exchange exchange,@Qualifier(QUEUE_FAMOUT) Queue queue){
//with中的是routking key
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
编写测试类:
其中的setConfirmCallback方法就是开启Confirm机制以后的回调,其中继承的方法的参数分别是
correlationData:内含一个唯一id的对象
ack:回调结果显示,表示的是是否从生产端发送到交换机上,成功是true
case:表示失败的原因。成功为null
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void famout_test(){
//使用发送消息
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息回调成功");
System.out.println("接收的结果是"+ack);
//判断是否成功接收
if(ack){
System.out.println("接收成功"+cause);
}else {
System.out.println("接收失败"+cause);
}
}
});
//发送消息
CorrelationData correlationData = new CorrelationData();
correlationData.setId("100");
rabbitTemplate.convertAndSend(RabbitMQConfiguration.EXCHANGE_FAMOUT,"","hello world",correlationData);
System.out.println("消息发送成功");
}
}
结果截图:
![](https://img-blog.csdnimg.cn/a025c863b6914bdbb0bf28f575a27ac1.png)![](https://img-blog.csdnimg.cn/b4ca79b2dece4feb9cdbb7fffb303547.png)
在控制界面能看到信息发送到交换机上面了,但是在回调方法中显示的却是false,是因为回调方法还没有得到消息,就断开连接了,这里可以使用RESTful的方式来查看。
@RestController
@RequestMapping("/test")
public class ProductorController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping()
public void famout_queue(){
//使用发送消息
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息回调成功");
System.out.println("接收的结果是"+ack);
//判断是否成功接收
if(ack){
System.out.println("接收成功"+cause);
}else {
System.out.println("接收失败"+cause);
}
}
});
//发送消息
CorrelationData correlationData = new CorrelationData();
correlationData.setId("100");
rabbitTemplate.convertAndSend(RabbitMQConfiguration.EXCHANGE_FAMOUT,"","hello world",correlationData);
System.out.println("消息发送成功");
}
}
在运行结果:
生产端的Confirm机制生效了
消费端:
@Component
//实现ChannelAwareMessageListener接口,实现其中的方法
public class CustomerListing implements ChannelAwareMessageListener {
@Override
//监听的队列名
@RabbitListener(queues = RabbitMQConfiguration.QUEUE_FAMOUT)
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消息是:"+new String(message.getBody()));
try{
//手动签收成功
TimeUnit.SECONDS.sleep(1);//睡眠一秒钟
//basicAck中的两个参数,一个是标签,还有一个是是否签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
System.out.println(new Date()+"手动签收成功");
}catch (Exception e){
/* *
*前两个参数一样
* requeue:如果为true那么消息重新回到队列中
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
System.out.println("手动签收失败");
}
}
}
延迟队列的创建
rabbitmq是没有延迟队列的,那么实现延迟队列就需要用到TTl+死信队列
实现的原理:设置过期时间,时间到了就会到死信队列,然后消费者通过死信队列获取消息,来达到延迟的效果
TTL:表示过期时间
@Configuration
public class RabbitMQConfiguration {
//创建队列相应的名称
public static final String QUEUE_FAMOUT = "queue_famout";
//创建死信队列
public static final String QUEUE_TOP_DLX = "queue_top_dlx";
//创建Top队列
public static final String QUEUE_TOP = "queue_top";
//创建交换机相应的名称
public static final String EXCHANGE_FAMOUT = "exchange_famout";
//创建死信交换机
public static final String EXCHANGE_TOP_DLX = "exchange_top_dlx";
//创建队列
@Bean(value = QUEUE_FAMOUT)
public Queue queue_famout(){
//设置参数,简单分发模式不添加任何参数
return new Queue(QUEUE_FAMOUT,true,false,true,paramMap);
}
//创建队列
@Bean(value = QUEUE_TOP)
public Queue exchange_top(){
//设置过期时间,并设置绑定的死信队列
Map<String,Object> paramMap = new HashMap<String,Object>();
//设置过期时间
paramMap.put("x-message-ttl",2000);
//设置绑定的死信队列名称
paramMap.put("x-dead-letter-exchange",EXCHANGE_TOP_DLX);
//路由
paramMap.put("x-dead-letter-routing-key","dead");
return new Queue(QUEUE_TOP,true,false,true,paramMap);
}
//创建死信队列
@Bean(value = QUEUE_TOP_DLX)
public Queue queue_top_dlx(){
return new Queue(QUEUE_TOP_DLX,true,false,true);
}
//创建交换机
@Bean(value = EXCHANGE_FAMOUT)
public Exchange exchange_famout(){
return ExchangeBuilder.fanoutExchange(EXCHANGE_FAMOUT).durable(true).build();
}
//创建死信交换机
@Bean(value = EXCHANGE_TOP_DLX)
public Exchange exchange_top_dlx(){
return ExchangeBuilder.topicExchange(EXCHANGE_TOP_DLX).durable(true).autoDelete().build();
}
//绑定交换机
@Bean
public Binding bind_famout(@Qualifier(EXCHANGE_FAMOUT) Exchange exchange,@Qualifier(QUEUE_FAMOUT) Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
//绑定交换机
@Bean
public Binding bind_top_dlx(@Qualifier(QUEUE_TOP_DLX) Queue queue ,@Qualifier(EXCHANGE_TOP_DLX) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("top.#").noargs();
}
}
死信队列的创建:
创建一个普通的交换机还有队列
创建一个死信交换机和死信队列
在普通的队列上绑定死信交换机
设置相关参数:
可以在控制面板找到,使用Map对象,放在创建队列参数的之后面
![](https://img-blog.csdnimg.cn/afe6afee0481462eb40a362ebe0dc8af.png)
消费端代码:
@Component
public class CustomerTopDlexListener implements ChannelAwareMessageListener {
@Override
@RabbitListener(queues = RabbitMQConfiguration.QUEUE_TOP_DLX)
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(new Date() + new String(message.getBody()));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("签收成功");
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
System.out.println("签收失败");
}
}
}
路由的使用:
xxx.#:表示能匹配到以xxx.开头的路由
xxx.*:表示只能匹配到xxx.aaa,这种类型的路由
结果就不测试。
哪里有问题请指正,谢谢
版权归原作者 he1IoWorld 所有, 如有侵权,请联系我们删除。