0


RabbitMQ

RabbitMQ

  1. 设置开机启动添加开机启动 RabbitMQ 服务chkconfig rabbitmq-server on``````启动服务service rabbitmq-server start 查看服务状态service rabbitmq-server status 停止服务(选择执行)service rabbitmq-server stop
  2. 下载web页面插件开启 web 管理插件rabbitmq-plugins enable rabbitmq_management页面登录网址http://192.168.6.100:15672/
  3. 添加用户 进行登录​4.添加一个新的用户创建账号rabbitmqctl add_user admin 123设置用户角色rabbitmqctl set_user_tags admin administrator设置用户权限set_permissions [-p <vhostpath>] <user> <conf> <write> <read>rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限当前用户和角色rabbitmqctl list_users``````6. 重置命令关闭应用的命令为rabbitmqctl stop_app清除的命令为rabbitmqctl reset重新启动命令为rabbitmqctl start_app

IDEA 初次发送mq消息 (生产者)

  1. 第一步,先添加依赖 <!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins></build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency></dependencies>
  2. 第二步:编写代码 实现发送package com.atguigu.rabbit;​import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;​​//生产者public class RabbitTest { //设置队列名称 public static final String QUERY_NAME = "hello-rabbitMQ"; public static void main(String[] args) throws Exception{​ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接地址IP 连接rabbitMQ的队列 factory.setHost("192.168.6.100"); //设置用户名称 factory.setUsername("admin"); //设置用户密码 factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel();​ //生成一个队列(这里用默认的交换机 所以直接创建队列) /** 参数介绍 * 队列名称 * 是否持久化(硬盘),true表示是,false表示否, * 该队列是否提供多个消费者进行消费,true表示是,false表示否 * 是否自动删除:当最后一个消费者断开连接之后,该队列是否自动删除,true是,false否 * 其他参数 目前先用null */ channel.queueDeclare(QUERY_NAME,false,false,false,null); //发送消息 String message = "hello-world";​ /** 参数介绍 * 交换机名称 目前默认 为空串 * 路由的Key值是哪个,本次是队列的名称 * 其他参数信息 * 发送的消息 这里需要调用getBytes()获取他的二进制 */ channel.basicPublish("",QUERY_NAME,null,message.getBytes()); System.out.println("消息发送成功");​ }}

IDEA 初次消息接受(消费者)

  1. 第一步:创建消费类,编写代码package com.atguigu.rabbit;​import com.rabbitmq.client.*;​​//消费则public class Comsumer { //队列的名称 一定要和生产者的队列名称一致 public static final String QUERY_NAME = "hello-rabbitMQ"; public static void main(String[] args) throws Exception {​ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置ip地址 factory.setHost("192.168.6.100"); //设置用户账号 factory.setUsername("admin"); //设置密码 factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel();​ //消费者未成功消费的自动回调 DeliverCallback callback = (consumerTag,message)->{ System.out.println(new String(message.getBody())); }; // 消费者取消消息,消息中断的回调 CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息取消,中断"); }; //接受消息 /*** 参数介绍 * 消费哪个队列 * 消费成功之后是否自动应答,true是,false否 * 消费者未成功消费的自动回调 * 消费者取消消息,消息中断的回调 */ channel.basicConsume(QUERY_NAME,true,callback,cancelCallback); }}

轮询分发消息

  1. 在rabbitMQ当中,消费者又叫工作线程 是有多个的,但是他们每一个线程获取到一个队列消息之后,其他线程不能在获得相同的队列,防止队列被重复消费,于是采用轮询的方式, 你一个我一个。 一个生产者发送消息 由多个工作线程接受, 当其中一个接受到了 其他线程不能在得到该线程,他们去拿其他的。

