0


Rabbitmq在java中的使用

1、原生java的使用

1.1、maven导入相关依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency></dependencies>

1.2、通用类及常用方法讲解

常用方法可以参考: RabbitMQ 常用方法介绍(二)

publicclassRabbitMqUtils{publicstaticChannelgetChannel()throwsIOException,TimeoutException{ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("rabbitIP");
        connectionFactory.setUsername("rabbit用户名");
        connectionFactory.setPassword("rabbit密码");
        connectionFactory.setPort(rabbit端口);return connectionFactory.newConnection().createChannel();}}// 消费者常用方法/*
 消费者消费消息
 1. 消费哪个队列
 2. 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
 3. 消费成功的回调函数
 4. 消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);// 预取值: 一次性取多少条消息,不满足则等待
channel.basicQos(1);/*
 消费失败,将当前tag的消息重新放入队列中
 1. 消息标记 tag
 2. 是否批量应答未应答消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);// 生产者常用方法/*
 生成一个队列
 1. 队列名称
 2. 队列里面的消息是否持久化 默认消息存储在内存中
 3. 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
 4. 是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
 5. 其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);/*
 发送一个消息
 1. 发送到那个交换机
 2. 路由的 key 是哪个
 3. 其他的参数信息
 4. 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());

1.3、编写不同的交换机类型

  1. Direct exchange(直连交换机)直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key) 当一个携带着路由值为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。下列代码生产者,生成error 的路由信息, 消费者1获取info和warning消息,消费者2获取error消息所以生产者生产的消息全被消费者2获取并且消费[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IjUU7Ih3-1679553545469)(null)]// 生产者publicclassDirectLogs{publicstaticfinalString EXCHANGE_NAME ="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);Scanner systemInput =newScanner(inputStream);while(systemInput.hasNext()){String message = systemInput.next();// 发送error消息 channel.basicPublish(EXCHANGE_NAME,"error",null, message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息: "+ message);} channel.close();}}// 消费者1publicclassReceiveLogsDirect1{publicstaticfinalString EXCHANGE_NAME ="direct_logs";publicstaticfinalString QUEUE_NAME ="console";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);// 声明一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定 info和warning消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"warning"); channel.basicConsume(QUEUE_NAME,true,((consumerTag, message)->{System.out.println("ReceiveLogsDirect1"+newString(message.getBody()));}), consumerTag ->{});}}// 消费者2publicclassReceiveLogsDirect2{publicstaticfinalString EXCHANGE_NAME ="direct_logs";publicstaticfinalString QUEUE_NAME ="disk";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);// 声明一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定error消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"error"); channel.basicConsume(QUEUE_NAME,true,((consumerTag, message)->{System.out.println("ReceiveLogsDirect2: "+newString(message.getBody()));}), consumerTag ->{});}}
  2. Fanout exchange(扇型交换机)扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的N个队列下列代码中,所有的消费者都能消费消息,因为都绑定了同一队列 “logs”// 生产者publicclassEmitLog{publicstaticfinalString EXCHANGE_NAME ="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner systemInput =newScanner(inputStream);while(systemInput.hasNext()){String message = systemInput.next(); channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息: "+ message);} channel.close();}}// 消费者1publicclassReceiveLogs01{publicstaticfinalString EXCHANGE_NAME ="logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个队列 临时队列、// 生成一个临时队列,队列的名称是随机的// 当消费者断开与队列的连接的时候,队列会自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 channel.queueBind(queueName, EXCHANGE_NAME,"");System.out.println("ReceiveLogs01等待接收消息。。。");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("ReceiveLogs01控制台打印消息: "+newString(message.getBody(),StandardCharsets.UTF_8));}; channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}// 消费者2publicclassReceiveLogs02{publicstaticfinalString EXCHANGE_NAME ="logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME,"");System.out.println("ReceiveLogs02等待接收消息。。。");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("ReceiveLogs02控制台打印消息: "+newString(message.getBody(),StandardCharsets.UTF_8));}; channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}
  3. Topic exchange(主题交换机)主题交换机(topic exchanges)中,队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。扇型交换机和主题交换机异同:对于扇型交换机路由键是没有意义的,只要有消息,它都发送到它绑定的所有队列上 对于主题交换机,路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:- * (星号) 用来表示一个单词.- # (井号) 用来表示任意数量(零个或多个)单词。None// 生产者publicclassTopicLogs{publicstaticfinalString EXCHANGE_NAME ="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);Scanner systemInput =IOUtils.SYSTEM_INPUT;/* Q1--> 绑定的是 中间带 orange 带 3 个单词的字符串 (*.orange.*) Q2--> 绑定的是 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit) 第一个单词是 lazy 的多个单词 (lazy.#) */Map<String,String> bindingKeyMap =newHashMap<>(); bindingKeyMap.put("quick.orange.rabbit"," 被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant"," 被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox"," 被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox"," 被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit"," 虽然满足两个绑定但只被队列 Q2 接收一次 "); bindingKeyMap.put("quick.brown.fox"," 不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit"," 是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit"," 是四个单词但匹配 Q2");for(Map.Entry<String,String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey,null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" 生产者发出消息"+ message);} channel.close();}}// Q1publicclassReceiveLogsTopic1{publicstaticfinalString EXCHANGE_NAME ="topic_logs";publicstaticfinalString QUEUE_NAME ="Q1";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"*.orange.*"); channel.basicConsume(QUEUE_NAME,true,((consumerTag, message)->{System.out.println("Q1: "+newString(message.getBody())+ message.getEnvelope().getRoutingKey());}), consumerTag ->{});}}// Q2publicclassReceiveLogsTopic2{publicstaticfinalString EXCHANGE_NAME ="topic_logs";publicstaticfinalString QUEUE_NAME ="Q2";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"lazy.#"); channel.basicConsume(QUEUE_NAME,true,((consumerTag, message)->{System.out.println("Q2: "+newString(message.getBody())+ message.getEnvelope().getRoutingKey());}), consumerTag ->{});}}
  4. Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 此交换机有个重要参数:”x-match”当”x-match”为“any”时,消息头的任意一个值被匹配就可以满足条件 当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功本人未编写关于此交换机的相关代码可以参考: 中间件系列七 RabbitMQ之header exchange(头交换机)用法

