0


rabbitmq

用途

  • 流量削峰

最大处理量如果是一秒一万条订单,一秒钟来了两万条,可以先存在消息队列里面,按照能力去消费处理

  • 应用解耦

下单后,需要去调用很多其他系统,使用我们的发布订阅,让需要接受这条消息的服务监听这个queue

  • 异步处理

在我们一些需要异步调用的场景中,回调

核心概念

生产者
交换机(需要重点理解)接受生产者的消息,并按照规则推到队列里面,这些规则的配置可以实现不同场景的需求
队列
消费者

安装

docker

docker run -d-p15672:15672  -p5672:5672  -eRABBITMQ_DEFAULT_USER=admin -eRABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone  rabbitmq:management

3.8.8 https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8
22.3 https://www.erlang-solutions.com/downloads/

# 安装erlangrpm-ivh esl-erlang_22.3.1-1_centos_7_amd64.rpm

warning: esl-erlang_22.3.1-1_centos_7_amd64.rpm: Header V4 RSA/SHA1 Signature, key ID a14f4fca: NOKEY
error: Failed dependencies:
执行以下命令:

yum install epel-release
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

yum install socat -y#安装RabbitMQrpm-ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

#添加开机启动 RabbitMQ 服务chkconfig rabbitmq-server on

#启动服务
/sbin/service rabbitmq-server start

#查看服务状态
/sbin/service rabbitmq-server status

#停止服务(选择执行)
/sbin/service rabbitmq-server stop

#开启 web 管理插件,rabbitmq 默认不开启
rabbitmq-plugins enable rabbitmq_management

# 现在登录如果使用ip是无法登录的# 添加配置文件,去掉 ip 限制cd /etc/rabbitmq

vim rabbitmq-env.conf
#  Specifies new style config file locationCONFIG_FILE=/etc/rabbitmq/rabbitmq.conf

vim rabbitmq.conf

loopback_users = none

/sbin/service rabbitmq-server restart

#创建账号
rabbitmqctl add_user admin 123#设置用户角色
rabbitmqctl set_user_tags admin administrator

#设置用户权限# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p"/" admin ".*"".*"".*"#户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限#当前用户和角色
rabbitmqctl list_users  

# 关闭防火墙# 查看防火墙状态: 
systemctl status firewalld.service

# 关闭防火墙
systemctl stop firewalld.service

# 开机禁用防火墙
systemctl disable firewalld.service

hello world

还是国际惯例,咱们来一个 hello world,实现的功能也很简单,创建一个生产者,发送一条 hello world 的消息,再创建一个 消费者,消费这条消息,并在控制台打印

我们创建一个 maven 的简单项目,后面再去整合 SpringBoot, 只需要引入两个依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>

创建一个生产者

/**
 * 生产者:发消息
 */publicclassProducer{// 队列名称publicstaticfinalStringQUEUE_NAME="hello";// 发消息publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 工厂 IP 连接 RabbitMQ 的队列
        factory.setHost("172.16.0.28");// 用户名
        factory.setUsername("admin");// 密码
        factory.setPassword("123");// 创建连接Connection connection = factory.newConnection();// 获取信道Channel channel =  connection.createChannel();/**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化(默认 false,内存)
         * 3.该队列是否值供一个消费者进行消费,是否进行消费共享, true 可以多个消费者消费
         * 4.是否自动删除  最后一个消费者断开连接后 该队列是否自动删除 false 不自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 发消息String message ="hello world";/**
         * 发送一个消费
         * 1.发送到哪个交换机
         * 2.路由的 key 值是哪个,本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println("消息发送完毕");}}

消费者

/**
 * 消费者
 */publicclassConsumer{// 队列名称publicstaticfinalStringQUEUE_NAME="hello";// 接收消息publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 工厂 IP 连接 RabbitMQ 的队列
        factory.setHost("172.16.0.28");// 用户名
        factory.setUsername("admin");// 密码
        factory.setPassword("123");// 创建连接Connection connection = factory.newConnection();// 获取信道Channel channel =  connection.createChannel();// 声明 接收消息DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println(newString(message.getBody()));};// 声明 取消消息的回调CancelCallback cancelCallback =(consumerTag)->{System.out.println("消息 消费被中断");};/**
         * 消费者消费消息:
         *  1。 消费哪个队列
         *  2. 消费成功后是否要自动应答,true 代表自动应答, false 代表手动应答
         *  3。消费者未成功消费的回调
         *  4。消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}}

