0


B080-RabbitMQ

目录

RabbitMQ认识

概念

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/

使用场景

在这里插入图片描述

优点

任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)

消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)

服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)

AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)

JMS

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)

RabbitMQ安装

安装elang

otp_win64_20.2.exe
配置环境变量

安装RabbitMQ

rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务

安装管理插件

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)

重启MQ

登录RabbitMQ

进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
在这里插入图片描述

消息队列的工作流程

在这里插入图片描述

RabbitMQ常用模型

HelloWorld-基本消息模型

一个生产者与一个消费者
在这里插入图片描述

生产者发送消息
导包
<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><!--和springboot2.0.5对应--><version>5.4.1</version></dependency></dependencies>
获取链接工具类
importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassConnectionUtil{/**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */publicstaticConnectiongetConnection()throwsException{//定义连接工厂ConnectionFactory factory =newConnectionFactory();//设置服务地址
        factory.setHost("127.0.0.1");//端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身
        factory.setPort(5672);//设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");//集群的时候才用这个参数
        factory.setUsername("guest");
        factory.setPassword("guest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}
消息的生产者
importcn.itsource.utils.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;//消息的生产者publicclassSender{publicstaticfinalStringHELLO_QUEUE="hello_queue";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列(hello这里用默认的交换机)/*  String queue :队列的名字,可自定义,   
        boolean durable: 持久化,   
        boolean exclusive:是否独占;大家都能用,传false,
        boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,
        Map<String, Object> arguments:没有其他要传的属性就传false          */
        channel.queueDeclare(HELLO_QUEUE,true,false,false,null);String msg="今天中午吃啥";//4.发送消息
        channel.basicPublish("",HELLO_QUEUE,null, msg.getBytes());

        channel.close();
        conn.close();}}

在这里插入图片描述

消费者消费消息
模拟消费者
importcom.rabbitmq.client.*;importjava.io.IOException;//模拟消费者publicclassConsume{publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性System.out.println("消息内容:"+newString(body));System.out.println("消费完成----------------");}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}}

在这里插入图片描述
在这里插入图片描述
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息

手动签收消息
importcn.itsource.utils.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;//模拟消费者publicclassConsume{publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性//                System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("消费完成----------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);// 第二个参数为是否同时签收多个,传false}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收不等于消费成功,处理逻辑走完没有报错才算签收成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}}

Work Queues

在这里插入图片描述
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳

Sender
//消息的生产者/*
    如果有多个消费者监听同一个队列,默认轮询
 */publicclassSender{publicstaticfinalStringWORK_QUEUE="work_queue";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列/*
        String queue :队列的名字
        boolean durable: 持久化
        boolean exclusive:是否独占
        boolean autoDelete: 用完即删
        Map<String, Object> arguments
         */
        channel.queueDeclare(WORK_QUEUE,true,false,false,null);String msg="今天中午吃啥";//4.发送消息
        channel.basicPublish("",WORK_QUEUE,null, msg.getBytes());

        channel.close();
        conn.close();}}
Consume1
//模拟消费者publicclassConsume1{publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量//        channel.basicQos(1);//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));//                try {//                    Thread.sleep(100);//                } catch (InterruptedException e) {//                    e.printStackTrace();//                }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.WORK_QUEUE,false,callback);}}
Consume2
//模拟消费者publicclassConsume2{publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量//        channel.basicQos(1);//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);System.out.println("消息内容:"+newString(body));//                try {//                    Thread.sleep(10000);//                } catch (InterruptedException e) {//                    e.printStackTrace();//                }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.WORK_QUEUE,false,callback);}}

订阅模型-FANOUT-广播

在这里插入图片描述
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

Sender
//消息的生产者/*
    变化
        1.不创建 队列
        2.创建交换机
        3.给交换机发送消息
 */publicclassSender{publicstaticfinalStringFANOUT_EXCHANGE="fanout_exchange";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT,true);String msg="今天晚上吃啥";//4.发送消息
        channel.basicPublish(FANOUT_EXCHANGE,"",null, msg.getBytes());

        channel.close();
        conn.close();}}