消息应答机制

  1. 当消息被工作线程执行,并没有成功,但是队列又将它删除了,就会导致消息的丢失,于是采用消息应答机制,只有当工作线程执行完线程给信道一个回应,然后队列才能删除消息。
  2. 消息应答分为自动应答和手动应答- 自动应答//自动应答System.out.println("c1 等待接受消息------------");channel.basicConsume(QUERY_NAME,deliverCallback,cancelCallback); //....上面就是自动应答,当channel.basicConsume 接受到消息就会给信道一个回应,但是 后续的。。。代码出现了异常,导致消息丢失它就不管了,所以不靠谱- 手动应答手动给信道一个回答分为批量应答,和非批量应答,批量应答表示:一次性回应这个信道上所有,但是不建议 建议使用非批量 channel.basicAck(里有参数 true/false); true表示批量应答,false表示非
  3. 当其中有多个工作线程,其中一个工作线程突然宕机,它复制的消息会有其他的工作线程执行- 举例子- 创建一个生产者类package com.atguigu.rabbit.third;​import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.Channel;​import java.nio.charset.StandardCharsets;import java.util.Scanner;​public class Work03 {​ public static final String QUERY_NAME = "task_One";​ public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); //创建队列​ /** 参数介绍 * 队列名称 * 是否持久化(硬盘),true表示是,false表示否, * 该队列是否提供多个消费者进行消费,true表示是,false表示否 * 是否自动删除:当最后一个消费者断开连接之后,该队列是否自动删除,true是,false否 * 其他参数 目前先用null */ channel.queueDeclare(QUERY_NAME,false,false,false,null);​ channel.basicPublish("",QUERY_NAME,null,message.getBytes()); System.out.println("发布消息成功"+message); } }}- 创建一个消费者类package com.atguigu.rabbit.third;​import com.atguigu.rabbit.utils.ChannelUtils;import com.atguigu.rabbit.utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;​public class Work04 { public static final String QUERY_NAME = "task_One";​ public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel();​ System.out.println("c2---接收到消息,较长时间处理");​ DeliverCallback deliverCallback = (var1, var2)->{ SleepUtils.sleep(25); System.out.println("接收到消息"+new String(var2.getBody()));​ //应答 /** * 第一个参数 表示应答的是信道上的哪一个消息 * 第二个表示 非批量应答 */ channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); }; // 消费者取消消息,消息中断的回调 CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息取消,中断"); }; boolean auToAck =false; channel.basicConsume(QUERY_NAME,auToAck,deliverCallback,cancelCallback); }}- 创建第二个消费者类package com.atguigu.rabbit.third;import com.atguigu.rabbit.utils.ChannelUtils;import com.atguigu.rabbit.utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;public class Consumer { public static final String QUERY_NAME = "task_One"; public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); System.out.println("c1---接收到消息,较短时间处理"); DeliverCallback deliverCallback = ( var1, var2)->{ System.out.println("接收到消息"+new String(var2.getBody(),"UTF-8")); SleepUtils.sleep(2); //应答 /** * 第一个参数 表示应答的是信道上的哪一个消息 * 第二个表示 非批量应答 */ channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); }; // 消费者取消消息,消息中断的回调 CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息取消,中断"); }; boolean auToAck =false; channel.basicConsume(QUERY_NAME,auToAck,deliverCallback,cancelCallback); }}启动三个,第二个消费者有30秒沉睡时间,当它突然被宕机,它复制的消息 会立刻由第一个消费者执行。遇到一个异常 将图片上的all 改为/ 即可channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'hello-rabbitMQ' in vhost '/', class-id=60, method-id=20)

队列持久化

  1. 当rabbit重新启动,之前的队列就会消失,持久化就是即便是rabbit重新启动,这个队列依然存在修改一行代码 只需要将这里的变成true 即可 //创建队列 /** 参数介绍 * 队列名称 * 是否持久化(硬盘),true表示是,false表示否, * 该队列是否提供多个消费者进行消费,true表示是,false表示否 * 是否自动删除:当最后一个消费者断开连接之后,该队列是否自动删除,true是,false否 * 其他参数 目前先用null */ boolean durable = true; //持久化 channel.queueDeclare(QUERY_NAME,durable,false,false,null); channel.basicPublish("",QUERY_NAME,null,message.getBytes()); System.out.println("发布消息成功"+message);}可能报错因为该队列我们之前已经创建过了,他是非持久化的,现在不能变成持久化, 只能网址上删除这个队列,重新创建重新启动之后 这里会有一个大写的蓝色的D 表示已经持久化