我们启动下


我们来简单梳理下,在生产者中我们主要做的是,定义一个 队列,并往这个队列中发送消息,消费者中则是指定监听对应的 queue

消息应答

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

  • 自动应答
  • 手动应答 - Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了 - 是否批量应答

批量应答的理解,不建议使用,可能会应答没有处理完的消息

自动入队

发生点在工作线程
没有 ack 的消息会被重新放回队列被别的消费者消费

文字说明,我们启动两个消费者,消费者c1,c2分别接收消息 m1, m2, 在c1 ack之前把c1关掉,这时m1会被c2重新消费

/**
 * 消息在手动应答时是不丢失,放回队列中重新消费
 */publicclassTask02{// 队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();boolean durable =true;// 声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, durable,false,false,null);Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null, message.getBytes("UTF-8"));// 解决中文编码System.out.println("生产者发送消息: "+ message);}}}publicclassWork3{// 队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息处理时间较短");DeliverCallback deliverCallback =(consumerTag, message)->{// 沉睡 1 sSleepUtils.sleep(1);System.out.println("接收到消息: "+newString(message.getBody(),"UTF-8"));// 手动应答/**
             * 1.消息的标记 tag
             * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};// 采用手动应答boolean autoAck =false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback,(consumerTag)->{System.out.println(consumerTag +"消费者取消消费接口回调逻辑");});}}publicclassWork4{// 队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();System.out.println("C2 等待接收消息处理时间较长");DeliverCallback deliverCallback =(consumerTag, message)->{// 沉睡 30 sSleepUtils.sleep(30);System.out.println("接收到消息: "+newString(message.getBody(),"UTF-8"));// 手动应答/**
             * 1.消息的标记 tag
             * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};// 采用手动应答boolean autoAck =false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback,(consumerTag)->{System.out.println(consumerTag +"消费者取消消费接口回调逻辑");});}}

我们先启动 Task02 ,发送两条消息,后面分别启动 Work4, Work3,Work3 和 Work4根据轮训机制,会分别取到一条消息,然后再 ack 之前,我们把 Work4 关掉,会发现两条消息都被 Work3 消费了

持久化

这里的处理发生在 ,生产者发送消息的时候
需要分别设置队列和消息的持久化


这里存在一种情况,消息在落盘之前 宕机了,消息也会丢失,后面会讲到处理方式(需要发布确认)

发布确认

这一小节来处理上一小节提出的问题,确保消息能被发布
发布确认总共有三种策略,下面我们我们分别说明,代码演示下,重点计算下每种策略所花的时间
首先我们需要开启发布确认

main 函数,下面我们分别写三个方法,分别实现 每种发布确认策略

// 批量发消息的个数publicstaticfinalintMESSAGE_COUNT=1000;publicstaticvoidmain(String[] args)throwsException{//        1. 单个确认publishMessageIndividually();// 发布 1000个单独确认消息耗时 398ms//        2. 批量确认//        publishMessageBatch(); // 发布 1000个批量确认消息耗时 69ms//        3. 异步批量确认//        publishMessageAsync(); // 发布 1000个异步确认消息耗时 33ms}
  • 单个发布确认

串行,一条消息发布确认后才可以开始下一条消息
没发送一个消息调用一次 channel.waitForConfirms();

// 单个确认publicstaticvoidpublishMessageIndividually()throwsException{Channel channel =RabbitMqUtils.getChannel();//  队列的声明String queueName =UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
    channel.confirmSelect();// 开始时间long begin =System.currentTimeMillis();// 批量发消息for(int i =0; i <MESSAGE_COUNT; i++){String message = i +"";
        channel.basicPublish("", queueName,null, message.getBytes());// 单个消息马上进行发布确认boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end =System.currentTimeMillis();System.out.println("发布 "+MESSAGE_COUNT+"个单独确认消息耗时 "+(end - begin)+"ms");}
  • 批量发布确认

