##2023年12月16日 20:25:36 项目中使用RabbitMQ作为应用间信息互通,本次梳理下关于MQ的使用。
1、引入依赖
<!-- 引入依赖,使用v2.5.6版本 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.6</version>
</dependency>
</dependencies>
2、在Nacos中配置MQ信息,此次项目中使用了两个MQ实例(①已方提供MQ Server,本项目内各系统信息互通;②甲方提供MQ Server,与甲方做信息互通),本次介绍mq1
spring:
rabbitmq:
mq1:
host: 127.0.0.1
port: 5672
username: admin
password: ****
enable: false ##队列是否启用,在Configuration中来确认是否初始化MQ
mq2:
host: 127.0.0.2
port: 5672
username: admin
password: ***
enable: false
3、编写配置文件
@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {
private String host;
private Integer port;
private String username;
private String password;
@Autowired
private ReturnCallBack returnCallBack;
@Autowired
private ConfirmCallBack confirmCallBack;
@Bean(value = "mq1ConnectionFactory") //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此
public ConnectionFactory createConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//开启发送到交换机和队列的回调
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
return connectionFactory;
}
@Bean(name = "mq1RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此
public RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//发送消息时设置强制标志,仅当提供了returnCallback时才适用
rabbitTemplate.setMandatory(true);
//确保消息是否发送到交换机,成功与失败都会触发
rabbitTemplate.setConfirmCallback(confirmCallBack);
//确保消息是否发送到队列,成功发送不触发,失败触发
rabbitTemplate.setReturnsCallback(returnCallBack);
return rabbitTemplate;
}
@Bean("mq1RabbitListenerContainerFactory")//命名mq1的RabbitListenerContainerFactory,如果项目中只有一个mq则不必如此
SimpleRabbitListenerContainerFactory mq1RabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
configurer.configure(listenerContainerFactory, connectionFactory);
return listenerContainerFactory;
}
}
4、编写回调callback,此一举措是为了记录交换机、队列本身是否健康,如业务无此细纠,也可不记录。
ConfirmCallBack.java :消息发送到交换机成功、失败都会回调
ReturnCallBack.java:消息发送队列失败回调
@Component
@Slf4j
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if(!ack){
log.info("消息发送交换机失败:{}",s);
}else{
log.info("消息发送交换机成功");
}
}
}
@Component
@Slf4j
public class ReturnCallBack implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息发送队列失败:{}", JSON.toJSON(returnedMessage));
}
}
5、MQ工具类,方便各业务点调用,减少代码冗余。
@Component
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class RabbitMQUtil {
@Resource(name = "mq1RabbitTemplate") //初始化mq1的RabbitTemplate对象,如果项目中只有一个MQ,则无需这么麻烦,
private RabbitTemplate mq1RabbitTemplate;
public void test1(List ob){
//发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,故第二个参数routekey无需填写
mq1RabbitTemplate.convertAndSend("fanout.exchange",null, JSON.toJSONString(ob));
}
public void test2(String str){
//发送到direct类型的交换机,根据routekey去找发送到哪个队列里,只有这一个队列才能收到这条消息
mq1RabbitTemplate.convertAndSend("direct.exchange","routekey",str);
}
}
6、生产者Producer
@Service
public class producer {
@Autowired(required = false) //一定要加required=false,否则mq配置不启用时,这里会报错
private RabbitMQUtil rabbitMQUtil;
public void producer(List ob) {
if(rabbitMQUtil != null){
rabbitMQUtil.test1(ob);
}
}
}
7、消费者Consumer(Listener)
①此处采用动态绑定生成队列的方式,可以满足多据点的模式,部署A据点后,根据A的特征值自动创建关于A的exchage、queue(mq1Queue**_A**),免去了在java类中显式声明并绑定exchage和queue的代码
②此处使用了死信队列 x-dead-letter-exchange,可以在msg消费失败时让msg进入对应死信队列,进而监听死信队列进行补偿操作。
③我们也可以用死信队列去做延时队列:给一个队列提前绑定好死信队列,发送消息到队列并设置消息的timeout,到点未消费也会扔到死信队列,通过监听对应的死信队列达到延时队列的作用
@Slf4j
@Component
@ConditionalOnExpression(value = "${spring.rabbitmq.mq1.enable:false} && '${aa.bb}' eq 'cc' ") //mq1队列启用,并且满足某些条件才初始化
public class MQ1ccRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "mq1Queue**_A**"
, arguments = {@Argument(name = "x-dead-letter-exchange", value = "dead.letter.exchange"),
@Argument(name = "x-dead-letter-routing-key", value = "dead.letter.routekey")}
),
exchange = @Exchange(name = "mq1_fanout_exchange",type = ExchangeTypes.FANOUT)
),ackMode = "MANUAL")
public void mq1listener1(Message message,Channel channel, String poJson) throws IOException {
try {
MyClass po = JSONUtil.toBean(poJson,MyClass.class);
if(myService.saveOrUpdate(po)){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//第二个参数为false时,进入死信队列消费,true时重新进入队列头
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
} catch (Exception e) {
log.info("队列{}消费省级目录信息失败:{},异常信息:{}",RabbitMqConstants.DOWN_REGION_QUEUES + "_${unified.areaCode}",poJson,e.getMessage());
//第二个参数为false时,进入死信队列消费,true时重新进入队列头
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
8、显式绑定死信队列的exchange和queue(其他队列也可以这样绑,死信队列和其他队列没有任何区别,只是说可以在普通队列中去用 **x-dead-letter-exchange **去标记一个队列的死信队列,让普通队列消费失败时,将消息扔到死信队列。
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true")
public class BrainDiedConfig {
@Resource(name = "mq1RabbitTemplate")
private RabbitTemplate brainRabbitTemplate;
@Bean("dead.letter.exchange")
public Exchange diedExchange(){
return new DirectExchange("dead.letter.exchange");
}
@Bean("dead.letter.queues")
public Queue diedQueue(){
return new Queue("dead.letter.queues",true);
}
@Bean
public Binding bindingDied(@Qualifier("dead.letter.queues") Queue diedQueue,@Qualifier("dead.letter.exchange") Exchange diedExchange){
return BindingBuilder.bind(diedQueue).to(diedExchange).with("dead.letter.routeke").noargs();
}
}
9、消费死信队列
@RabbitListener(queues = "dead.letter.queues")
public void diedLetterConsumer(Message message, Channel channel, String poJson){
try {
String queueName = (String) message.getMessageProperties().getHeaders().get("x-first-death-queue");
String exchangeName = (String) message.getMessageProperties().getHeaders().get("x-first-death-exchange");
dao.save(queueName, exchangeName, poJson);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
10、死信队列建表语句
create table died_letter
(
id bigint auto_increment
primary key,
queue_name varchar(50) not null comment '队列名称'
exchange_name varchar(50) not null comment '交换机名称',
msg_json mediumtext not null comment '队列信息',
create_time datetime default CURRENT_TIMESTAMP not null comment '生成时间',
is_handle tinyint default 0 not null comment '是否被处理,0:未处理;1:已处理'
)
comment '死信队列信息';
版权归原作者 蓝色土耳其love 所有, 如有侵权,请联系我们删除。