消息持久化

  1. 要求将生产者发送的消息保存道磁盘上 在发布消息的方法上加上 MessageProperties.PERSISTENT_TEXT_PLAIN 即可channel.basicPublish("",QUERY_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

不公平分发

  1. 能者多劳, 一个工作线程可能很快就处理完一个消息, 而另外一个可能很慢,所以我们采用不公平分发,让快的处理功能多修改代码 消费者的代码 加上int pre = 1; channel.basicQos(pre); 轮询默认是0 我们修改为1 变成不公平分发// 消费者取消消息,消息中断的回调CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息取消,中断");};//设置非公平int pre = 1;channel.basicQos(pre);boolean auToAck =false;channel.basicConsume(QUERY_NAME,auToAck,deliverCallback,cancelCallback);

预取值

  1. 提前设计好 每一个工作线程处理几条消息 方法和非公平一样,只不过非公平设置为1 ,0表示公平(轮询),其他的表示这个消费者拿几个消息// 消费者取消消息,消息中断的回调CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息取消,中断");};//=1 表示非公平 =其他 表示设置这个消费者拿几个消息int pre = 2; //表示拿2个消息channel.basicQos(pre);boolean auToAck =false;channel.basicConsume(QUERY_NAME,auToAck,deliverCallback,cancelCallback);在创建一个消费者 将它// 消费者取消消息,消息中断的回调CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息取消,中断");};//=1 表示非公平 =其他 表示设置这个消费者拿几个消息int pre = 5; //表示拿5个消息channel.basicQos(pre);boolean auToAck =false;channel.basicConsume(QUERY_NAME,auToAck,deliverCallback,cancelCallback);

发布确认

  1. 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的****消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

单个发布

  1. 发布一个消息,确认一下, 只有当收到确认的消息之后才能继续发布其他消息 (发一个确认一个,性能慢,但是知道到底是谁出现问题)
  2. //开启发布确认channel.confirmSelect();
  3. //发一次消息确认一次boolean b = channel.waitForConfirms();if (b){ System.out.println("消息发布成功");}``````//单个发布public static void singlePublish()throws Exception{ Channel channel = ChannelUtils.getChannel(); //开启发布确认 channel.confirmSelect(); //通过uuid创建队列名称 String queryName = UUID.randomUUID().toString(); //创建队列 channel.queueDeclare(queryName,false,false,false,null); //开启时间 Long begin = System.currentTimeMillis(); for (int i = 0; i < PUBLISH_COUNT; i++) { String message = String.valueOf(i); channel.basicPublish("",queryName,null,message.getBytes()); //发一次消息确认一次 boolean b = channel.waitForConfirms(); if (b){ System.out.println("消息发布成功"); } } Long end = System.currentTimeMillis(); System.out.println("单个发布1000次的执行时间为"+ (end-begin)+"ms");}单个发布1000次的执行时间为1227ms

批量发布

  1. 发布多个消息之后,再确认(性能块,但是不知道是谁出现问题)public static void morePublish() throws Exception{ Channel channel = ChannelUtils.getChannel(); //开启发布确认 channel.confirmSelect(); //通过uuid创建队列名称 String queryName = UUID.randomUUID().toString(); //创建队列 channel.queueDeclare(queryName,false,false,false,null); //开启时间 Long begin = System.currentTimeMillis(); for (int i = 0; i < PUBLISH_COUNT; i++) { String message = String.valueOf(i); channel.basicPublish("",queryName,null,message.getBytes()); if (i%100==0){ //发一次消息确认一次 boolean b = channel.waitForConfirms(); if (b){ System.out.println("消息发布成功"); } } } Long end = System.currentTimeMillis(); System.out.println("批量发布1000次的执行时间为"+ (end-begin)+"ms");}批量发布1000次的执行时间为139ms

异步发布

  1. 异步发布, 就是生产者只管一直发送,不需要等待是否发布成功,你只管发, 在代码中 开启监听器,当监听到发布成功,发布失败都主动的反馈给信道,不管生产者的事,这样效率即高,还知道是哪里出了问题//异步发布public static void publishMessageAcnsn() throws Exception{ Channel channel = ChannelUtils.getChannel(); //开启发布确认 channel.confirmSelect(); //通过uuid创建队列名称 String queryName = UUID.randomUUID().toString(); //创建队列 channel.queueDeclare(queryName,false,false,false,null); //开启时间 Long begin = System.currentTimeMillis(); //记录那些消息发送成功 ConfirmCallback ackCallback = ( var1, var3)->{ System.out.println("确认消息" + var1); }; //记录那些消息发送失败 ConfirmCallback nackCallback = ( var1, var3)->{ System.out.println("失败消息" + var1); }; //开启监听器 记录那些发布成功,那些发布失败 channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < PUBLISH_COUNT; i++) { String message = String.valueOf(i); channel.basicPublish("",queryName,null,message.getBytes()); } Long end = System.currentTimeMillis(); System.out.println("异步发布1000次的执行时间为"+ (end-begin)+"ms");}异步发布1000次的执行时间为48ms

处理异步发布失败的消息

  1. 没有发布成功的消息,应该保存在一个map集合当中,跟信道传过来的map一样,k,v v是消息内容。,注意这个map需要保证高并发,多线程 所以采用concurrentSkipMap()//异步发布之后对发送失败的消息进行记录并且重新发布public static void publishMessageAgain() throws Exception{ Channel channel = ChannelUtils.getChannel(); //开启发布确认 channel.confirmSelect(); //通过uuid创建队列名称 String queryName = UUID.randomUUID().toString(); //创建队列 channel.queueDeclare(queryName,false,false,false,null); //开启时间 Long begin = System.currentTimeMillis(); //创建一个Map集合保存每一条发布的消息 //concurrentMap 支持高并发,多线程 ConcurrentSkipListMap<Long,String> concurrentMap = new ConcurrentSkipListMap<>(); //记录那些消息发送成功 ConfirmCallback ackCallback = ( var1, var3)->{ //将发布成功的消息全部删掉 //先判断是否是批量 if (var3){ ConcurrentNavigableMap<Long, String> headMap = concurrentMap.headMap(var1); headMap.clear(); }else{ concurrentMap.remove(var1); } System.out.println("确认消息" + var1); }; //记录那些消息发送失败 ConfirmCallback nackCallback = ( var1, var3)->{ System.out.println("失败消息" + var1); }; //开启监听器 记录那些发布成功,那些发布失败 channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < PUBLISH_COUNT; i++) { String message = String.valueOf(i); //将每次发送的保存在map当中 //channel.getNextPublishSeqNo()得到key值 concurrentMap.put(channel.getNextPublishSeqNo(),message); channel.basicPublish("",queryName,null,message.getBytes()); } Long end = System.currentTimeMillis(); System.out.println("异步发布1000次的执行时间为"+ (end-begin)+"ms");}

fanout 交换机(大喇叭)

  1. fanout发送消息,所有的队列都能接收到生产者 发布消息到logs交换机 rountingKey为空package com.atguigu.rabbit.five;import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;public class Exchange { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); //设置交换机 /*** * 1 交换机名称 * 2 交换机的类型 fanout(大喇叭) */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //生成一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 /** * 1 队列名称 * 2 交换机名称 * 3 rongtingKey */ channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接受消息"); DeliverCallback deliverCallback = ( var1, var2)->{ System.out.println("01-控制台打印接受到的消息" + new String(var2.getBody())); }; CancelCallback cancelCallback = (var1) ->{ }; /** * 1 队列名称 * 2 自动应答 * 3 消费成功 * 4 取消消费 */ channel.basicConsume(queueName,true,deliverCallback,cancelCallback); }}两个消费者 他们接受logs交换机的信息public class Exchange02 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); //设置交换机 /*** * 1 交换机名称 * 2 交换机的类型 fanout(大喇叭) */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //生成一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 /** * 1 队列名称 * 2 交换机名称 * 3 rongtingKey */ channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接受消息"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("02-控制台打印接受到的消息" + new String(var2.getBody())); }; CancelCallback cancelCallback = (var1) ->{ }; /** * 1 队列名称 * 2 自动应答 * 3 消费成功 * 4 取消消费 */ channel.basicConsume(queueName,true,deliverCallback,cancelCallback); }}``````public class Exchange { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); //设置交换机 /*** * 1 交换机名称 * 2 交换机的类型 fanout(大喇叭) */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //生成一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 /** * 1 队列名称 * 2 交换机名称 * 3 rongtingKey */ channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接受消息"); DeliverCallback deliverCallback = ( var1, var2)->{ System.out.println("01-控制台打印接受到的消息" + new String(var2.getBody())); }; CancelCallback cancelCallback = (var1) ->{ }; /** * 1 队列名称 * 2 自动应答 * 3 消费成功 * 4 取消消费 */ channel.basicConsume(queueName,true,deliverCallback,cancelCallback); }}得到结论,他们生产者发送消息到logs交换机, 且rountingkey为空,该交换机的类型是大喇叭,两个消费者都能从logs交换机得到那条消息,成功让一条消息被消费两次

direct 交换机

  1. 生产者规定的rountingKey 如果是disk ,那么只有消费者的rountingKey也是disk的消费者才能消费改生产者发布的消息
  2. public class Drict01 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); //声明交换机 指定交换机的类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建队列 channel.queueDeclare("console",true,false,false,null); //绑定交换机 队列 指定rountingKey的类型 channel.queueBind("console",EXCHANGE_NAME,"info"); channel.queueBind("console",EXCHANGE_NAME,"warning"); DeliverCallback deliverCallback = (var1,var2)->{ System.out.println("01---接受到消息"+new String(var2.getBody())); }; CancelCallback cancelCallback = (var1)->{ }; channel.basicConsume("console",true,deliverCallback,cancelCallback); }}
public class Drict02 {
    public static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws Exception {
        Channel channel = ChannelUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("发送消息"+message);
        }

    }
}
public class Drict03 {
    public static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws Exception {
        Channel channel = ChannelUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("disk",true,false,false,null);
        channel.queueBind("disk",EXCHANGE_NAME,"error");

        DeliverCallback deliverCallback = (var1,var2)->{
            System.out.println("03---成功接受到消息"+new String(var2.getBody()));
        };
        CancelCallback cancelCallback = (var1)->{

        };

        channel.basicConsume("disk",true,deliverCallback,cancelCallback);
    }
}