计算发送的消息,达到一定量之后调用一次 channel.waitForConfirms();
本质上还是同步,而且会存在某些消息没有被发布的问题,这个实现其实个人感觉有点鸡肋

// 批量发送确认publicstaticvoidpublishMessageBatch()throwsException{Channel channel =RabbitMqUtils.getChannel();//  队列的声明String queueName =UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
        channel.confirmSelect();// 开始时间long begin =System.currentTimeMillis();// 批量确认大小int batchSize =100;// 未确认消息个数// 批量发消息 批量发布确认for(int i =0; i <MESSAGE_COUNT; i++){String message = i +"";
            channel.basicPublish("", queueName,null, message.getBytes());// 判断达到100条消息的时候,批量确认一次if(i % batchSize ==0){// 发布确认
                channel.waitForConfirms();}}long end =System.currentTimeMillis();System.out.println("发布 "+MESSAGE_COUNT+"个批量确认消息耗时 "+(end - begin)+"ms");}
  • 异步发布确认


这里是通过回调函数来异步确认

// 异步发布确认publicstaticvoidpublishMessageAsync()throwsException{Channel channel =RabbitMqUtils.getChannel();//  队列的声明String queueName =UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
        channel.confirmSelect();/**
         * 线程安全有序的一个哈希表 适用于高并发的情况下
         * 1. 轻松的将序号和消息进行关联
         * 2. 轻松批量删除条目 只要给到序号
         * 3. 支持高并发(多线程)
         */ConcurrentSkipListMap<Long,String> outstandingConfirms =newConcurrentSkipListMap<>();// 开始时间long begin =System.currentTimeMillis();// 准备消息的监听器 监听哪些消息成功了 哪些消息失败了// 消息确认成功 回调函数ConfirmCallback ackCallback =(deliveryTag, multiple)->{// 2. 删除已经确认的消息,剩下的就是未确认的消息if(multiple){ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();}else{
                outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息: "+ deliveryTag);};// 消息确认失败 回调函数ConfirmCallback nackCallback =(deliveryTag, multiple)->{// 3. 打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是 "+ message +"未确认的消息: "+ deliveryTag);};/**
         * 1. 监听哪些消息成功了
         * 2. 监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);// 异步通知// 批量发送消息for(int i =0; i <MESSAGE_COUNT; i++){String message = i +"";
            channel.basicPublish("", queueName,null, message.getBytes());// 1. 此处记录下所有要发送的消息 消息的总和
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);}// 结束时间long end =System.currentTimeMillis();System.out.println("发布 "+MESSAGE_COUNT+"个异步确认消息耗时 "+(end - begin)+"ms");}

有两个点需要说明

  1. channel.addConfirmListener(ackCallback, nackCallback); // 异步通知 这里添加回调函数
  2. ConcurrentSkipListMap 创建一个 并发集合,记录消息状态

TODO 这里可以补充下哈,但还是感谢尚硅谷老师

交换机

这一小节会介绍几种常见交换机绑定队列的方式和几种常见交换机

前面我们没有手动去指定交换机

默认会给我们提供一个无名交换机

类似的,如果我们没有给队列命名,我们采用的也就是临时队列

绑定关系则是指的,路由与队列之间的映射关系

下面我们来介绍不同类型的交换机

fanout

广播,会把接收到的消息 广播到它知道的所有队列中

/**
 * 发消息
 */publicclassEmitLog{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+ message);}}}/**
 * 消息接收
 */publicclassReceiveLogs01{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个队列 临时队列// 队列的名称是随机的// 当消费者断开与队列的连接的时候 队列就自动删除String queueName = channel.queueDeclare().getQueue();/**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到消息打印在屏幕上......");// 接收消息// 消费者取消消息时回调接口DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("ReceiveLogs01接收到的消息:"+newString(message.getBody(),"UTF-8"));};

        channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}/**
 * 消息接收
 */publicclassReceiveLogs02{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个队列 临时队列// 队列的名称是随机的// 当消费者断开与队列的连接的时候 队列就自动删除String queueName = channel.queueDeclare().getQueue();/**
         * 绑定交换机与队列
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到消息打印在屏幕上......");// 接收消息// 消费者取消消息时回调接口DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("ReceiveLogs02接收到的消息:"+newString(message.getBody(),"UTF-8"));};

        channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}

