0


RabbitMq在Java中的应用

SpringBoot集成RabbitMq

一、RabbitMq的用途及作用

一、削峰、异步、解耦

经常开发的人都知道、RabbitMq常用于并发、流量大的场景,因为RabbitMq属于中间件需要维护,所以一般小项目几乎不会使用。而在于大型的并发环境下,大量的流量积压到接口中,使Mysql连接分配出现不够使用的情况,此时就可以使用RabbitMq来解决。

削峰:

当流量洪峰到达接口时,可以用现实中来举例子,mq就相当于一个独木桥,mysq就相当于河对岸,使大量的人从容有序的排队过河,而不会出现所有人全部淌水过河到河对岸,大大减少MySQL的压力。

异步:

通常采用异步通知的方式,就好比我们在抢票的时候,点击提交,系统会返回一个提示正在努力抢票中,而实际上你的订单正在mq队列中排队处理,处理结果会在后续异步通知结果。

解耦:

解耦主要两方面:

1.生产消息的应用 和 消费消息的应用不是同一种语言可以解耦

2.生产消息的应用 宕机,不会影响到消费者消费消息

二、RabbitMQ介绍


市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

消息队列之间的对比?

支持多语言:RabbitMQ,Kafaka ;ActiveMQ,RocketMQ只支持java

传输速度:RabbitMQ微秒级,其他毫秒级别

吞吐量:kafka 吞吐量和磁盘性能* 集群数量相关 次之RocketMQ

消息高可靠:每个一个都可以保证不丢失,不重复

三、RabbitMQ安装

我本机采用docker安装,比较简洁方便

第一步docker命令创建文件夹:mkdir rabbit

切换到刚刚创建的文件夹中:cd rabbit/

创建配置文件:vim docker-compose.yaml

将下面配置信息粘贴进去:

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq

按esc按键然后按下:输入wq保存并退出

启动rabbitmq:docker-compose up

游览器输入虚拟机地址+15672端口号访问RabbitMq可视化界面 默认用户名密码都是guest

四、Springboot整合RabbitMq

一 、分类
  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange
  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息
  • Exchange - 交换机:和生产者建立连接并接收生产者的消息
  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
  • Routes - 路由:交换机以什么样的策略将消息发布到Queue主要模式:Simple Work Queue (简单工作队列):也就是常说的点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。 Work Queues (工作队列):也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。 Publish/Subscribe (发布订阅模式):一条消息被多个消费者消费。 Routing(路由模式):有选择的接收消息。 Topics (主题模式):通过一定的规则来选择性的接收消息 RPC 模式:发布者发布消息,并且通过 RPC 方式等待结果。目前这个应该场景少,而且代码也较为复杂交换机类型:direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。 fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。 topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。 headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。

导入依赖

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这里有两种方式整合RabbitMq

第一种采用其本身的框架 获取连接

最简模式
package com.wwy.config;
​
/**
 * @author 王伟羽
 * @date 2024/3/13 9:39
 */
​
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 配置获取RabbitMq的静态方法
 */
public class RabbitMqUtils {
​
​
    public static Connection getConnection() {
        ConnectionFactory factory = new ConnectionFactory();   //创建连接工厂
        //设置相关属性
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("192.168.60.139");
        factory.setPort(5672);
        try {
            //获取连接
            Connection conn = factory.newConnection();
            return conn;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        } catch (TimeoutException e) {
            e.printStackTrace();
            return null;
        }
​
    }
}

创建生产者

package com.wwy.producter;
​
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwy.config.RabbitMqUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
import java.io.IOException;
import java.util.Objects;
​
/**
 * @author 王伟羽
 * @date 2024/3/13 9:52
 */