topic 交换机

  1. 交换机的rountingKey的类型可以同英文字母来代替,每一个字母之间用.隔开 其中*代表一个字母,#代表多个字母或者0个字母- lzy.# 表示rountingKey是以lzy开头的多个字母- *.lzy. * 表示rountingKey是第第二个字母,且是lzy的package com.atguigu.rabbit.topic;​import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;​import java.util.HashMap;import java.util.Map;​public class Topic02 { public static final String EXCHANGE_NAME="topic_logs";​ public static void main(String[] args)throws Exception { Channel channel = ChannelUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q2",true,false,false,null); channel.queueBind("Q2",EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind("Q2",EXCHANGE_NAME,"lazy.#"); //消费消息 DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("接收到消息"+new String(var2.getBody())+"队列名称为Q2,coutingKey是"+var2.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback = (var1)->{​ }; channel.basicConsume("Q2",true,deliverCallback,cancelCallback); }}``````package com.atguigu.rabbit.topic;​import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.*;​public class Topic01 { public static final String EXCHANGE_NAME="topic_logs"; public static void main(String[] args) throws Exception {​ Channel channel = ChannelUtils.getChannel(); //指定交换机 和它的类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //创建队列 channel.queueDeclare("Q1", true, false, false, null); //绑定交换机和队列 channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*"); //消费消息 DeliverCallback deliverCallback = (var1,var2)->{ System.out.println("接收到消息"+new String(var2.getBody())+"队列名称为Q1,coutingKey是"+var2.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback = (var1)->{​ }; channel.basicConsume("Q1",true,deliverCallback,cancelCallback);​ }}``````package com.atguigu.rabbit.topic;import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.Scanner;public class product { public static final String EXCHANGE_NAME="topic_logs"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String,String> map = new HashMap<>(); map.put("lazy,orange,rabbit","被队列 Q1Q2 接收到"); map.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); map.put("quick.orange.fox","被队列 Q1 接收到"); map.put("lazy.brown.fox","被队列 Q2 接收到"); map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); map.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); map.forEach((k,v)->{ try { channel.basicPublish(EXCHANGE_NAME,k,null,v.getBytes()); } catch (IOException e) { e.printStackTrace(); } //System.out.println("成功发布消息"+v); }); }}