我们可以看到,我们发送的m1和 m2,会被 两个队列全部接收

direct

发送的时候必须指定路由规则,exchange需要根据routingkey把消息发送给每一个匹配的queue
如果多个队列具有相同的 routingkey,和 fanout 的情况就会类似

publicclassDirectLog{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"error",null, message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+ message);}}}publicclassReceiveLogsDirect01{publicstaticfinalStringEXCHANGE_NAME="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);// 声明一个队列
        channel.queueDeclare("console",false,false,false,null);

        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("ReceiveLogsDirect01接收到的消息:"+newString(message.getBody(),"UTF-8"));};

        channel.basicConsume("console",true, deliverCallback, consumerTag ->{});}}publicclassReceiveLogsDirect02{publicstaticfinalStringEXCHANGE_NAME="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);// 声明一个队列
        channel.queueDeclare("disk",false,false,false,null);

        channel.queueBind("disk",EXCHANGE_NAME,"error");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("ReceiveLogsDirect02接收到的消息:"+newString(message.getBody(),"UTF-8"));};

        channel.basicConsume("disk",true, deliverCallback, consumerTag ->{});}}

这里在测试的时候,我们需要向不同的 routingKey 发消息,对应的消息就会根据 routingKey 进入到不同的队列

topic

可以理解为 是在 direct 的基础上加上了模糊匹配的规则,模糊匹配规则有如下两条

  • *可以代替一个单词
  • #可以替代零个或多个单词

publicclassEmitLogTopic{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();Scanner scanner =newScanner(System.in);/**
         * Q1-->绑定的是
         * 中间带 orange 带 3 个单词的字符串(*.orange.*)
         * Q2-->绑定的是
         * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         * 第一个单词是 lazy 的多个单词(lazy.#)
         *
         */Map<String,String> bindingKeyMap =newHashMap<>();
        bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");

        bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for(Map.Entry<String,String> bindingKeyEntry :
                bindingKeyMap.entrySet()){String bindingKey =
                    bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME, bindingKey,null,
                    message.getBytes("UTF-8"));System.out.println("生产者发出消息"+ message);}}}/**
 * 声明主题交换机 及相关队列
 *
 * 消费者 c1
 */publicclassReceiveLogsTopic01{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个队列String queueName ="Q1";
        channel.queueDeclare(queueName,false,false,false,null);

        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println(newString(message.getBody(),"UTF-8"));System.out.println("接收队列: "+ queueName +" 绑定键: "+ message.getEnvelope().getRoutingKey());};

        channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}/**
 * 声明主题交换机 及相关队列
 * <p>
 * 消费者 c1
 */publicclassReceiveLogsTopic02{// 交换机名称privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个队列String queueName ="Q2";
        channel.queueDeclare(queueName,false,false,false,null);

        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println(newString(message.getBody(),"UTF-8"));System.out.println("接收队列: "+ queueName +" 绑定键: "+ message.getEnvelope().getRoutingKey());};

        channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}

交换机和队列的声明方式

基于注解和编程

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue1"),
    exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
    key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue2"),
    exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
    key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue1"),
    exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
    key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue2"),
    exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
    key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}

消息转换器

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>
@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

如果

spring-boot-starter-web

则无需重复引入

可靠性

生产者的可靠性

重试机制
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled:true# 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier:1# 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts:3# 最大重试次数

阻塞重试,建议禁用
验证方式,发送消息的时候把 rabbitmq 停用

生产者消息确认机制

