0


RabbitMQ简单测试(JAVA)

1.导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.加配置(提前在你的linux服务器里引入rabbitmq:

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672rabbitmq:management)

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3.主启动类加上注解:@EnableRabbit

4.首先测试创建exchange

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 1、如何创建Exchange、Queue、Binding
     *      1)、使用AmqpAdmin进行创建
     * 2、如何收发消息
     */
    @Test
    public void createExchange() {
        //参数:名字,是否持久化,是否自动删除
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",
                true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("exchange[{}]创建成功","hello-java-exchange");
    }

}

输入URL(你的rabbitmq的地址):http://192.168.56.10:15672/

登录后可以看到确实创建成功(登录账号密码默认都为guest):

5测试queue

    @Test
    public void createQueue(){
        //参数:名字,是否持久化,是否排他,是否自动删除
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("queue[{}]创建成功","hello-java-queue");
    }

6测试Binding

    @Test
    public void createBinding() {
        //参数:目标队列,目标类型,目标exchange,路由键,参数
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                "hello-java-exchange", "hello.java", null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding[{}]创建成功","hello-java-binding");
    }

7测试简单的发送消息

    @Test
    public void sendMessageTest() {
        //1、发送消息
        String message = "Hello World";
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",message);
        log.info("消息发送完成:{}",message);
    }

8测试传输对象

@ToString
@Data
public class OrderReturnApplyEntity implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * id
     */
    @TableId
    private Long id;
    /**
     * 退货原因名
     */
    private String name;
    /**
     * 排序
     */
    private Integer sort;
    /**
     * 启用状态
     */
    private Integer status;
    /**
     * create_time
     */
    private Date createTime;
}
@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
    @Test
    public void sendMessageTest() {
        //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,所以对象必须实现Serializable接口
        OrderReturnApplyEntity entity = new OrderReturnApplyEntity();
        entity.setId(1L);
        entity.setCreateTime(new Date());
        entity.setName("reason");
        entity.setStatus(1);
        entity.setSort(2);
        //2、发送的对象类型的消息,可以是一个json
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",entity);
        log.info("消息发送完成:{}",entity);
    }

9.测试RabbitListener

在service中加入:

    /**
     * Message message:原生消息详细信息,头加体
     * Channel channel 当前传输数据的通道
     * Queue 可以很多人监听,只要收到消息,队列删除消息,且只能有一个得到消息
     * 场景:
     *      1.订单服务启动多个,同一个消息,只能有一个客户端收到
     *      2.只有一个消息处理完,方法运行结束,才可以接收下一个消息
     * RabbitListener能标记到类或方法上(监听哪些队列即可)
     * RabbitHandler只能标在方法上(重载区分不同的消息)
     */
    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessage(Message message){
        //{"id":1,"name":"reason","sort":2,"status":1,"createTime":1665201452403}
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("message是"+message);
        String strRead = new String(body);
        strRead = String.copyValueOf(strRead.toCharArray(), 0, body.length);
        System.out.println("body是"+strRead);
        System.out.println("messageProperties是"+messageProperties);
    }

启动主程序,然后启动上面的sendMessageTest方法,则可以得到信息:

9.测试ConfirmCallBack和ReturnCallBack(服务端)

在MyRabbitConfig中加入:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *      1、默认是自动确认的,只要收到消息,客户端会自动确认,服务器会移除消息
     *          问题:收到很多消息后,自动回复给服务器ack,但只有一个消息处理成功了,宕机了,则发生消息丢失
     *          则应该手动确认(除非明确签收,则会一直处于unack状态。即使服务器宕机,消息也不会丢失,重新变成ready状态)
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });

        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }

在配置文件中加入:

#开启发送端确认
spring.rabbitmq.publisher-confirms=true
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步模式优先进行回调ReturnConfirm
spring.rabbitmq.template.mandatory=true

执行sendMessageTest方法可以看到打印台的信息执行了confirmCallBack方法,说明消息到了服务器。然后对sendMessageTest方法进行修改:

rabbitTemplate.convertAndSend("hello-java-exchange","hello111.java",
                entity,new CorrelationData(UUID.randomUUID().toString()));

可以看到打印台的信息是执行了setReturnCallBack方法,因为并没有这样的routing-key,消息并没有进入队列中。

10.手动收货测试

对receiveMessage方法进行修改:

    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessage(Message message,Channel channel){
        //{"id":1,"name":"reason","sort":2,"status":1,"createTime":1665201452403}
        byte[] body = message.getBody();
        //消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("message是"+message);
        String strRead = new String(body);
        strRead = String.copyValueOf(strRead.toCharArray(), 0, body.length);
        System.out.println("body是"+strRead);
        System.out.println("messageProperties是"+messageProperties);
        //channel内按顺序自增自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag:"+deliveryTag);
        try {
            if(deliveryTag%2==0){
                //签收,并且不会批量签收
                channel.basicAck(deliveryTag,false);
                System.out.println("签收货物:"+deliveryTag);
            }else{
                //退货
                //channel.basicReject(deliveryTag,false);
                //效果和basicReject差不多,多得一个参数代表是否退货后丢弃,true是重新入队,false则是直接丢
                channel.basicNack(deliveryTag,false,false);
                System.out.println("没有签收货物:"+deliveryTag);
            }
        } catch (IOException e) {
            //网络中断
            e.printStackTrace();
        }
    }

配置文件中加上:

#手动ack(确认收货)消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual

可以看到,可以手动签收消息和拒收消息


本文转载自: https://blog.csdn.net/xushuai2333333/article/details/127104274
版权归原作者 Hahahahahahaha~ 所有, 如有侵权,请联系我们删除。

“RabbitMQ简单测试(JAVA)”的评论:

还没有评论