​
@RestController
@RequestMapping(value = "/test")
public class producer {
​
    private final static String QUERE_NAME = "quere_name";
​
    @GetMapping(value = "/sendMessage")
    public String sendMessage(String message) {
        System.out.println(message);
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            try {
                Channel channel = connection.createChannel();
                // 参数1:指定exchange,使用""。   最简模式(helloword) 使用默认交换机
                // 参数2:指定路由的规则,
                //       使用具体的队列名称。
                //      参数2可以是队列名   也可以是路由规则
​
                // 参数3:指定传递的消息所携带的properties,使用null。
                // 参数4:指定发布的具体消息,byte[]类型
                channel.basicPublish("", QUERE_NAME, null, "马上下课".getBytes("utf-8"));
                return "发送消息成功!";
            } catch (IOException e) {
                e.printStackTrace();
                return "发送消息失败!";
            }
        }
        return "mq初始化失败!";
    }
​
}

消费者消费消息

package com.wwy.consumer;
​
import com.rabbitmq.client.*;
import com.wwy.config.RabbitMqUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
​
import java.io.IOException;
import java.util.Objects;
​
/**
 * @author 王伟羽
 * @date 2024/3/13 10:33
 */
@Service
public class ConsumerTest {
​
    private final static String QUERE_NAME = "quere_name";
    @Bean
    public void consumeMessage() {
        Connection connection = RabbitMqUtils.getConnection();
        if(Objects.nonNull(connection)){
            try {
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUERE_NAME,true,false,false,null);
                // 第二步创建消费者
                Consumer consumer  = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
​
                        // byte[] body 就是消费者得到的数据
​
                        System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
​
                    }
                };
                channel.basicConsume(QUERE_NAME,true,consumer);
​
                // 让当前程序卡在 这里
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者获取消息的另一种方式(官网)

package com.wwy.consumer;

import com.rabbitmq.client.*;
import com.wwy.config.RabbitMqUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.Objects;

/**
 * @author 王伟羽
 * @date 2024/3/13 10:33
 */
@Service
public class ConsumerTest {

