一、实现消息可靠性投递
1.1、消息生产者端确认机制
修改yml
spring: rabbitmq: host: 192.168.200.110 port: 5672 username: guest password: 123456 virtual-host: / publisher-confirm-type: correlated #交换机确认 publisher-returns: true #队列确认
@Configuration
@Slf4j
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
//消息发送到交换机成功或失败都会调用
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlationData:" + correlationData);
log.info("ack:" + ack);
log.info("cause:" + cause);
}
//发送到队列失败调用
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));
log.info("应答码:" + returnedMessage.getReplyCode());
log.info("描述:" + returnedMessage.getReplyText());
log.info("使用交换机:" + returnedMessage.getExchange());
log.info("消息使用的路由键:" + returnedMessage.getRoutingKey());
}
}
@SpringBootTest
public class MQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"你好,美羊羊");
}
}
1.2、备份交换机
创建备份交换机
创建绑定队列
将原交换机与备份交换机绑定
@Test
public void test1(){
//交换机名要对,路由键错才行
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY + "1","你好,慢羊羊");
}
1.3、消费端确认机制
修改yml
spring: rabbitmq: host: 192.168.200.110 port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual #手动确认
@RabbitListener(queues = {QUEUE_NAME})
public void getMessage(String date, Message message, Channel channel) throws IOException {
//获取当前deliveryTagID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info(" "+1 / 0);
//成功返回ACK信息
channel.basicAck(deliveryTag,false);
log.info("接收消息为:" + date);
} catch (Exception e) {
//获取消息是否重复投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
//失败返回NACK信息
if (redelivered){
//long var1,
// boolean var3,
// boolean var4 控制消息是否重新放回队列
channel.basicNack(deliveryTag,false,false);
}else {
channel.basicNack(deliveryTag,false,true);
}
throw new RuntimeException(e);
}
}
}
二、消费端限流设置
只需要修改yml
spring: rabbitmq: host: 192.168.200.110 port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual #手动确认 prefetch: 1 #设置每次从队列中读取消息数
三、消息超时设置
3.1、从队列设置全局超时时间
3.2、设置消息本身超时时间
@Test
public void test4(){
//创建消息后置处理器
MessagePostProcessor messagePostProcessor = message -> {
//设置过期时间,单位毫秒
message.getMessageProperties().setExpiration("10000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"a",messagePostProcessor);
}
四、死信
4.1、消费端拒绝接收消息
4.1.1、创建死信交换机与队列
正常创建绑定即可
4.1.2、创建常规交换机与队列
创建常规队列注意事项
//监听正常队列
//@RabbitListener(queues = {QUEUE_NAME_NORMAL})
public void getMessageNormal(String date, Message message, Channel channel) throws Exception {
//获取当前deliveryTagID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//返回ACK信息
channel.basicReject(deliveryTag,false);
log.info("拒绝接收消息" );
}
//监听死信队列
//@RabbitListener(queues = {QUEUE_NAME_DEAD})
public void getMessageDead(String date, Message message, Channel channel) throws Exception {
//获取当前deliveryTagID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//成功返回ACK信息
channel.basicAck(deliveryTag,false);
log.info("接收消息为:" + date);
}
4.2、消息数量超过队列容纳限度
@Test
public void test5(){
for (int i = 1; i <=20 ; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_NORMAL,ROUTING_KEY_NORMAL,"a"+i);
}
}
五、延迟队列
5.1、使用死信队列实现
5.2、使用插件实现
docker inspect rabbitmq
下载的插件放入source后的目录
进入容器内部
docker exec -it rabbitmq /bin/bash
启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重启容器
docker restart rabbitmq
5.3、创建交换机与队列
队列正常创建,无需参数设置
测试代码:
public static final String EXCHANGE_DIRECT_DELAY = "exchange.delay";
public static final String ROUTING_KEY_DELAY = "delay";
@Test
public void test6(){
//创建消息后置处理器
MessagePostProcessor messagePostProcessor = message -> {
//设置过期时间,单位毫秒
//必须安装启动延迟插件设置才生效
message.getMessageProperties().setHeader("x-delay","10000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_DELAY,ROUTING_KEY_DELAY,
"你好,插件" + new SimpleDateFormat("HH:mm:ss").format(new Date()),messagePostProcessor);
}
public static final String QUEUE_NAME_DELAY = "queue.delay";
@RabbitListener(queues = {QUEUE_NAME_DELAY})
public void getMessageDelay(String date, Message message, Channel channel) throws Exception {
//获取当前deliveryTagID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//成功返回ACK信息
channel.basicAck(deliveryTag,false);
log.info("接收消息为:" + date);
log.info("当前时间为:" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
六、事务消息
在Java配置类进行设置
@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory cachingConnectionFactory){
return new RabbitTransactionManager(cachingConnectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
RabbitTemplate rabbitTemplate1 = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate1.setChannelTransacted(true);
return rabbitTemplate1;
}
@Test
@Transactional
@Rollback(value = false)
public void test7(){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常前");
int var = 3 / 0;
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常后");
}
七、优先级队列
创建交换机与队列
@Test
public void test8(){
//创建消息后置处理器
MessagePostProcessor messagePostProcessor = message -> {
message.getMessageProperties().setPriority(3);
return message;
};
rabbitTemplate.convertAndSend("exchange.priority","priority","第3级",messagePostProcessor);
}
版权归原作者 鸭鸭老板 所有, 如有侵权,请联系我们删除。