1.4、消息确认

publicclassConfirmMessage{publicstaticfinalint MESSAGE_COUNT =1000;/**
     * 单个确认
     * @throws Exception
     */publicstaticvoidpublishMessageIndividually()throwsException{Channel channel =RabbitMqUtils.getChannel();// 队列声明String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
        channel.confirmSelect();// 开始时间long begin =System.currentTimeMillis();for(int i =0; i < MESSAGE_COUNT; i++){String message = i +"";
            channel.basicPublish("", queueName,null, message.getBytes());// 单个消息就马上进行发布确认boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end =System.currentTimeMillis();System.out.println("发布"+ MESSAGE_COUNT +"个消息, 耗时: "+(end-begin)+"ms");
        channel.close();}// 批量确认publicstaticvoidpublishMessageBatch()throwsException{Channel channel =RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
        channel.confirmSelect();// 批量确认消息大小int batchSize =100;// 未确认消息个数int outstandingMessageCount =0;// 开始时间long begin =System.currentTimeMillis();for(int i =0; i < MESSAGE_COUNT; i++){String message = i +"";
            channel.basicPublish("", queueName,null, message.getBytes());// 批量消息就马上进行发布确认
            outstandingMessageCount++;if(outstandingMessageCount == batchSize){boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}
                outstandingMessageCount =0;}}// 为了确保还有剩余没有确认消息 再次确认if(outstandingMessageCount >0){
            channel.waitForConfirms();}long end =System.currentTimeMillis();System.out.println("发布"+ MESSAGE_COUNT +"个消息, 耗时: "+(end-begin)+"ms");
        channel.close();}// 异步批量确认publicstaticvoidasyncPublishMessageBatch()throwsException{Channel channel =RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
        channel.confirmSelect();/*
            存放消息队列的map
            key: 消息序号
            value: 消息
         */ConcurrentSkipListMap<Long,String> outstandingConfirms =newConcurrentSkipListMap<>();// 开始时间long begin =System.currentTimeMillis();// 消息成功ConfirmCallback ackCallback =(deliveryTag, multiple)->{if(multiple){
                outstandingConfirms.headMap(deliveryTag).clear();}else{// 删除已确认的消息
                outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+ deliveryTag);};// 消息失败ConfirmCallback nackCallback =(deliveryTag, multiple)->{System.out.println("未确认的消息: "+ deliveryTag);};/*
            1、成功的消息处理
            2、失败的消息处理
         */
        channel.addConfirmListener(ackCallback, nackCallback);for(int i =0; i < MESSAGE_COUNT; i++){String message = i +"";
            channel.basicPublish("", queueName,null, message.getBytes());// 记录消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);}long end =System.currentTimeMillis();System.out.println("发布"+ MESSAGE_COUNT +"个消息, 耗时: "+(end-begin)+"ms");
        channel.close();}publicstaticvoidmain(String[] args)throwsException{// 单个发布确认//        publishMessageIndividually(); // 发布1000个消息, 耗时: 29707ms// 批量发布确认//        publishMessageBatch(); // 发布1000个消息, 耗时: 567ms// 异步发布确认asyncPublishMessageBatch();// 发布1000个消息, 耗时: 101ms}}

1.5、死信队列

通过 交换机参数声明死信队列

生产者

publicclassProducer{// 普通交换机名称publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();for(int i =0; i <11; i++){String message ="info"+ i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes(StandardCharsets.UTF_8));}}}

消费者1 主要用来接收消息,声明死信队列,可以看到,如果是info5就拒绝消息,然后将消息给死信队列

publicclassConsumer1{// 普通交换机名称publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";// 死信交换机名称publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";// 普通队列名称publicstaticfinalString NORMAL_QUEUE ="normal_queue";// 死信队列名称publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明普通队列Map<String,Object> arguments =newHashMap<>();// 过期时间// 正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信RouteKey
        arguments.put("x-dead-letter-routing-key","lisi");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false, arguments);// 声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息。。。");DeliverCallback deliverCallback =(consumerTag, message)->{String msg =newString(message.getBody());if(msg.equals("info5")){System.out.println("Consumer1接收消息: "+ msg +"此消息是拒绝的");
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{System.out.println("Consumer1接收消息: "+ msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};
        channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});}}