死信队列

  1. 当 一个队列中的长度慢了,新来的消息无法加入到队列,或者一个消息没有被消费者消费,又不能回到原先的队列,或者 队列中消息过期的时候就会变成死新消息,就会产生死信队列, 产生的死信队列 需要由一个新的消费者消费。- 生产者代码 在发布消息的时候 给消息设置过期时间package com.atguigu.rabbit.serven;import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import java.util.Scanner;public class product { //普通交换机 public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); //死信消息 构建每条的过期时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("100000").build(); // channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); for (int i = 0; i < 11; i++) { String message = "info" +i; //这里发布的时候加上参数 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes()); System.out.println("发送消息"+message); } }}- 消费者C1代码 C1不仅要创建两个交换机 还要创建两个队列,以及将普通队列没有读取到的消费发送到死信交换机package com.atguigu.rabbit.serven;import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;import java.util.Map;public class DeadConsumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE"; //普通队列 public static final String NORMAL_QUEUE = "NORMAL_QUEUE"; //死信交换机 public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE"; //死信队列 public static final String DEAD_QUEUE = "DEAD_QUEUE"; public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel(); //声明普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //设置参数,当出现死信消息,传送到那个交换机 Map<String,Object> map = new HashMap<>(); //设置死信交换机 map.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信RoutingKey map.put("x-dead-letter-routing-key","lisi"); //声明普通队列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); //声明死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //绑定普通交换机和队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); //绑定死信交换机和队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接受消息。。。。"); DeliverCallback deliverCallback = (var1,var2)->{ System.out.println("DeadConsumer01接受到消息"+new String(var2.getBody(),"UTF-8")); }; CancelCallback callback = (var1)->{ }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,callback); }}- 消费者C2package com.atguigu.rabbit.serven;​import com.atguigu.rabbit.utils.ChannelUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;​//死信队列public class DeadConsumer02 { //死信交换机 public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE"; //死信队列 public static final String DEAD_QUEUE = "DEAD_QUEUE";​ public static void main(String[] args) throws Exception { Channel channel = ChannelUtils.getChannel();​ DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("DeadConsumer02接受到消息"+new String(var2.getBody(),"UTF-8")); }; CancelCallback callback = (var1)->{​ };​ channel.basicConsume(DEAD_QUEUE,true,deliverCallback,callback); }}

设置因为队列长度满了 而产生死信队列

  1. 只需要在创建队列的时候 map多加一个属性//设置队列最大长度map.put("x-max-length",6);

拒绝某消息,让他进入死信队列

  1. 第一: 开启手动应答//手动应答, 因为自动应答不存在拒绝channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,callback);
  2. 第二:在接受消息的地方编写拒绝代码 和应答代码DeliverCallback deliverCallback = (var1,var2)->{ //拒绝接受某些消息 String msg=new String(var2.getBody(),"UTF-8"); if (msg.equals("info5")||msg.equals("info6")){ //拒绝接受 System.out.println("拒接接受该消息" + msg ); //false表示拒绝之后不放回队列 channel.basicReject(var2.getEnvelope().getDeliveryTag(),false); } System.out.println("DeadConsumer01接受到消息"+new String(var2.getBody(),"UTF-8")); channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);};

延迟队列

  1. 比如股票缓解 用户在下了订单之后,提升用户30分钟之内付款,不然失效,就需要延迟队列进行设置延迟时间

整合SpringBoot

  1. 实现步骤:- 导入依赖<!--RabbitMQ 测试依赖--><dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope></dependency>- 编写配置文件 注意rabbit控制台端口为15672,但是它的默认端口是5672spring: rabbitmq: host: 192.168.6.100 port: 5672 username: admin password: admin- 编写配置类,声明交换机,队列和绑定package com.atguigu.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_EXCHANGE = "Y"; //普通队列QA public static final String QA_QUEUE = "QA"; //普通队列QB public static final String QB_QUEUE = "QB"; //死信队列QD public static final String QD_DEAD_QUEUE = "QD"; //声明普通交换机 @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } //声明死信交换机 @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_EXCHANGE); } //声明普通队列 @Bean("queueA") public Queue queueA(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); //设置死信的routingKey map.put("x-dead-letter-routing-key","YD"); //设置队列过期时间 map.put("x-message-ttl",10000); /*** * 1 队列名称 * 2 参数 */ return QueueBuilder.durable(QA_QUEUE).withArguments(map).build(); } //声明普通队列 @Bean("queueB") public Queue queueB(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); //设置死信的routingKey map.put("x-dead-letter-routing-key","YD"); //设置队列过期时间 map.put("x-message-ttl",40000); /*** * 1 队列名称 * 2 参数 */ return QueueBuilder.durable(QB_QUEUE).withArguments(map).build(); } //声明死信队列 @Bean("queueD") public Queue queueD(){ /*** * 1 队列名称 * 2 参数 */ return QueueBuilder.durable(QD_DEAD_QUEUE).build(); } //绑定交换机 @Bean //这里括号里面的是上面声明的交换机 队列中Bean里面的值 public Binding queueAToExchangeX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //绑定交换机 @Bean //这里括号里面的是上面声明的交换机 队列中Bean里面的值 public Binding queueBToExchangeX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XA"); } //绑定死信交换机 @Bean //这里括号里面的是上面声明的交换机 队列中Bean里面的值 public Binding queueDToExchangeY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); }}- 编写消费者(Controller类发送一个请求,实现消息的发送)package com.atguigu.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j@RestController@RequestMapping("/ttl")public class Controller { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{name}") public String sentMessage(@PathVariable("name") String name){ //程序启动 会自动将 date替换第一个{} name替换第二个{} log.info("当前时间:{}发送一条消息给ttl队列:{}"+ new Date().toString(),name); rabbitTemplate.convertAndSend("X","XA","发送消息给ttl为10s的队列" + name); rabbitTemplate.convertAndSend("X","XB","发送消息给ttl为40s的队列" + name); return name; }}- 编写消费者package com.atguigu.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Component@Slf4jpublic class DeadQueueConsumer { //消费那个队列 @RabbitListener(queues= "QD") public void consumerD(Message message, Channel channel){ log.info("消费西死信队列时间:{},消费的内容是:{}"+new Date().toString(),message.getBody()); }}注意! 别导错包!

死信队列延迟的缺陷(手动输入过期时间)

  1. 我们要手动设置过期时间,岂不写死,于是写一个公共的队列QC 并且配置好(在前面的配置类中加入以下)public static final String QC_QUEUE = "QC";//声明队列QC@Bean("queueC")public Queue queueC(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); map.put("x-dead-letter-routing-key","YD"); //不指明过期时间 ,在controller中指定 return QueueBuilder.durable(QC_QUEUE).withArguments(map).build();}//绑定QC到X@Beanpublic Binding bindQueueCToExchangeX(@Qualifier("queueC") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XC");}
  2. 在controller中编写代码//发送消息并且设置过期时间@GetMapping("sent_message/{oldTime}/{message}")public String sentMessage(@PathVariable("oldTime") String oldTime,@PathVariable("message") String message){ log.info("当前时间:{},过期时间{},发送一条消息给QC队列:{}",new Date().toString(),oldTime,message); rabbitTemplate.convertAndSend("X","XC","发送消息给QC的队列:" +message,msg -> { //设置过期时长 msg.getMessageProperties().setExpiration(oldTime); return msg; }); return message+oldTime;}
  3. 先发一次 30秒延迟的消息,在发一次2秒延迟的消息得到以下结果
  4. 发现,两秒延迟的消息,居然跟30秒延迟的一起被接受到 而且还在后面,总结: 延迟队列中手动设置过期时间,发布的消息个数=》2的时候,是由先后顺序的,不管你第二个几秒过期,都必须在第一个后面才能接受。

弥补延迟队列缺陷

  1. Linux下载插件 rabbitmq_delayed_message_exchange-3.8.0.ez
  2. 编写代码测试- 编写配置类 注意这里声明交换机用的是CustomExchangepackage com.atguigu.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class DelayedExchangeConfig { public static final String delayed_Exchange = "delayed_Exchange"; public static final String delayed_Queue = "delayed_Queue"; public static final String delayed_RoutingKey = "delayed_RoutingKey"; //声明交换机 @Bean("delayedExchange") //自定义交换机 public CustomExchange delayedExchange(){ //在交换机当中设计延迟 Map<String,Object> map =new HashMap<>(); //设置延迟类型 为直接类型 map.put("x-delayed-type","direct"); return new CustomExchange(delayed_Exchange,"x-delayed-message",true,false,map); } //声明队列 @Bean("queueH") public Queue queueH(){ return QueueBuilder.durable(delayed_Queue).build(); } //绑定 @Bean public Binding queueHToDelayed_Exchange(@Qualifier("queueH") Queue queueH, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(queueH).to(delayedExchange).with(delayed_RoutingKey).noargs(); }}- 编写controller 注意 这里设置过期时间要用Integer类型以及setDelay//发送消息 基于插件@GetMapping("sent/{oldTime}/{message}")public String seMessage(@PathVariable("oldTime") Integer oldTime,@PathVariable("message") String message){ log.info("当前时间:{},过期时间{},发送一条消息给延迟队列:{}",new Date().toString(),oldTime,message); rabbitTemplate.convertAndSend("delayed_Exchange","delayed_RoutingKey",message, correlationData -> { //设置过期时长 //设置过期时长 correlationData.getMessageProperties().setDelay(oldTime); return correlationData; }); return message+oldTime;}- 编写消费者package com.atguigu.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;//消费 基于插件的消息@Slf4j@Componentpublic class DelayedConsumer { @RabbitListener(queues ="delayed_Queue") public void delayedRecive(Message message, Channel channel){ log.info("接受到消息时间:{},接受到的消息是{}",new Date().toString(),message.getBody()); }}- 启动 发送两次 一次30秒延迟 一次 3秒总结: 声明交换机要采用自定义交换机CustomExchange 过期时间的设置要使用Integer类型 和setDelay

发布确认高级

  1. 当生产者发送一个消息,如果此时交换机宕机或者队列宕机,那么发出去的消息就石沉大海了,所以我们需要将没有发布成功的消息缓存起来,重新发布
  2. 需要一个回调的方法,来判断是否发布成功 注意这里需要注入两次package com.atguigu.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component@Slf4j//整个类的步骤:@Component 先实例化ConfirmCallBack @Autowired注入rabbitTemplate 第三PostConstruct 注入ConfirmCallbackpublic class ConfirmCallBack implements RabbitTemplate.ConfirmCallback { //这里是注入一个接口里面的一个接口,光写个Componet 是不行的 需要在进行注入 //注入 @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct //相当于注入 该注解在其他注入注解完成之后执行 public void init(){ rabbitTemplate.setConfirmCallback(this); } //消息发布是否成功回调 /*** * * @param correlationData 保存着发布消息的id和相关消息 * @param b 是否发布成功 是 true 否false * @param s 发布失败的原因 成功的话是null */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b){ log.info("发布成功,收到id为:{}的消息",correlationData); }else{ log.info("发布失败,还未收到id为:{}的消息,失败原因是:{}",correlationData,s.getBytes()); } }}配置文件配置 只要成功发送到交换机就应答 publisher-confirm-type: correlated# NONE# 禁用发布确认模式,是默认值# ⚫ CORRELATED# 发布消息成功到交换器后会触发回调方法# ⚫ SIMPLE编写生产者//********************************************//高级发布@GetMapping("GJ/{message}")public String GJMessage(@PathVariable("message") String message){ log.info("高级发布:::当前时间:{},发送一条消息给队列:{}",new Date().toString(),message); CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTINGKEY,message,correlationData); return message;}编写消费者@Slf4j@Componentpublic class GJConsumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE) public void consumer(Message message){ log.info("接受到的消息:{}",message.getBody()); }}当其中交换机 或者队列宕机 会打印出错误信息
  3. 上面是只要交换机成功收到消息就应答,下面队列收到消息才应答- 添加配置文件 publisher-returns: truspring: rabbitmq: host: 192.168.6.100 port: 5672 username: admin password: admin publisher-confirm-type: correlated publisher-returns: true #一旦消息不能成功交给队列就回退- 配置类 多实现RabbitTemplate.ReturnsCallback 也需要注入 rabbitTemplate.setReturnsCallback(this);public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @PostConstruct //相当于注入 该注解在其他注入注解完成之后执行 public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } //当消息不发成功给队列是回退 @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.info("消息:{}被交换机:{}退回的原因是:{},路由key是:{}",returnedMessage.getMessage(), returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey()); }- 发送消息//********************************************//高级发布@GetMapping("GJ/{message}")public String GJMessage(@PathVariable("message") String message){ log.info("高级发布:::当前时间:{},发送一条消息给队列:{}",new Date().toString(),message); CorrelationData correlationData = new CorrelationData("1"); //发送两条 一条成功 一条队列出错 让他回退 rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTINGKEY,message,correlationData); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTINGKEY + "2",message + "two",correlationData); return message;}- 结果
  4. 可以看到 没有被成功被队列接受的消息被回退了。
  5. 备份 交换机- 上面被回退的消息,会被传输到备份交换机 通过他的消费者消费- 新建一个本分交换机和队列package com.atguigu.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ConfirmConfig { public static final String CONFIRM_EXCHANGE = "confirm_exchange"; public static final String CONFIRM_QUEUE = "confirm_queue"; public static final String CONFIRM_ROUTINGKEY = "key1"; //备份交换机 public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE"; //备份队列 public static final String BACKUP_QUEUE = "BACKUP_QUEUE"; //报警队列 public static final String WARNING_QUEUE = "BACKUP_QUEUE"; //声明备份交换机 @Bean("backExchange") public FanoutExchange backExchange(){ return new FanoutExchange(BACKUP_EXCHANGE); } //声明备份队列 @Bean("backQueue") public Queue backQueue(){ return QueueBuilder.durable(BACKUP_QUEUE).build(); } //声明备份队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE).build(); }// @Bean("confirmExchange")// public DirectExchange confirmExchange(){//// return new DirectExchange(CONFIRM_EXCHANGE);// } @Bean("confirmExchange") public DirectExchange confirmExchange(){ //指明 未成功的给备份交换机BACKUP_EXCHANGE return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE) .durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE).build(); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueToExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTINGKEY); } //绑定备份交换机 队列 @Bean public Binding backToExchange(@Qualifier("backQueue") Queue backQueue, @Qualifier("backExchange") FanoutExchange backExchange){ return BindingBuilder.bind(backQueue).to(backExchange); } //绑定报警队列 @Bean public Binding backWarnToExchange(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backExchange") FanoutExchange backExchange){ return BindingBuilder.bind(warningQueue).to(backExchange); }}- 新建消费者package com.atguigu.consumer;import com.atguigu.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@Slf4jpublic class WarningConsumer { @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE) public void consumer(Message message) throws Exception{ log.info("报警发现不可以,消息是:{}",new String(message.getBody(),"UTF-8")); }}还是上面两次请求 一个可以一个不可以总结 : 没有爆消息回退,而是报警队列, 所以是备份交换机的优先级高一些

幂等性问题

  1. 发送一次消息,消费者消费之后,在给回应的中途断网了,导致交换机没有收到反馈,就会再发一次消息,让消费者重复消费,于是采用幂等性问题解决,就是根绝一系列的字符串拼接规则,生成唯一性id,在去数据库中查找是否以及完成消息。

优先级队列

  1. 代码Map<String,Object> map = new HashMap<>();//设置优先级 官网允许0-255 但是一般不超过10不然浪费cpumap.put("x-max-priority" ,10);channel.queueDeclare(QUERY_NAME,true,false,false,map);//发送消息for (int i = 0; i < 11; i++) { String message = "hello" + i; //将i未5的设计优先级 if (i==5){ AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("",QUERY_NAME,properties,message.getBytes()); System.out.println("消息发送成功: 优先级为5"+message); }else{ channel.basicPublish("",QUERY_NAME,null,message.getBytes()); }​​}最后得到的结果是 i 为5的最先被消费

惰性队列

  1. 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:Map<String, Object> args = new HashMap<String, Object>();args.put("x-queue-mode", "lazy");channel.queueDeclare("myqueue", false, false, false, args);

rabbitMQ 集群

  1. 先克隆两个虚拟机 创建完整克隆对象
标签: rabbitmq

本文转载自: https://blog.csdn.net/xjj168323/article/details/123912825
版权归原作者 小太阳啊~ 所有, 如有侵权,请联系我们删除。

“RabbitMQ”的评论:

还没有评论