0


RabbitMQ详解,入门到基本使用

在讲述MQ之前我们先了解一下一些简单概念。
同步调用:比如打电话。优点:时效性很强。

支付服务要调用别的服务,调用了订单服务,在调用仓储服务,在以此调用别的,时间长。

服务流程如下:

缺点:

1.耦合度高。

2.性能下降。

3.资源浪费。

4.级联失败


异步调用:就比如微信发消息,可以和多个人发消息。

服务流程如下:

优点:1.服务解耦(比如支付之后,不发短信提醒了,不用修改代码,直接取消短信服务的订阅即可)

2.性能提高,吞吐量提高。

3.故障隔离,不担心级联失败。

4.流量削峰。

缺点:

1.对Broker的依赖性太强了。

2.架构复杂,业务没有明显的流程,不好管理


MQ:消息队列(MessageQueue),就是事件驱动架构中的Broker。主要用于分布式系统中进行异步任务调度和数据交换。

MQ产品主要有一下四种:

1.RabbitMQ

2.ActiveMQ

3.RocketMQ

4.Kafka

主要的区别如下:


RabbitMQ在Linux下的安装:

1.拉去容器

docker pull rabbitmq:management

2.创建镜像,并配置用户名和密码。(管理界面端口防火墙记得开发)

docker run -d --name my-rabbitmq \
2  -p 5672:5672 \ # AMQP协议端口
3  -p 15672:15672 \ # 管理界面端口
4  -e RABBITMQ_DEFAULT_USER=admin \
5  -e RABBITMQ_DEFAULT_PASS=admin \
6  rabbitmq:management

3.进入管理页面(页面如下)

输入网址:[IP地址:管理界面端口]

成功进入之后,我们看一下上面这6列:

1.Overview:总览。主要展示的是

MQ

的概要信息 。

2.Connections:连接。无论消费者还是生产者都要建立连接。

3.Channels:通道。 消费者,生产者都要创建通道来执行事务。

4.Exchanges:交换机。消息的路由器,把消息路由到队列。

5.Queues:队列。做消息的存储。

6.Admin:管理。创建用户,创建虚拟主机。

MQ的大致架构如下:


AMQP:

(Advanced Message Queuing Protocol)是高级消息队列协议的缩写,它是一个开放标准的应用层协议,主要设计用于分布式系统之间高效、可靠地传输消息。

AMQP不是某个具体的软件产品或服务,而是一种通用的标准接口,任何遵循AMQP协议的软件系统都可以实现相互之间的互联互通,无论它们是由何种编程语言编写,运行在什么操作系统之上。

SpingAMQP:

Spring AMQP(Spring Advanced Messaging Queuing Platform)是Spring框架的一部分,它是为简化在Java应用程序中使用高级消息队列协议(AMQP)而设计的一个抽象和便捷工具集。

简而言之,Spring AMQP的目标是让开发者更易于在Spring应用程序中使用消息队列服务,降低消息驱动架构的复杂性,提高开发效率。

SpringAMQP的基本使用:
1.基于简单队列的使用

生成者模块:

1.导入依赖:

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置application.yml:(服务器防火墙端口记得开发嗷)

spring:
  rabbitmq:
    host: 43.138.67.251 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: root
    password: 123456
    virtual-host: /

3.用RabbitTemplate发送消息:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue(){
        String routingKey = "simple.queues";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(routingKey, message);
    }

消费者模块:

也是导入依赖,配置yml。配置一个Bean监听队列,spring项目启动时,加载Bean对队列进行监听,消息阅后即焚。

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple")
    public  void listenSimplequeue(String msg) {
        System.out.println("接收到消息:"+msg);
    }
}

2.WorkQueue:工作消息队列

和上面的简单队列使用相比,添加多个消费者。

发布订阅模式:

了解一下交换机Exchange:

交换机的作用是把消息按照一定的规则路由给队列,交换机不能缓存消息,路由失败,消息丢失。

下面我们介绍几种常用的交换机:

1.Fanout Exchange:会将接受到的消息路由到每一个绑定的队列queue。