publisher confirm->生产者把消息成功发送给了 exchange,ack 和 nack
publisher return->exchange路由消息失败会触发
如何开启

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns:true# 开启publisher return机制
@TestvoidtestPublisherConfirm(){// 1. 创建 CorrelationDataCorrelationData cd =newCorrelationData();// 2. 给 future 添加 ConfirmCallback
        cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){// 2.1 Future 异常 基本不会出现
                log.error("send message fail", ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){// 2.2 Future 接收到回执的处理逻辑,参数中的 result 就是回执内容if(result.isAck()){
                    log.info("发送消息成功,收到 ack");}else{
                    log.error("发送消息失败,收到nack,reason: {}", result.getReason());}}});// 3. 发送消息
        rabbitTemplate.convertAndSend("hmall.direct","q","hello", cd);}@PostConstructpublicvoidinit(){
        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){
                log.error("触发return callback,");
                log.info("exchange: {}", returned.getExchange());
                log.info("routingKey: {}", returned.getRoutingKey());
                log.info("message: {}", returned.getMessage());
                log.info("replyCode: {}", returned.getReplyCode());
                log.info("replyText: {}", returned.getReplyText());}});}

这个案例routingkey是匹配不到 queue 的,所有会返回 ack,然后触发 returnCallback

生产建议

不建议开启 publisher return ,最多仅仅开启 publisher confirm

mq本身的可靠性

数据持久化
  • 交换机持久化
  • 队列持久化
  • 消息持久化

如果在开启持久化的同时开启 ack,会在持久化完成后才ack,但是由于持久化是批量的,所以建议 ack 使用异步

惰性队列

直接把消息发到磁盘,而不是先到内存再到磁盘

消费者的可靠性

处理模式
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

消费处理完消息后的三种回执

  • ack 成功处理 rabbitmq删除这条消息
  • nack 消息处理失败 重新投递
  • reject 消息处理失败并拒绝该消息 删除

三种处理模式

  • none 投递完以后 ack
  • manual 手动模式 手动设置 ack 或者 reject
  • auto spring amqp 帮我们做了增强,正常 ack,业务异常 nack, 消息处理或者校验异常 reject
@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{
        log.info("spring 消费者接收到消息:【"+ msg +"】");if(true){//            throw new MessageConversionException("故意的"); // rejectthrownewRuntimeException("");// 会重试}
        log.info("消息处理完成");}

测试方式,先测试 none 模式,会发现直接删掉了。再测试 auto ,分别测试

MessageConversionException

RuntimeException

,前者删掉,后者触发重试

失败重试机制

默认是重新在mq中入队出队

spring:rabbitmq:listener:simple:retry:enabled:true# 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier:1# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts:3# 最大重试次数stateless:true# true无状态;false有状态。如果业务中包含事务,这里改为false

可配置在客户端重试
重试三次后,返回 reject 删掉了消息

失败处理策略
MessageRecovery

定义
默认是丢弃

RejectAndDontRequeueRecoverer
ImmediateRequeueMessageRecoverer

重新入队

RepublishMessageRecoverer
packagecom.itheima.consumer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.MessageRecoverer;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.context.annotation.Bean;@Configuration@ConditionalOnProperty(name ="spring.rabbitmq.listener.simple.retry.enabled", havingValue ="true")publicclassErrorMessageConfig{@BeanpublicDirectExchangeerrorMessageExchange(){returnnewDirectExchange("error.direct");}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue",true);}@BeanpublicBindingerrorBinding(Queue errorQueue,DirectExchange errorMessageExchange){returnBindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@BeanpublicMessageRecovererrepublishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}}
业务幂等性
  1. 唯一消息 id,业务处理成功后把id保存到数据库,处理前查询判断这条消息是否处理过
@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);return jjmc;}
  1. 业务幂等

死信队列

存放没有被消费的消息的队列
概念当中比较重要的是死信的来源,有三个

  • 消息 ttl 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒(basic.reject或basic.nack) 并且 requeue=false

这三种情况后面会分别模拟,值得说一下的是第三种情况,这里可以看一下之前讲到的 消息未应答时可以重新入队,如果这里配置不入队,就可以被添加到死信队列当中

注意一个点即可,配置的是 普通队列 与 死信交换机之间的关系

