1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.加配置(提前在你的linux服务器里引入rabbitmq:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672rabbitmq:management)
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3.主启动类加上注解:@EnableRabbit
4.首先测试创建exchange
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
/**
* 1、如何创建Exchange、Queue、Binding
* 1)、使用AmqpAdmin进行创建
* 2、如何收发消息
*/
@Test
public void createExchange() {
//参数:名字,是否持久化,是否自动删除
DirectExchange directExchange = new DirectExchange("hello-java-exchange",
true,false);
amqpAdmin.declareExchange(directExchange);
log.info("exchange[{}]创建成功","hello-java-exchange");
}
}
输入URL(你的rabbitmq的地址):http://192.168.56.10:15672/
登录后可以看到确实创建成功(登录账号密码默认都为guest):
5测试queue
@Test
public void createQueue(){
//参数:名字,是否持久化,是否排他,是否自动删除
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("queue[{}]创建成功","hello-java-queue");
}
6测试Binding
@Test
public void createBinding() {
//参数:目标队列,目标类型,目标exchange,路由键,参数
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hello-java-exchange", "hello.java", null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功","hello-java-binding");
}
7测试简单的发送消息
@Test
public void sendMessageTest() {
//1、发送消息
String message = "Hello World";
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",message);
log.info("消息发送完成:{}",message);
}
8测试传输对象
@ToString
@Data
public class OrderReturnApplyEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId
private Long id;
/**
* 退货原因名
*/
private String name;
/**
* 排序
*/
private Integer sort;
/**
* 启用状态
*/
private Integer status;
/**
* create_time
*/
private Date createTime;
}
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@Test
public void sendMessageTest() {
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,所以对象必须实现Serializable接口
OrderReturnApplyEntity entity = new OrderReturnApplyEntity();
entity.setId(1L);
entity.setCreateTime(new Date());
entity.setName("reason");
entity.setStatus(1);
entity.setSort(2);
//2、发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",entity);
log.info("消息发送完成:{}",entity);
}
9.测试RabbitListener
在service中加入:
/**
* Message message:原生消息详细信息,头加体
* Channel channel 当前传输数据的通道
* Queue 可以很多人监听,只要收到消息,队列删除消息,且只能有一个得到消息
* 场景:
* 1.订单服务启动多个,同一个消息,只能有一个客户端收到
* 2.只有一个消息处理完,方法运行结束,才可以接收下一个消息
* RabbitListener能标记到类或方法上(监听哪些队列即可)
* RabbitHandler只能标在方法上(重载区分不同的消息)
*/
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message){
//{"id":1,"name":"reason","sort":2,"status":1,"createTime":1665201452403}
byte[] body = message.getBody();
//消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("message是"+message);
String strRead = new String(body);
strRead = String.copyValueOf(strRead.toCharArray(), 0, body.length);
System.out.println("body是"+strRead);
System.out.println("messageProperties是"+messageProperties);
}
启动主程序,然后启动上面的sendMessageTest方法,则可以得到信息:
9.测试ConfirmCallBack和ReturnCallBack(服务端)
在MyRabbitConfig中加入:
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
* 1、默认是自动确认的,只要收到消息,客户端会自动确认,服务器会移除消息
* 问题:收到很多消息后,自动回复给服务器ack,但只有一个消息处理成功了,宕机了,则发生消息丢失
* 则应该手动确认(除非明确签收,则会一直处于unack状态。即使服务器宕机,消息也不会丢失,重新变成ready状态)
*
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
在配置文件中加入:
#开启发送端确认
spring.rabbitmq.publisher-confirms=true
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步模式优先进行回调ReturnConfirm
spring.rabbitmq.template.mandatory=true
执行sendMessageTest方法可以看到打印台的信息执行了confirmCallBack方法,说明消息到了服务器。然后对sendMessageTest方法进行修改:
rabbitTemplate.convertAndSend("hello-java-exchange","hello111.java",
entity,new CorrelationData(UUID.randomUUID().toString()));
可以看到打印台的信息是执行了setReturnCallBack方法,因为并没有这样的routing-key,消息并没有进入队列中。
10.手动收货测试
对receiveMessage方法进行修改:
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message,Channel channel){
//{"id":1,"name":"reason","sort":2,"status":1,"createTime":1665201452403}
byte[] body = message.getBody();
//消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("message是"+message);
String strRead = new String(body);
strRead = String.copyValueOf(strRead.toCharArray(), 0, body.length);
System.out.println("body是"+strRead);
System.out.println("messageProperties是"+messageProperties);
//channel内按顺序自增自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag:"+deliveryTag);
try {
if(deliveryTag%2==0){
//签收,并且不会批量签收
channel.basicAck(deliveryTag,false);
System.out.println("签收货物:"+deliveryTag);
}else{
//退货
//channel.basicReject(deliveryTag,false);
//效果和basicReject差不多,多得一个参数代表是否退货后丢弃,true是重新入队,false则是直接丢
channel.basicNack(deliveryTag,false,false);
System.out.println("没有签收货物:"+deliveryTag);
}
} catch (IOException e) {
//网络中断
e.printStackTrace();
}
}
配置文件中加上:
#手动ack(确认收货)消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual
可以看到,可以手动签收消息和拒收消息
版权归原作者 Hahahahahahaha~ 所有, 如有侵权,请联系我们删除。