    private final static String QUERE_NAME = "quere_name";
    @Bean
    public void consumeMessage() {
        Connection connection = RabbitMqUtils.getConnection();
        if(Objects.nonNull(connection)){
            try {
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUERE_NAME,true,false,false,null);
//                // 第二步创建消费者
//                Consumer consumer  = new DefaultConsumer(channel){
//                    @Override
//                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//
//                        // byte[] body 就是消费者得到的数据
//
//                        System.out.println("消费者 得到消息body = " + new String(body,"utf-8"));
//
//                    }
//                };
//                channel.basicConsume(QUERE_NAME,true,consumer);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                };
                channel.basicConsume(QUERE_NAME, true, deliverCallback, consumerTag -> { });
                // 让当前程序卡在 这里
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

启动程序访问接口

生产者生产成功

消费者收到生产者的消息

使用封装好的RabbitTemplate进行操作,比较方便快捷

第一步配置信息

server:
  port: 8083
spring:
  rabbitmq:         # 单机版配置
    host: 192.168.60.139
    port: 5672
    username: guest         #账户名密码默认都是guest
    password: guest
    publisher-confirm-type: simple
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual

生产者:

package com.wwy.producter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwy.config.RabbitMqUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author 王伟羽
 * @date 2024/3/13 9:52
 */

@RestController
@RequestMapping(value = "/test")
public class producer {

    private final static String QUERE_NAME = "quere_name";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendMessage")
    public String sendMessage(String message) {
        System.out.println(message);
        rabbitTemplate.convertAndSend(QUERE_NAME,message);
        return "发送成功!";
    }

}

消费者:

package com.wwy.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 王伟羽
 * @date 2024/3/13 10:33
 */
@Component
public class ConsumerTest {

    private final static String QUERE_NAME = "quere_name";

   @RabbitListener(queues = QUERE_NAME)
   public void handleMessage(String msg) {
       System.out.println("listener 收到消息4 " + msg);
   }
}

运行

工作队列模式

生产者:

package com.wwy.producter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.wwy.config.RabbitMqUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;

/**
 * @author 王伟羽
 * @date 2024/3/13 13:53
 */
@RestController
@RequestMapping(value = "/workQueues")
public class WorkQueuesProducer {

    private final static String WORK_QUEUES = "work_queues";

    @Resource
    private RabbitTemplate rabbitTemplate;

    //第一种
    @GetMapping(value = "/workQueuesSendMessage")
    public String workQueuesSendMessage(String message) {
        System.out.println("接收消息");
        //第一种生产消息方法
        //获取连接
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            try {
                Channel channel = connection.createChannel();
                //发送消息
                for (int i = 0; i < 10; i++) {
                    channel.basicPublish("", WORK_QUEUES, null, (message + i).getBytes("utf-8"));
                }
                return "发送成功!";
            } catch (IOException e) {
                e.printStackTrace();
                return "发送失败!";
            }
        }
        return "初始mq失败!";
    }

//第二种
    @GetMapping(value = "/workQueuesSendMessage01")
    public String workQueuesSendMessage01(String message) {
        System.out.println("接收消息");
        //发送消息
        for (int i = 0; i < 10; i++) {
            System.out.println(i);
            rabbitTemplate.convertAndSend(WORK_QUEUES, message);
        }
        return "发送成功!";
    }

}

消费者消费消息

第一种:

package com.wwy.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.wwy.config.RabbitMqUtils;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;

/**
 * @author 王伟羽
 * @date 2024/3/13 14:13
 */
@Service
public class WorkQueuesConsumerOne {
    private final static String WORK_QUEUES = "work_queues";

    @Bean
    public void getMessageInfoOne() {
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            try {
                Channel channel = connection.createChannel();
                channel.queueDeclare(WORK_QUEUES, true, false, false, null);
                //设置每次消费消息的数量
                channel.basicQos(1);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] 队列1消息内容 '" + message + "'");
                    System.out.println("队列1获取到消息");
                    //手动ACK
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };
                channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
                    System.out.println("队列1消息消费被中断");
                });
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    @Bean
    public void getMessageInfoTwo() {
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            try {
                Channel channel = connection.createChannel();
                channel.queueDeclare(WORK_QUEUES, true, false, false, null);

                //设置每次消费消息的数量
                channel.basicQos(1);
                System.out.println("队列2获取到消息");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] 队列2消息内容 '" + message + "'");
                    //手动ACK
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };
                channel.basicConsume(WORK_QUEUES, false, deliverCallback, consumerTag -> {
                    System.out.println("队列2消息消费被中断");
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

第二种注解方式:

package com.wwy.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author 王伟羽
 * @date 2024/3/13 15:10
 */

@Component
public class WorkQueuesConsumersOne {

    public final static String WORK_QUEUES = "work_queues";

    @RabbitListener(queues = WORK_QUEUES)
    public void consumerOne(String message) {
        System.out.println("队列1收到消息"+message);
    }

    @RabbitListener(queues = WORK_QUEUES)
    public void consumerTwo(String message) {
        System.out.println("队列2收到消息"+message);
    }
}

发布/订阅模式

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

第一种生产者方式:

package com.wwy.producter;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwy.config.RabbitMqUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.Objects;

/**
 * @author 王伟羽
 * @date 2024/3/13 16:54
 */

@RestController
@RequestMapping(value = "/publishProducer")
public class PublishProducer {

    private final static String PUBLISH_PRODUCER1 = "publish_name_one";
    private final static String PUBLISH_PRODUCER2 = "publish_name_two";

    //交换机名称
    private final static String EXCHANGE_NAME = "publish_exchange";

    @GetMapping(value = "/publishSendMessage")
    public String publishSendMessage(String message) {
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            Channel channel = null;
            try {
                channel = connection.createChannel();
                // 绑定交换机
                //参数1: exchange的名称
                //参数2: 指定exchange的类型
                //          FANOUT - pubsub ,    发布订阅
                //          DIRECT - Routing ,   路由模式
                //          TOPIC - Topics      topic
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                //给交换机绑定对应的队列
                //将队列和交换机绑定
                //String var1, 队列名
                //String var2, 交换机名
                //String var3, 对应绑定队列 路由规则  "" 没有规则所有的队列消息一样
                channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
                channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
                // 参数1:指定exchange,使用""。   最简模式(helloword) 使用默认交换机
                // 参数2:指定路由的规则,
                //       使用具体的队列名称。
                //      参数2可以是队列名   也可以是路由规则

                // 参数3:指定传递的消息所携带的properties,使用null。
                // 参数4:指定发布的具体消息,byte[]类型

                for (int i = 1; i < 11; i++) {
                    // 消息向交换机发送,没有匹配路由规则
                    channel.basicPublish(EXCHANGE_NAME, PUBLISH_PRODUCER1, null, (message + i).getBytes("utf-8"));
                }
                return "发送成功!";
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return "mq出错!";
    }
}

消费者消费信息:

package com.wwy.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.wwy.config.RabbitMqUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Objects;

/**
 * @author 王伟羽
 * @date 2024/3/13 17:06
 */
@Component
public class PublishConsumerOne {

    private final static String PUBLISH_PRODUCER1 = "publish_name_one";
    private final static String PUBLISH_PRODUCER2 = "publish_name_two";

    //交换机名称
    private final static String EXCHANGE_NAME = "publish_exchange";

    @Bean
    public void publishGetInfo() {
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            try {
                Channel channel = connection.createChannel();
                //保证消费者 每次只消费一条消息
                channel.basicQos(1);

                // 第一步声明 队列
                channel.queueDeclare(PUBLISH_PRODUCER1,true,false,false,null);
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                channel.queueBind(PUBLISH_PRODUCER1, EXCHANGE_NAME, "");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" 队列一消费消息" + message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };
                channel.basicConsume(PUBLISH_PRODUCER1, true, deliverCallback, consumerTag -> {
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Bean
    public void publishGetInfoTwo() {
        Connection connection = RabbitMqUtils.getConnection();
        if (Objects.nonNull(connection)) {
            try {
                Channel channel = connection.createChannel();
                //保证消费者 每次只消费一条消息
                channel.basicQos(1);
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                channel.queueDeclare(PUBLISH_PRODUCER2,true,false,false,null);
                channel.queueBind(PUBLISH_PRODUCER2, EXCHANGE_NAME, "");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" 队列二消费消息" + message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };
                channel.basicConsume(PUBLISH_PRODUCER2, true, deliverCallback, consumerTag -> {
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果可知,两个消费者都能获取到信息,此种情况适合用户注册业务,一个队列接收短信发送,一个队列接收邮件发送

第二种简便方式 生产者


    @GetMapping(value = "/PublicSubscribe")
    public void PublicSubscribe() {
        rabbitTemplate.convertAndSend("publish_exchange_one", "", "发布订阅模式", new CorrelationData("我是大帅逼"));
    }

消费者:

package com.wwy.consumer;

import lombok.extern.apachecommons.CommonsLog;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.stereotype.Component;

/**
 * @author 王伟羽
 * @date 2024/3/13 17:49
 */

@Component
public class PublishConsumerTwo {

    @RabbitListener(queues = "publish_queue_one")
    public void publishOne(String message) {
        System.out.println("队列1收到的消息" + message);
    }

    @RabbitListener(queues = "publish_queue_two")
    public void publishTwo(String message) {
        System.out.println("队列2收到的消息" + message);
    }
}
主题模式

由于上述第一种方法太过于繁琐,所以主题模式只采用第二种方法,第一种在后续代码里展示

配置主题模式下的队列、交换机并将其绑定起来

package com.wwy.config;

import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 * @author 王伟羽
 * @date 2024/3/14 9:29
 */
@SpringBootConfiguration
public class TopicConfiguration {

    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("topic_exchange_one");
    }

    @Bean
    public Queue getTopicQueueOne(){
        return new Queue("topic_queue_one");
    }

    @Bean
    public Queue getTopicQueueTwo(){
        return new Queue("topic_queue_two");
    }

    @Bean
    public Queue getTopicQueueThree(){
        return new Queue("topic_queue_three");
    }
    //* 代表一个词
    //# 代表零个或者多个词
    @Bean
    public Binding getTopicBindingOne(){
        return BindingBuilder.bind(getTopicQueueOne()).to(getTopicExchange()).with("a.*");
    }

    @Bean
    public Binding getTopicBindingThree(){
        return BindingBuilder.bind(getTopicQueueThree()).to(getTopicExchange()).with("a.#");
    }

    @Bean
    public Binding getTopicBindingTwo(){
        return BindingBuilder.bind(getTopicQueueTwo()).to(getTopicExchange()).with("a.111");
    }
}

创建生产者:

package com.wwy.producter;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * @author 王伟羽
 * @date 2024/3/14 9:36
 */
@RestController
@RequestMapping(value = "/topic")
public class TopicProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendTopicMessage")
    public  String sendTopicMessage(String message){
        try {
            rabbitTemplate.convertAndSend("topic_exchange_one","a.123",message.getBytes("utf-8"));
            return "生产者发送消息成功";
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return "发送消息失败!";
        }
    }
}

创建消费者:

package com.wwy.consumer;

import lombok.extern.apachecommons.CommonsLog;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 王伟羽
 * @date 2024/3/14 9:45
 */

@Component
public class TopicConsumerOne {

    @RabbitListener(queues = "topic_queue_one")
    public void getTopicMessageOne(String message) {
        System.out.println("队列一收到消息:" + message);
    }

    @RabbitListener(queues = "topic_queue_two")
    public void getTopicMessageTwo(String message) {
        System.out.println("队列二收到消息:" + message);
    }

    @RabbitListener(queues = "topic_queue_three")
    public void getTopicMessageThree(String message) {
        System.out.println("队列三收到消息:" + message);
    }
}

这里有一个问题,在每次消费端重启的时候会继续消费队列里的数据,为了防止这种情况,可以消费者在消费到数据的时候进行手动ack

package com.wwy.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 王伟羽
 * @date 2024/3/14 9:45
 */

@Component
public class TopicConsumerOne {

    @RabbitListener(queues = "topic_queue_one")
    public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            System.out.println("队列一收到消息:"+messageBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("队列一收到ack");
        } catch (Exception e) {
            // 如果处理消息时出现异常,可以拒绝消息
            Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
            if (!channel.isOpen()) {
                // 如果channel已经关闭,则无法执行basicNack或basicReject
                return;
            }
            try {
                channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            // 或者可以选择 basicReject 如果不需要重新放回队列
            // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
        }
    }

    @RabbitListener(queues = "topic_queue_two")
    public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            System.out.println("队列二收到消息:"+messageBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("队列二收到ack");
        } catch (Exception e) {
            // 如果处理消息时出现异常,可以拒绝消息
            Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
            if (!channel.isOpen()) {
                // 如果channel已经关闭,则无法执行basicNack或basicReject
                return;
            }
            try {
                channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            // 或者可以选择 basicReject 如果不需要重新放回队列
            // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
        }
    }

    @RabbitListener(queues = "topic_queue_three")
    public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            System.out.println("队列三收到消息:"+messageBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("队列三收到ack");
        } catch (Exception e) {
            // 如果处理消息时出现异常,可以拒绝消息
            Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
            if (!channel.isOpen()) {
                // 如果channel已经关闭,则无法执行basicNack或basicReject
                return;
            }
            try {
                channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            // 或者可以选择 basicReject 如果不需要重新放回队列
            // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
        }
    }
}
路由模式

路由模式几乎与主题模式相同,也是通过key去发送到对应的消费者中去

配置队列,交换机并将其绑定到一起

package com.wwy.config;

import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 * @author 王伟羽
 * @date 2024/3/14 10:49
 */

@SpringBootConfiguration
public class RouterConfiguration {

        @Bean
        public DirectExchange  getRouterExchange(){
            return new DirectExchange("router_exchange_one");
        }

        @Bean
        public Queue getRouterQueueOne(){
            return new Queue("router_queue_one");
        }

        @Bean
        public Queue getRouterQueueTwo(){
            return new Queue("router_queue_two");
        }

        @Bean
        public Queue getRouterQueueThree(){
            return new Queue("router_queue_three");
        }
        
        @Bean
        public Binding getRouterBindingOne(){
            return BindingBuilder.bind(getRouterQueueOne()).to(getRouterExchange()).with("aaa");
        }

        @Bean
        public Binding getRouterBindingThree(){
            return BindingBuilder.bind(getRouterQueueThree()).to(getRouterExchange()).with("bbb");
        }

        @Bean
        public Binding getRouterBindingTwo(){
            return BindingBuilder.bind(getRouterQueueTwo()).to(getRouterExchange()).with("ccc");
        }

}

生产者:

package com.wwy.producter;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * @author 王伟羽
 * @date 2024/3/14 10:48
 */
@RestController
@RequestMapping(value = "/router")
public class RouterProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendRouterMessage")
    public  String sendTopicMessage(String message){
        try {
            rabbitTemplate.convertAndSend("router_exchange_one","aaa",message.getBytes("utf-8"));
            return "生产者发送消息成功";
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return "发送消息失败!";
        }
    }
}

消费者:

package com.wwy.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 王伟羽
 * @date 2024/3/14 10:57
 */
@Component
public class RouterConsumer {

    @RabbitListener(queues = "router_queue_one")
    public void getTopicMessageOne(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            System.out.println("队列一收到消息:"+messageBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("队列一收到ack");
        } catch (Exception e) {
            // 如果处理消息时出现异常,可以拒绝消息
            Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
            if (!channel.isOpen()) {
                // 如果channel已经关闭,则无法执行basicNack或basicReject
                return;
            }
            try {
                channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            // 或者可以选择 basicReject 如果不需要重新放回队列
            // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
        }
    }

    @RabbitListener(queues = "router_queue_two")
    public void getTopicMessageTwo(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            System.out.println("队列二收到消息:"+messageBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("队列二收到ack");
        } catch (Exception e) {
            // 如果处理消息时出现异常,可以拒绝消息
            Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
            if (!channel.isOpen()) {
                // 如果channel已经关闭,则无法执行basicNack或basicReject
                return;
            }
            try {
                channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            // 或者可以选择 basicReject 如果不需要重新放回队列
            // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
        }
    }

    @RabbitListener(queues = "router_queue_three")
    public void getTopicMessageThree(@Payload Message message, @Header(AmqpHeaders.CHANNEL) Channel channel) {
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            System.out.println("队列三收到消息:"+messageBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
            System.out.println("队列三收到ack");
        } catch (Exception e) {
            // 如果处理消息时出现异常,可以拒绝消息
            Long deliveryTag = (Long) message.getMessageProperties().getDeliveryTag();
            if (!channel.isOpen()) {
                // 如果channel已经关闭,则无法执行basicNack或basicReject
                return;
            }
            try {
                channel.basicNack(deliveryTag, false, true); // 拒绝消息并重新放回队列
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            // 或者可以选择 basicReject 如果不需要重新放回队列
            // channel.basicReject(deliveryTag, true); // 拒绝消息并丢弃
        }
    }
}

这里生产者发送消息指定了key为aaa的,所以只有消费者一匹配并接收到消息

总结

以上就是本次测试的所以队列名,大家可以在测试的时候进入可视化界面查看消息、队列、交换机状态。本文只是简单的对RabbitMq的各种模式进行简单了解,后续的如何在项目中实现、死信队列等在下章博客分享,对于本篇如有错误的地方欢迎大家指正。

代码

wangweiyuyu/rabbitmq - 码云 - 开源中国 (gitee.com)https://gitee.com/wangweiyuyu/rabbitmq

标签: java-rabbitmq java

本文转载自: https://blog.csdn.net/weixin_45936627/article/details/136677192
版权归原作者 沐 白 所有, 如有侵权,请联系我们删除。

“RabbitMq在Java中的应用”的评论:

还没有评论