/**
 * 死信队列  生产者
 */publicclassProducer{// 普通交换机的名称publicstaticfinalStringNORMAL_EXCHANGE="normal_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 死信消息 设置 ttl 单位是 msAMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();for(int i =0; i <11; i++){String message ="info"+ i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan", properties, message.getBytes());}}}*** 普通队列消费者
 */publicclassConsumer01{// 普通交换机的名称publicstaticfinalStringNORMAL_EXCHANGE="normal_exchange";// 死信交换机的名称publicstaticfinalStringDEAD_EXCHANGE="dead_exchange";// 普通队列的名称publicstaticfinalStringNORMAL_QUEUE="normal_queue";// 死信队列的名称publicstaticfinalStringDEAD_QUEUE="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机, 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明普通队列和死信队列Map<String,Object> arguments =newHashMap<>();// 过期时间 不在这里设置 改为在生产者设置消息的 ttl // arguments.put("x-message-ttl", 10000);// 正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 routing-key
        arguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列长度的限制//        arguments.put("x-max-length", 6);

        channel.queueDeclare(NORMAL_QUEUE,false,false,false, arguments);

        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信交换机与队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息....");DeliverCallback deliverCallback =(consumerTag, message)->{String s =newString(message.getBody(),"UTF-8");//            if (s.equals("info5")) {//                System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8") + "此消息被拒绝");//                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);//            } else {System.out.println("Consumer01接收的消息是:"+newString(message.getBody(),"UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);//            }};// 开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});}}/**
 * 死信队列消费者
 */publicclassConsumer02{// 死信队列的名称publicstaticfinalStringDEAD_QUEUE="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("等待接收消息....");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("Consumer01接收的消息是:"+newString(message.getBody(),"UTF-8"));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};

        channel.basicConsume(DEAD_QUEUE,false, deliverCallback, consumerTag ->{});}}


我们可以看到 普通队列与死信交换机之间的关系
情况一模拟:TTL

AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();

设置发送的消息的 ttl
模拟方式很简单,先启动 c1 然后关闭,然后启动消费者


情况2 超出队列大小
我们运行一次c2 ,把死信队列里面的消息消费掉

重新开始测试,为避免干扰我们去掉消息的ttl

设置队列最大长度为6,所以按照推测,如果发送11条消息,会有5条(超出部分)进入到死信队列

注:我们这里需要删除原来的队列,因为队列的参数被修改了


管理面板中删除即可
我们再次启动 c1 然后关闭 c1再开启 p

结果符合预期
情况3:
我们首先还是排除干扰,先开启c2 消费掉死信中的消息,然后删除队列normal,再然后注释掉 队列长度的配置
模拟方式也很简单,我们把 info5 这条消息 ,basicReject 给拒绝掉,看这条消息会不会进入到我们的死信队列



延迟队列

延迟队列的应用场景是很多的,订单十分钟内未付款取消等等
延迟队列的实现很简单,其实利用前面我们说到的消息的 ttl 属性就可以实现了
这里说一下 队列设置 ttl 和消息设置 ttl 的区别

这里的整合我们用 SpringBoot
版本 2.3.8.RELEASE (大版本尽量一致)

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Swagger 配置类

@Configuration@EnableSwagger2publicclassSwaggerConfig{publicDocketwebApiConfig(){returnnewDocket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}privateApiInfowebApiInfo(){returnnewApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(newContact("enjoy6288","http://atguigu.com","[email protected]")).build();}}


配置类

@ConfigurationpublicclassTtlQueueConfig{publicstaticfinalStringX_EXCHANGE="X";publicstaticfinalStringQUEUE_A="QA";publicstaticfinalStringQUEUE_B="QB";publicstaticfinalStringY_DEAD_LETTER_EXCHANGE="Y";publicstaticfinalStringDEAD_LETTER_QUEUE="QD";// 声明 xExchange@Bean("xExchange")publicDirectExchangexExchange(){returnnewDirectExchange(X_EXCHANGE);}// 声明 xExchange@Bean("yExchange")publicDirectExchangeyExchange(){returnnewDirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")publicQueuequeueA(){Map<String,Object> args =newHashMap<>(3);//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key","YD");//声明队列的 TTL
        args.put("x-message-ttl",10000);returnQueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 声明队列 A 绑定 X 交换机@BeanpublicBindingqueueaBindingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){returnBindingBuilder.bind(queueA).to(xExchange).with("XA");}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")publicQueuequeueB(){Map<String,Object> args =newHashMap<>(3);//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key","YD");//声明队列的 TTL
        args.put("x-message-ttl",40000);returnQueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机@BeanpublicBindingqueuebBindingX(@Qualifier("queueB")Queue queue1B,@Qualifier("xExchange")DirectExchange xExchange){returnBindingBuilder.bind(queue1B).to(xExchange).with("XB");}//声明死信队列 QD@Bean("queueD")publicQueuequeueD(){returnnewQueue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系@BeanpublicBindingdeadLetterBindingQAD(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){returnBindingBuilder.bind(queueD).to(yExchange).with("YD");}}

