如图:
示例代码:
pom文件添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml配置rabbitmq连接
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /guoVirtualHost
listener:
simple:
retry:
# 开启消费者(出现异常)会进行重试
enabled: true
# 默认重试无限次,所以需指定重试次数
max-attempts: 5
# 重试间隔时间
initial-interval: 3000
# 保证消费者会消费消息,手动确认
acknowledge-mode: manual
#定义交换机、路由、队列名称
guoguo:
order:
exchange: guoguo_order_exchange
routingKey: guoguo.order
queue: guoguo_order_queue
定义RabbitmqConfig配置类,Exchange,Routingkey,Queue
@Component
public class OrderConfig {
@Value("${guoguo.order.exchange}")
private String orderExchange;
@Value("${guoguo.order.routingKey}")
private String orderRoutingKey;
@Value("${guoguo.order.queue}")
private String orderQueue;
@Bean
public DirectExchange orderExchange(){
return new DirectExchange(orderExchange);
}
@Bean
public Queue orderQueue(){
return new Queue(orderQueue,true,false,false);
}
@Bean
public Binding orderBinding(Queue orderQueue,DirectExchange orderExchange){
//方式1:Queue orderQueue,DirectExchange orderExchange 会去ioc容器中找到Bean
return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
//方式2:直接调用方法
//return BindingBuilder.bind(orderQueue()).to( orderExchange()).with(orderRoutingKey);
}
定义发送消息工具类:
@Component
@Slf4j
public class SendMessageUtlis {
@Autowired
RabbitTemplate amqpTemplate;
//订单队列和交换机
@Value("${guoguo.order.exchange}")
private String orderExchange;
@Value("${guoguo.order.routingKey}")
private String orderRoutingKey;
public void sendMessage(Order order){
amqpTemplate.convertAndSend(orderExchange,orderRoutingKey,order,message -> {
return message;
});
}
发送消息:
@RequestMapping("/addOrder")
public String addOrder(){
String quanjuId = System.currentTimeMillis()+ "";
//生产者投递消息时携带全局id发送到mq服务器端
Order order = new Order(1, "mq演示", 66, "男",quanjuId);
log.info("全局id:"+quanjuId);
sendMessage.sendMessage(order );
//返回全局id给客户端
return quanjuId;
}
消费者监听消息:
@Autowired
OrderDao orderDao;
@RabbitListener(queues = "guoguo_order_queue")
public void process(Order order, Message message,Channel channel) throws IOException {
if (StringUtils.isEmpty(order.getQuanjuId())){
return;
}
//拿到全局id
Order orders = orderDao.getQuanjuId(order.getQuanjuId());
if (order!= null){
log.info("已经被消费");
//如果不为null,则已经被消费过,也需要告诉mq服务器端,将该消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
return;
}
int result = orderDao.saveOrder(order);
log.info("插入成功");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
标签:
rabbitmq
本文转载自: https://blog.csdn.net/qq_56808014/article/details/126741088
版权归原作者 WontCode 所有, 如有侵权,请联系我们删除。
版权归原作者 WontCode 所有, 如有侵权,请联系我们删除。