消费者2 死信队列,未声明交换机是因为在 消费者1中就已经创建了名称为 dead_queue 的交换机

publicclassConsumer2{// 死信队列名称publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("等待接收消息。。。");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("Consumer2接收消息: "+newString(message.getBody()));};
        channel.basicConsume(DEAD_QUEUE,true, deliverCallback, consumerTag ->{});}}

结果:消费者2只收到了info5消息,其他都被 消费者1给消费

1.6、常见的场景以及常用的argments

可以参考: 官方文档

image-20230323103821822

常用的argments

rabbitmq queue_declare arguments参数注释

Map<String,Object> arguments =newHashMap<>();// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信RouteKey
arguments.put("x-dead-letter-routing-key","lisi");// 设置队列最大长度
arguments.put("x-max-length",6);// 绑定队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false, arguments);

2、在spring boot中的使用

2.1、编写不同的交换机

2.1.1、 maven

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.1.2、 yaml配置

spring:rabbitmq:host: 192.168.7.135
    port:5672username: guest
    password: guest
    virtual-host: /
    #  支持发布确认publisher-confirms:true#  支持发布返回publisher-returns:truelistener:simple:#  采用手动应答acknowledge-mode: manual
        #  当前监听容器数concurrency:1#  最大数max-concurrency:1#  是否支持重试retry:enabled:true

2.1.3、rabbit配置