Consume1
//模拟消费者/*
    1.创建队列
    2.队列绑定到交换机
    3.每个消费者要监听自己的队列
 */publicclassConsume1{publicstaticfinalStringFANOUT_QUEUE1="fanout_queue1";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
        channel.basicQos(1);//创建队列
        channel.queueDeclare(FANOUT_QUEUE1,true,false,false,null);//绑定到交换机
        channel.queueBind(FANOUT_QUEUE1,Sender.FANOUT_EXCHANGE,"");//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(FANOUT_QUEUE1,false,callback);}}
Consume2
//模拟消费者/*
    1.创建队列
    2.队列绑定到交换机
    3.每个消费者要监听自己的队列
 */publicclassConsume2{publicstaticfinalStringFANOUT_QUEUE2="fanout_queue2";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
        channel.basicQos(1);//创建队列
        channel.queueDeclare(FANOUT_QUEUE2,true,false,false,null);//绑定到交换机
        channel.queueBind(FANOUT_QUEUE2,Sender.FANOUT_EXCHANGE,"");//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(FANOUT_QUEUE2,false,callback);}}

订阅模型-Direct-定向

在这里插入图片描述
把消息交给符合指定routing key 的队列 一堆或一个

Sender
//消息的生产者/*
    变化
        1.交换机类型
        2.给交换机发送消息,指定 routing key
 */publicclassSender{publicstaticfinalStringDIRECT_EXCHANGE="direct_exchange";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);String msg="今天晚上吃啥";//4.发送消息
        channel.basicPublish(DIRECT_EXCHANGE,"dept",null, msg.getBytes());

        channel.close();
        conn.close();}}
Consume1
//模拟消费者/*
    1.指定routing key
 */publicclassConsume1{publicstaticfinalStringDIRECT_QUEUE1="direct_queue1";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
        channel.basicQos(1);//创建队列
        channel.queueDeclare(DIRECT_QUEUE1,true,false,false,null);//绑定到交换机
        channel.queueBind(DIRECT_QUEUE1,Sender.DIRECT_EXCHANGE,"emp.delete");//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(DIRECT_QUEUE1,false,callback);}}
Consume2
//模拟消费者/*
    1.指定routing key
 */publicclassConsume2{publicstaticfinalStringDIRECT_QUEUE2="direct_queue2";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
        channel.basicQos(1);//创建队列
        channel.queueDeclare(DIRECT_QUEUE2,true,false,false,null);//绑定到交换机
        channel.queueBind(DIRECT_QUEUE2,Sender.DIRECT_EXCHANGE,"dept");//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(DIRECT_QUEUE2,false,callback);}}

订阅模型-Topic-通配符

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

Sender
//消息的生产者/*
    变化
        1.交换机类型
        2.给交换机发送消息,指定 routing key
 */publicclassSender{publicstaticfinalStringTOPIC_EXCHANGE="topic_exchange";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC,true);String msg="今天晚上吃啥";//4.发送消息
        channel.basicPublish(TOPIC_EXCHANGE,"user.insert.add.pubilsh",null, msg.getBytes());

        channel.close();
        conn.close();}}
Consume1
//模拟消费者/*
    1.指定routing key
 */publicclassConsume1{publicstaticfinalStringTOPIC_QUEUE1="topic_queue1";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
        channel.basicQos(1);//创建队列
        channel.queueDeclare(TOPIC_QUEUE1,true,false,false,null);//绑定到交换机/*
            #.1到多个单词
            *. 一个单词
         */
        channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE,"user.#");//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(TOPIC_QUEUE1,false,callback);}}
