0


RabbitMQ笔记

端午无聊,就来学学MQ吧


消息队列

消息指的是两个应用之间传递的数据

消息队列是在消息传递中保存消息的容器

生成者:只负责发送数据

消费者:只负责读取数据

(数据就是消息)

为什么要用消息队列

解耦

如果一个系统(系统a)的需求是共享自己系统的数据,而其他系统(系统BCD)是需求者。而这个时候系统a为了共享自己系统的数据,就需要分别写相关的代码(a->b)(a->c)(a->d)。假如某一时刻系统d的需求突然变了,他不在需要a系统的数据。这个时候,系统a的相关代码就要删除(a->d)。假如过了不久又有了一个新系统e需要系统a的数据,则系统a又需要增加新的代码。这样子系统的耦合性太高了,所以可以用MQ来降低耦合。系统a只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

异步

假如我们的操作为请求进入后又系统a响应调度系统bdc进行操作。这样子就是同步请求,响应的时间就是系统ABCD的总和。如果使用了MQ,系统a发送数据到MQ,然后就可以返回响应给客户端,不需要再等待其他调度系统的响应,可以大大地提高性能

削峰

对于高并发的场景下,如果关于sql的请求直接进入mysql进行执行,那么mysql很有可能崩溃,而导致系统的崩溃。而如果使用MQ,那么请求不再是直接发送sql到数据库,而是把数据发送到MQ,MQ短时间内积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,以防止在请求峰值时期大量的请求直接发送到MySQl导致系统崩溃。

(上面了解了这么多终于到正片了)


RabbitMQ的特点

使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。

可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。灵活的分发消息策略、支持集群、多种协议、支持多种语言、客户端可视化管理界面插件机制

初体验

永远的hello world

pom配置:
<!--消息队列相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
生产者

上来啥也不会先照着做

上代码

​
​
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
​
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
​
import javax.annotation.Resource;
​
@Configuration
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    /*
    * 这是创建交换机和队列用的rabbitAdmin对象
    * */
    @Resource
    private RabbitAdmin rabbitAdmin;
    //初始化rabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);
        //只有设置为true,spring 才会加载RabbitAdmin这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    //实例化bean后,也就是Bean的后置处理器
    @Override
    public Object postProcessAfterInitialization(Object bean,String beanName)throws BeansException{
        //创建交换机
        rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
        //创建队列
        rabbitAdmin.declareQueue(rabbitmqDemoDirectQuece());
        return null;
    }
    @Bean
    public Queue rabbitmqDemoDirectQuece(){
        /*
        * 1.name:队列名称
        * 2.durable:是否持久化
        * 3.exclusive:是否独享
        * 4.autoDelete:是否自动删除
        * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC,true,false,false);
    }
​
    @Bean
    public DirectExchange rabbitmqDemoDirectExchange(){
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,true,false);
    }
​
    @Bean
    public Binding bindDirect(){
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoDirectQuece())
                //到交换机
                .to(rabbitmqDemoDirectExchange())
                //并设置匹配键
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}
​
​
​
public class RabbitMQConfig {
    /*
    * RabbitMQ的队列主题名称
    * */
    public static final String RABBITMQ_DEMO_TOPIC="rabbitmqDemoTopic";
    /*
    * RabbitMQ的DIRECT交换机名称
    * */
    public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE="rabbitmqDemoDirectExchange";
    /*
    * RabbitMQ的DIRECT交换机和队列绑定的匹配键DirectRouting
    * */
    public static final String RABBITMQ_DEMO_DIRECT_ROUTING="rabbitmqDemoDirectRouting";
​
}
​
​
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
​
import javax.annotation.Resource;
​
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQServiceImpl rabbitMQService;
    @PostMapping("/sendMsg")
    public String sendMsg(@RequestParam(name="msg") String msg) throws Exception{
        return rabbitMQService.sendMsg(msg);
    }
}
​
​
​
import org.example.productflashsalesystem.RabbitMQDemo.demo.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
​
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
​
@Service
public class RabbitMQServiceImpl {
    //日期格式化
    private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Resource
    private RabbitTemplate rabbitTemplate;
    public String sendMsg(String msg) throws Exception{
        try{
            String msgId= UUID.randomUUID().toString().replace("-","").substring(0,32);
            String sendTime=sdf.format(new Date());
            Map<String,Object> map=new HashMap<>();
            map.put("msgId",msgId);
            map.put("sendTime",sendTime);
            map.put("msg",msg);
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING,map);
            return "ok";
        }catch (Exception e){
            e.printStackTrace();
            return "error";
        }
    }
}

(这些放在一个包里)

启动

​
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQDemoApplication.class,args);
    }
}

消费者

下面是消费者登场

​
​
import org.example.productflashsalesystem.RabbitMQDemo.demo.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.util.Map;
​
@Component
//使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
    @RabbitHandler
    public void process(Map map){
        System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:"+map.toString());
    }
}
​
启动

​
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQDemocoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQDemocoApplication.class,args);
    }
}
​

(怎么感觉有点在上生物课了。。。。)

废话不多说接下去就是了解这些代码都写了什么对象!

RabbitMQ中的组成部分

Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。

Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。RabbitMQ有4种交换器(在后面会介绍比较常用的三种)

Binding:绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列

Channel:多路复用连接中的一条独立的双向数据流通道,可读可写。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。

Queue:消息队列,存储消息的队列

Producer:消息生产者。生产方客户端将消息同交换路由发送到队列中。

Consumer:消息消费者。消费队列中存储的消息。

流程图

img

消息生产者连接到RabbitMQ Broker,创建connection,开启channel

生产者声明交换机类型、名称、是否持久化等。

生产者发送消息,并指定消息是否持久化等属性和routing key。

exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的队列里面。

消费者监听接收到消息之后开始业务处理。

(好吧,英语不好好难受。。。。)

Rabbit MQ消息模型

1650081010_1_.png

image _33_.png

Exchange

这个组件好像挺关键的,深入看看这么个事

消息发送到RabbitMQ后首先要经过Exchange路由才能对应的Queue

Direct Exchange

一对一的,点对点的发送。上面的代码就是例子

Fanout Exchange

发布订阅

Topic Exchange

通配符交换机

总结

比较常用的就是以上三种:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。

实际上稍微思考一下,可以发现通配符(TopicExchange)这种模式其实是可以达到直连(DirectExchange)和发布订阅(FanoutExchange)这两种的效果的。

FanoutExchange不需要绑定routingKey,所以性能相对TopicExchange会好一点。


本文转载自: https://blog.csdn.net/2301_79909038/article/details/139562694
版权归原作者 葡萄院下的lin 所有, 如有侵权,请联系我们删除。

“RabbitMQ笔记”的评论:

还没有评论