@Configuration@EnableRabbitpublicclassRabbitMQConfig{@AutowiredprivateRabbitTemplate rabbitTemplate;@BeanpublicAmqpTemplateamqpTemplate(){Logger LOG =LoggerFactory.getLogger(AmqpTemplate.class);
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(((message,  replyCode, replyText, exchange, routingKey)->{String correlationId = message.getMessageProperties().getCorrelationId();
            LOG.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);}));//开启消息确认  yml 需要配置   publisher-returns: true
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause)->{if(ack){
               LOG.info("消息发送到交换机成功,correlationId:{}",correlationData.getId());}else{
                LOG.info("消息发送到交换机失败,原因:{}",cause);}}));return rabbitTemplate;}}/**
     * 声明直连交换机 支持持久化.
     * @return the exchange
     */@Bean("directExchange")publicExchangedirectExchange(){returnExchangeBuilder.directExchange("amq.direct").durable(true).build();}@Bean("directQueue")publicQueuedirectQueue(){returnnewQueue("directQueue",true,true,true);//return QueueBuilder.durable("directQueue").build();}@BeanpublicBindingdirectBinding(@Qualifier("directQueue")Queue queue,@Qualifier("directExchange")Exchange directExchange){returnBindingBuilder.bind(queue).to(directExchange).with("direct_routingKey").noargs();}

2.1.4、 direct

配置

/**
     * 声明直连交换机 支持持久化.
     * @return the exchange
     */@Bean("directExchange")publicExchangedirectExchange(){returnExchangeBuilder.directExchange("amq.direct").durable(true).build();}@Bean("directQueue")publicQueuedirectQueue(){returnnewQueue("directQueue",true,true,true);//return QueueBuilder.durable("directQueue").build();}@BeanpublicBindingdirectBinding(@Qualifier("directQueue")Queue queue,@Qualifier("directExchange")Exchange directExchange){returnBindingBuilder.bind(queue).to(directExchange).with("direct_routingKey").noargs();}

生产者

@ComponentpublicclassDirectSender{Logger LOG =LoggerFactory.getLogger(DirectSender.class);@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(int i){String date =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate());String content = i+":hello!"+date;CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","DirectSender",content);this.rabbitTemplate.convertAndSend("amq.direct","direct_routingKey",content,correlationData);}}

消费者

@ComponentpublicclassDirectSender{Logger LOG =LoggerFactory.getLogger(DirectSender.class);@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(int i){String date =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate());String content = i+":hello!"+date;CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","DirectSender",content);this.rabbitTemplate.convertAndSend("amq.direct","direct_routingKey",content,correlationData);}}

2.1.5、Fanout Exchange

配置

@Bean("fanoutQueueA")publicQueuefanoutQueueA(){returnnewQueue("fanoutQueueA",true,true,true);}@Bean("fanoutQueueB")publicQueuefanoutQueueB(){returnnewQueue("fanoutQueueB",true,true,true);}@Bean("fanoutQueueC")publicQueuefanoutQueueC(){returnnewQueue("fanoutQueueC",true,true,true);}/**
     * 声明一个Fanout类型的交换器
     */@Bean("fanoutExchange")publicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanoutExchange");}@BeanpublicBindingfanoutABinding(@Qualifier("fanoutQueueA")Queue queue,FanoutExchange fanoutExchange){returnBindingBuilder.bind(queue).to(fanoutExchange);}@BeanpublicBindingfanoutBBinding(@Qualifier("fanoutQueueB")Queue queue,FanoutExchange fanoutExchange){returnBindingBuilder.bind(queue).to(fanoutExchange);}@BeanpublicBindingfanoutCBinding(@Qualifier("fanoutQueueC")Queue queue,FanoutExchange fanoutExchange){returnBindingBuilder.bind(queue).to(fanoutExchange);}

生产

@ComponentpublicclassFanoutSender{Logger LOG =LoggerFactory.getLogger(DirectSender.class);@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(){String date =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate());String content ="hello!"+date;CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","FanoutSender",content);this.rabbitTemplate.convertAndSend("amq.fanout","",content,correlationData);}}

消费

@Component@RabbitListener(queues ={"fanoutQueueA"})publicclassFanoutReceiverA{Logger LOG =LoggerFactory.getLogger(FanoutReceiverA.class);@RabbitHandlerpublicvoidprocess(String hello){
        LOG.info("AReceiver  : "+ hello +"/n");}}

2.1.6、

配置

@Bean("topicQueueA")publicQueuetopicQueueA(){returnnewQueue("topicQueueA",true,true,true);}@Bean("topicQueueB")publicQueuetopicQueueB(){returnnewQueue("topicQueueB",true,true,true);}@Bean("topicExchange")publicTopicExchangetopicExchange(){returnnewTopicExchange("topicExchange");}@BeanpublicBindingtopicABinding(@Qualifier("topicQueueA")Queue queue,TopicExchange topicExchange){returnBindingBuilder.bind(queue).to(topicExchange).with("topic.msg");}@BeanpublicBindingtopicBBinding(@Qualifier("topicQueueB")Queue queue,TopicExchange topicExchange){returnBindingBuilder.bind(queue).to(topicExchange).with("topic.#");}

消费者A

@Component@RabbitListener(queues ={"topicQueueA"})publicclassTopicReceiverA{Logger LOG =LoggerFactory.getLogger(TopicReceiverA.class);@RabbitHandlerpublicvoidreceiverMsg(String msg){
        LOG.info("class:{},message:{}","TopicReceiverA",msg);}}

消费者B

@Component@RabbitListener(queues ={"topicQueueB"})publicclassTopicReceiverB{Logger LOG =LoggerFactory.getLogger(TopicReceiverB.class);@RabbitHandlerpublicvoidreceiverMsg(String msg){
        LOG.info("class:{},message:{}","TopicReceiverB",msg);}}

生产者

@ComponentpublicclassTopicSender{Logger LOG =LoggerFactory.getLogger(DirectSender.class);@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend1(){String date =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate());String content ="hello!"+date;CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","TopicSender",content);this.rabbitTemplate.convertAndSend("amq.topic","topic.msg",content,correlationData);}publicvoidsend2(){String date =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate());String content ="hello!"+date;CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","TopicSender",content);this.rabbitTemplate.convertAndSend("amq.topic","topic.msg1",content,correlationData);}}

2.2、开始使用

具体可根据自己需要进行增删,

使用swagger是因为不编写前端页面,直接使用swagger调用接口开始生产

<!--RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--spring boot web 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--spring boot test 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--json 依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><!-- hutools 依赖 --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.15</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

2.3、 启动类以及配置类

yaml配置

spring:rabbitmq:host: 192.168.7.135
    port:5672username: guest
    password: guest
    virtual-host: /
    #  支持发布确认publisher-confirms:true#  支持发布返回publisher-returns:truelistener:simple:#  采用手动应答acknowledge-mode: manual
        #  当前监听容器数concurrency:1#  最大数max-concurrency:1#  是否支持重试retry:enabled:true

启动类

@EnableSwagger2@SpringBootApplicationpublicclassRabbitMQStart{publicstaticvoidmain(String[] args){SpringApplication.run(RabbitMQStart.class, args);}}

swagger配置类

@ConfigurationpublicclassSwaggerConfig{@BeanpublicDocketwebApiConfig(){returnnewDocket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}privateApiInfowebApiInfo(){returnnewApiInfoBuilder().title("rabbitmq").description("测试").contact(newContact("xxx","xxx","xxx")).version("1.0").build();}}

rabbit直接交换机配置类

@ComponentpublicclassConfirmConfig{// 队列publicstaticfinalString CONFIRM_QUEUE_NAME ="confirm_queue";// 交换机publicstaticfinalString CONFIRM_EXCHANGE_NAME ="confirm_exchange";// routingKeypublicstaticfinalString CONFIRM_ROUTING_KEY ="key1";// 备份交换机publicstaticfinalString BACKUP_EXCHANGE_NAME ="backup_exchange";// 备份队列publicstaticfinalString BACKUP_QUEUE_NAME ="backup_queue";// 报警队列publicstaticfinalString WARNING_QUEUE_NAME ="backup_queue";@BeanpublicQueueconfirmQueue(){returnnewQueue(CONFIRM_QUEUE_NAME);}@BeanpublicDirectExchangeconfirmExchange(){returnExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_QUEUE_NAME).build();}@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue")Queue confirmQueue,@Qualifier("confirmExchange")DirectExchange confirmExchange){returnBindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}@BeanpublicQueuebackupQueue(){returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();}@BeanpublicFanoutExchangebackupExchange(){returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);}@BeanpublicQueuewarningQueue(){returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();}@BeanpublicBindingbackupQueueBindingBackupExchange(@Qualifier("backupQueue")Queue backupQueue,@Qualifier("backupExchange")FanoutExchange backupExchange){returnBindingBuilder.bind(backupQueue).to(backupExchange);}@BeanpublicBindingwarningQueueBindingBackupExchange(@Qualifier("warningQueue")Queue warningQueue,@Qualifier("backupExchange")FanoutExchange backupExchange){returnBindingBuilder.bind(warningQueue).to(backupExchange);}}

消息回调以及回退 配置类

@Slf4j@Component@AllArgsConstructorpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{privateRabbitTemplate rabbitTemplate;// 重新设置消息回调以及回退类@PostConstructpublicvoidinit(){
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);}/**
     * 交换机确认回调方法
     * @param correlationData   保存回调消息的ID及相关信息
     * @param ack 交换机收到消息是否成功
     * @param cause 失败的原因
     */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){String id =ObjectUtil.isNotNull(correlationData)? correlationData.getId():"";if(ack){
            log.info("交换机已经收到ID为:{}的消息", id);}else{
            log.info("交换机还未收到ID为:{}的消息,原因是:{}", id, cause);}}// 只有不可达目的地的时候,才进行回退@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
        log.error("消息:{},被交换机{}退回,退回原因:{},路由key:{}",newString(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(), returnedMessage.getReplyText(),
                returnedMessage.getRoutingKey());}}

TTL 交换机配置类

@ConfigurationpublicclassTtlQueueConfig{// 普通交换机的名称publicstaticfinalString X_EXCHANGE ="X";// 死信交换机的名称publicstaticfinalString Y_EXCHANGE ="Y";// 普通队列的名称publicstaticfinalString QUEUE_A ="QA";publicstaticfinalString QUEUE_B ="QB";publicstaticfinalString QUEUE_C ="QC";// 死信队列的名称publicstaticfinalString QUEUE_D ="QD";@BeanpublicDirectExchangexExchange(){returnnewDirectExchange(X_EXCHANGE);}@BeanpublicDirectExchangeyExchange(){returnnewDirectExchange(Y_EXCHANGE);}@BeanpublicQueuequeueA(){Map<String,Object> arguments =newHashMap<>();// 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_EXCHANGE);// 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");// 设置TTL 时间是ms
        arguments.put("x-message-ttl",10000);returnQueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}@BeanpublicQueuequeueB(){Map<String,Object> arguments =newHashMap<>();// 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_EXCHANGE);// 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");// 设置TTL 时间是ms
        arguments.put("x-message-ttl",40000);returnQueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}@BeanpublicQueuequeueC(){Map<String,Object> arguments =newHashMap<>();// 设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_EXCHANGE);// 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");returnQueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}@BeanpublicQueuequeueD(){returnQueueBuilder.durable(QUEUE_D).build();}@BeanpublicBindingqueueABindingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")Exchange xExchange){returnBindingBuilder.bind(queueA).to(xExchange).with("XA").noargs();}@BeanpublicBindingqueueBBindingX(@Qualifier("queueB")Queue queueB,@Qualifier("xExchange")Exchange xExchange){returnBindingBuilder.bind(queueB).to(xExchange).with("XB").noargs();}@BeanpublicBindingqueueCBindingX(@Qualifier("queueC")Queue queueC,@Qualifier("xExchange")Exchange xExchange){returnBindingBuilder.bind(queueC).to(xExchange).with("XC").noargs();}@BeanpublicBindingqueueDBindingY(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")Exchange yExchange){returnBindingBuilder.bind(queueD).to(yExchange).with("YD").noargs();}}

死信队列配置类:

死信队列插件安装和使用使用

RabbitMQ延迟消息:死信队列 | 延迟插件 | 二合一用法+踩坑手记+最佳使用心得

packagecom.cjj.config;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassDelayedQueueConfig{// 队列publicstaticfinalString DELAYED_QUEUE_NAME ="delayed.queue";// 交换机publicstaticfinalString DELAYED_EXCHANGE_NAME ="delayed.exchange";// routingKeypublicstaticfinalString DELAYED_ROUTING_KEY ="delayed.routingkey";@BeanpublicQueuedelayedQueue(){returnnewQueue(DELAYED_QUEUE_NAME);}@BeanpublicCustomExchangedelayedExchange(){Map<String,Object> arguments =newHashMap<>();
        arguments.put("x-delayed-type","direct");returnnewCustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false, arguments);}@BeanpublicBindingdelayedQueueBindingDelayedExchange(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange delayedExchange){returnBindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

2.4、消费类

直接交换机消费

@Slf4j@ComponentpublicclassConsumer{@RabbitListener(queues ={ConfirmConfig.CONFIRM_QUEUE_NAME})publicvoidreceiveConfirmMessage(Message message){
        log.info("当前时间{} 接收到消息:{}",newDate().toString(),newString(message.getBody()));}}

死信交换机消费

@Slf4j@ComponentpublicclassDeadLetterQueueConsumer{@RabbitListener(queues ="QD")publicvoidreceiveD(Message message,Channel channel){String msg =newString(message.getBody());
        log.info("当前时间{}。 收到死信队列的消息:{}",newDate().toString(), msg);}}

延时交换机消费

@Slf4j@ComponentpublicclassDelayQueueConsumer{@RabbitListener(queues =DelayedQueueConfig.DELAYED_QUEUE_NAME)publicvoidreceiveDelayQueue(Message message){String msg =newString(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}",newDate().toString(), msg);}}

2.5、直接交换机以及回退配置类测试

@Slf4j@RestController@AllArgsConstructor@RequestMapping("/confirm")publicclassProductController{privateRabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")publicvoidsendMessage(@PathVariableString message){CorrelationData correlationData =newCorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("发送消息内容: {}", message);
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"1", message, correlationData);
        log.info("发送消息内容1: {}", message);}}

image-20230323105645598

可以看到,当找不到交换机时,就会调用回退函数,找到了交换机就被正常消费

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k3wbUtGv-1679553553523)(null)]