消费者

@Slf4j@ComponentpublicclassDeadLetterQueueConsumer{@RabbitListener(queues ="QD")publicvoidreceiveD(Message message,Channel channel)throwsException{String msg =newString(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}",newDate().toString(), msg);}}

控制层 生产者

@Slf4j@RequestMapping("ttl")@RestControllerpublicclassSendMsgController{@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")publicvoidsendMsg(@PathVariableString message){
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}",newDate(), message);
        rabbitTemplate.convertAndSend("X","XA","消息来自 ttl 为 10S 的队列: "+ message);
        rabbitTemplate.convertAndSend("X","XB","消息来自 ttl 为 40S 的队列: "+ message);}}
GET http://localhost:8080/ttl/sendMsg/aaa

优化 队列ttl存在硬编码

创建一条新的队列QC,不在队列上配置 ttl, 在消息上配置 ttl

@ComponentpublicclassMsgTtlQueueConfig{publicstaticfinalStringY_DEAD_LETTER_EXCHANGE="Y";publicstaticfinalStringQUEUE_C="QC";//声明队列 C 死信交换机@Bean("queueC")publicQueuequeueB(){Map<String,Object> args =newHashMap<>(3);//声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key","YD");//没有声明 TTL 属性returnQueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@BeanpublicBindingqueueBindingC(@Qualifier("queueC")Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){returnBindingBuilder.bind(queueC).to(xExchange).with("XC");}}

生产者

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")publicvoidsendMsg(@PathVariableString message,@PathVariableString ttlTime){
        rabbitTemplate.convertAndSend("X","XC", message, correlationData ->{
            correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}",newDate(), ttlTime, message);}
###
GET http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000

###
GET http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

存在问题,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
(Callback相关的不用管哈)

使用rabbitmq插件 实现延迟队列

rabbitmq_delayed_message_exchange 解压存放到 plugins 目录

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

@ConfigurationpublicclassDelayedQueueConfig{publicstaticfinalStringDELAYED_QUEUE_NAME="delayed.queue";publicstaticfinalStringDELAYED_EXCHANGE_NAME="delayed.exchange";publicstaticfinalStringDELAYED_ROUTING_KEY="delayed.routingkey";@Bean("delayedQueue")publicQueuedelayedQueue(){returnnewQueue(DELAYED_QUEUE_NAME);}//自定义交换机 我们在这里定义的是一个延迟交换机@Bean("delayedExchange")publicCustomExchangedelayedExchange(){Map<String,Object> args =newHashMap<>();//自定义交换机的类型
        args.put("x-delayed-type","direct");returnnewCustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false, args);}@BeanpublicBindingbindingDelayedQueue(@Qualifier("delayedQueue")Queue queue,@Qualifier("delayedExchange")CustomExchange
                                               delayedExchange){returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

我们指定创建延迟交换机

@GetMapping("sendDelayMsg/{message}/{delayTime}")publicvoidsendMsg(@PathVariableString message,@PathVariableInteger delayTime){
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME,DELAYED_ROUTING_KEY, message,
                correlationData ->{
                    correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});
        log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}",newDate(), delayTime, message);}


现在正常了

补充 win

官网下载
下载完成后不要勾选启动
先执行安装插件

rabbitmq-plugins enable rabbitmq_management
标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_39007838/article/details/140373155
版权归原作者 周周写不完的代码 所有, 如有侵权,请联系我们删除。

“rabbitmq”的评论:

还没有评论