Consume2
//模拟消费者/*
    1.指定routing key
 */publicclassConsume2{publicstaticfinalStringTOPIC_QUEUE2="topic_queue2";publicstaticvoidmain(String[] args)throwsException{//1.获取连接Connection conn =ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
        channel.basicQos(1);//创建队列
        channel.queueDeclare(TOPIC_QUEUE2,true,false,false,null);//绑定到交换机/*
            #.1到多个单词
            *. 一个单词
         */
        channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE,"email.*");//回调Consumer callback=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+newString(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列/*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(TOPIC_QUEUE2,false,callback);}}

总结

01_hello

生产者    1.获取连接    2.获取通道    3.创建队列    4.发送消息

消费者    1.获取连接    2.获取通道    3.监听队列 (并回调)

02_workqueue    默认轮询        可修改(能者多劳)

生产者    1.获取连接    2.获取通道    3.创建队列    4.发送消息

消费者    1.获取连接    2.获取通道    3.监听队列 (并回调)

03_fanout    广播    将消息交给所有绑定到交换机的队列(多个消费者都能收到)

生产者    1.获取连接    2.获取通道    3.创建交换机    4.发送消息到交换机

消费者    1.获取连接    2.获取通道    创建队列    绑定到交换机    3.监听队列 (并回调)

04_direct    定向    把消息交给符合指定 routing key 的队列 一堆或一个

生产者    1.获取连接    2.获取通道    3.创建交换机    4.发送消息到交换机

消费者    1.获取连接    2.获取通道    创建队列    绑定到交换机    3.监听队列 (并回调)

05_topic        通配符    把消息交给符合routing pattern (路由模式) 的队列 一堆或一个

生产者    1.获取连接    2.获取通道    3.创建交换机    4.发送消息到交换机

消费者    1.获取连接    2.获取通道    创建队列    绑定到交换机    3.监听队列 (并回调)

SpringBoot集成RabbitMQ

导包

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--spirngboot集成rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

yml

server:port:44000spring:application:name: test‐rabbitmq‐producer
  rabbitmq:host: 127.0.0.1
    port:5672username: guest
    password: guest
    virtualHost: /
    listener:simple:acknowledge-mode: manual #手动签收prefetch:1#消费者的消息并发处理数量#publisher-confirms: true #消息发送到交换机失败回调#publisher-returns: true  #消息发送到队列失败回调template:mandatory:true# 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

config

importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{publicstaticfinalStringEXCHANGE_SPRINGBOOT="exchange_springboot";publicstaticfinalStringQUEUE1_SPRINGBOOT="queue1_springboot";publicstaticfinalStringQUEUE2_SPRINGBOOT="queue2_springboot";//创建一个交换机@BeanpublicExchangecreateExchange(){returnExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();}//创建两个队列@BeanpublicQueuecreateQueue1(){returnnewQueue(QUEUE1_SPRINGBOOT,true);}@BeanpublicQueuecreateQueue2(){returnnewQueue(QUEUE2_SPRINGBOOT,true);}//把交换机和队列绑定到一起@BeanpublicBindingbind1(){returnBindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();}@BeanpublicBindingbind2(){returnBindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();}//消费者 还原对象方式(从MQ里取出json转为对象)@Bean("rabbitListenerContainerFactory")publicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(newJackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);return factory;}//放到消息队列里面的转换(转为json存进MQ)@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());return rabbitTemplate;}}

producer

importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes =App.class)@RunWith(SpringRunner.class)publicclassSender{@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest(){/*
            问题:多系统之间 信息交互  传递对象
               解决方案:转换为json存储
               实现:
                    1.fastjson    对象 - josn  (作业)
                    2.重写转换器模式
        */for(int i =0; i <5; i++){
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT,"email.save",newUser(1L,"文达"));}System.out.println("消息发送完毕");}}

consumer

importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;importjava.io.IOException;//消费者@ComponentpublicclassConsu{@RabbitListener(queues ={RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory ="rabbitListenerContainerFactory")//用这个转换器接publicvoiduser(@PayloadUser user,Channel channel,Message message)throwsIOException{System.out.println(message);System.out.println("user队列:"+user);//手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}@RabbitListener(queues ={RabbitMQConfig.QUEUE2_SPRINGBOOT})publicvoidemail(@PayloadUser user,Channel channel,Message message )throwsIOException{System.out.println(message);System.out.println("email队列:"+user);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}

队列内容可传string,entity序列化对象,json对象,

标签: rabbitmq

本文转载自: https://blog.csdn.net/ffhgjgj6576567/article/details/132595532
版权归原作者 XIAOMING820 所有, 如有侵权,请联系我们删除。

“B080-RabbitMQ”的评论:

还没有评论