2.6、死信交换机以及动态配置测试

@Slf4j@RestController@RequestMapping("/ttl")@AllArgsConstructorpublicclassSendMsgController{privateRabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")publicvoidsendMsg(@PathVariableString message){
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",newDate().toString(), message);
        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}}

image-20230323110429644

可以看到,ttl10s的比ttl40s的要30s接收到, 之所以是被死信队列接收,是因为设置的 ttl 分别是10s 和 40s,所以刚好被 死信队列接收

动态配置过期时长和延迟时长

// 动态设置消息过期时长@GetMapping("/sendExpiration/{message}/{ttlTime}")publicvoidsendMsg(@PathVariableString message,@PathVariableString ttlTime){
        log.info("当前时间:{},发送一条时间{}毫秒TTL消息给队列QC:{}",newDate().toString(), ttlTime, message);
        rabbitTemplate.convertAndSend("X","XC", message, msg ->{
            msg.getMessageProperties().setExpiration(ttlTime);return msg;});}// 动态设置消息的延迟时长@GetMapping("/sendDelayMsg/{message}/{delayTime}")publicvoidsendMsg(@PathVariableString message,@PathVariableInteger delayTime){
        log.info("当前时间:{},发送一条时长{}毫秒TTL消息给队列delayed.queue:{}",newDate().toString(), delayTime, message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_QUEUE_NAME, message, msg ->{// 延迟时长
            msg.getMessageProperties().setDelay(delayTime);return msg;});}

image-20230323112917219

参考文档

参考链接: 官方文档

参考链接: 中间件系列三 RabbitMQ之交换机的四种类型和属性

参考链接: RabbitMQ四种交换机类型

参考链接: rabbitmq的常用方法

参考链接: 中间件系列七 RabbitMQ之header exchange(头交换机)用法

参考链接: rabbitmq queue_declare arguments参数注释

参考链接: RabbitMQ延迟消息:死信队列 | 延迟插件 | 二合一用法+踩坑手记+最佳使用心得

参考链接:SpringBoot系列之RabbitMQ使用实用教程

参考视频

参考视频: 尚硅谷视频

标签: rabbitmq java

本文转载自: https://blog.csdn.net/qq_49341576/article/details/129730584
版权归原作者 k_cldh 所有, 如有侵权,请联系我们删除。

“Rabbitmq在java中的使用”的评论:

还没有评论