Fanout Exchange的使用:

先把交换机和队列绑定起来

@Configuration
public class FanoutConfig {

    //交换机 fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return  new FanoutExchange("fanout.exchange");
    }
    //队列1 fanout1.queue
    @Bean
    public Queue fanoutqueue1(){
        return  new Queue("fanout1.queue");
    }
    //队列2 fanout2.queue
    @Bean
    public Queue fanoutqueue2(){
        return  new Queue("fanout2.queue");
    }
    //队列1绑定到交换机
    @Bean
    public Binding bind1(Queue fanoutqueue1, FanoutExchange fanoutExchange) {
        return  BindingBuilder.bind(fanoutqueue1).to(fanoutExchange);
    }
    //队列2绑定到交换机
    @Bean
    public Binding bind2(Queue fanoutqueue2, FanoutExchange fanoutExchange) {
        return  BindingBuilder.bind(fanoutqueue2).to(fanoutExchange);
    }

}

消费者端监听消息:

    //Fanout1
    @RabbitListener(queues = "fanout1.queue")
    public  void FanoutexchengeQueue1(String msg){
        System.err.println("fanout1.queue接收到消息:["+msg+"]");
    }
    //Fanout2
    @RabbitListener(queues = "fanout2.queue")
    public  void FanoutexchengeQueue2(String msg){
        System.err.println("fanout1.queue接收到消息:["+msg+"]");
    }

生产者:

    //Fanout交换机
    @Test
    public  void senFanoutExchange() {
        String exchangename = "fanout.exchange";
        String msg = "Hello FanoutExchange";
        rabbitTemplate.convertAndSend(exchangename,"",msg);
    }

2.Direct Exchange:将接收到的消息,按照一定规则路由到指定的Queue。

Direct Exchange基本使用:

@RabbitListener里面指定参数会生成并绑定交换机和queue。每一个队列和交换机绑定时有一个BindingKye,生成者发送消息时带一个RoutingKey,交换机只把消息发到BindingKey和RoutingKey一致的queue。

    //Direct1
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct1.queue"),
            exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public  void listenerDirectExchange1(String msg) {
        System.err.println("direct1.queue接收到消息:["+msg+"]");
    }
    //Direct2
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct2.queue"),
            exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public  void listenerDirectExchange2(String msg) {
        System.err.println("direct2.queue接收到消息:["+msg+"]");
    }

生成者:

    //Direct交换机
    @Test
    public  void senDirectExchange() {
        String exchangename = "direct.exchange";
        String msg = "Hello DirectExchange";
        rabbitTemplate.convertAndSend(exchangename,"blue",msg);
    }

3.Topic Exchange:类似Direct Exchange,区别是生产者发布消息时带的RoutingKey必须是多个单词的列表,并以**. ** 分割

Topic Exchange的使用:

消费者:

    //Topic1
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic1.queue"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))
    public  void listenTopicExchange1(String msg) {
        System.err.println("topic1.queue接收到消息:["+msg+"]");
    }
    //Topic2
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic2.queue"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public  void listenTopicExchange2(String msg) {
        System.err.println("topic2.queue接收到消息:["+msg+"]");
    }

生产者:

    //Topic交换机
    @Test
    public  void senTopicExchange() {
        String exchangename = "topic.exchange";
        String msg = "Hello TopicExchange";
        rabbitTemplate.convertAndSend(exchangename,"aa.news",msg);
    }

SpringAMQP的消息转化器:

SpringAMQP发消息到RabbitMQ,会将消息序列化,用的是JDK序列化,这种方式性能比较低下。

并且为了将Java对象转换为AMQP消息,以及将AMQP消息的内容反序列化回Java对象,通常我们会使用

所以我们要设置一个消息转换器,换一种序列化方式。


本文转载自: https://blog.csdn.net/m0_66532138/article/details/136350299
版权归原作者 花载酒779 所有, 如有侵权,请联系我们删除。

“RabbitMQ详解,入门到基本使用”的评论:

还没有评论