端午无聊,就来学学MQ吧
消息队列
消息指的是两个应用之间传递的数据
消息队列是在消息传递中保存消息的容器
生成者:只负责发送数据
消费者:只负责读取数据
(数据就是消息)
为什么要用消息队列
解耦
如果一个系统(系统a)的需求是共享自己系统的数据,而其他系统(系统BCD)是需求者。而这个时候系统a为了共享自己系统的数据,就需要分别写相关的代码(a->b)(a->c)(a->d)。假如某一时刻系统d的需求突然变了,他不在需要a系统的数据。这个时候,系统a的相关代码就要删除(a->d)。假如过了不久又有了一个新系统e需要系统a的数据,则系统a又需要增加新的代码。这样子系统的耦合性太高了,所以可以用MQ来降低耦合。系统a只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可
异步
假如我们的操作为请求进入后又系统a响应调度系统bdc进行操作。这样子就是同步请求,响应的时间就是系统ABCD的总和。如果使用了MQ,系统a发送数据到MQ,然后就可以返回响应给客户端,不需要再等待其他调度系统的响应,可以大大地提高性能
削峰
对于高并发的场景下,如果关于sql的请求直接进入mysql进行执行,那么mysql很有可能崩溃,而导致系统的崩溃。而如果使用MQ,那么请求不再是直接发送sql到数据库,而是把数据发送到MQ,MQ短时间内积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,以防止在请求峰值时期大量的请求直接发送到MySQl导致系统崩溃。
(上面了解了这么多终于到正片了)
RabbitMQ的特点
使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。
可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。灵活的分发消息策略、支持集群、多种协议、支持多种语言、客户端可视化管理界面插件机制
初体验
永远的hello world
pom配置:
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者
上来啥也不会先照着做
上代码
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Configuration
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
/*
* 这是创建交换机和队列用的rabbitAdmin对象
* */
@Resource
private RabbitAdmin rabbitAdmin;
//初始化rabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);
//只有设置为true,spring 才会加载RabbitAdmin这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
//实例化bean后,也就是Bean的后置处理器
@Override
public Object postProcessAfterInitialization(Object bean,String beanName)throws BeansException{
//创建交换机
rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
//创建队列
rabbitAdmin.declareQueue(rabbitmqDemoDirectQuece());
return null;
}
@Bean
public Queue rabbitmqDemoDirectQuece(){
/*
* 1.name:队列名称
* 2.durable:是否持久化
* 3.exclusive:是否独享
* 4.autoDelete:是否自动删除
* */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC,true,false,false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange(){
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,true,false);
}
@Bean
public Binding bindDirect(){
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDemoDirectQuece())
//到交换机
.to(rabbitmqDemoDirectExchange())
//并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
}
public class RabbitMQConfig {
/*
* RabbitMQ的队列主题名称
* */
public static final String RABBITMQ_DEMO_TOPIC="rabbitmqDemoTopic";
/*
* RabbitMQ的DIRECT交换机名称
* */
public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE="rabbitmqDemoDirectExchange";
/*
* RabbitMQ的DIRECT交换机和队列绑定的匹配键DirectRouting
* */
public static final String RABBITMQ_DEMO_DIRECT_ROUTING="rabbitmqDemoDirectRouting";
}
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQServiceImpl rabbitMQService;
@PostMapping("/sendMsg")
public String sendMsg(@RequestParam(name="msg") String msg) throws Exception{
return rabbitMQService.sendMsg(msg);
}
}
import org.example.productflashsalesystem.RabbitMQDemo.demo.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Service
public class RabbitMQServiceImpl {
//日期格式化
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Resource
private RabbitTemplate rabbitTemplate;
public String sendMsg(String msg) throws Exception{
try{
String msgId= UUID.randomUUID().toString().replace("-","").substring(0,32);
String sendTime=sdf.format(new Date());
Map<String,Object> map=new HashMap<>();
map.put("msgId",msgId);
map.put("sendTime",sendTime);
map.put("msg",msg);
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING,map);
return "ok";
}catch (Exception e){
e.printStackTrace();
return "error";
}
}
}
(这些放在一个包里)
启动
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQDemoApplication.class,args);
}
}
消费者
下面是消费者登场
import org.example.productflashsalesystem.RabbitMQDemo.demo.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
//使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
@RabbitHandler
public void process(Map map){
System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:"+map.toString());
}
}
启动
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQDemocoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQDemocoApplication.class,args);
}
}
(怎么感觉有点在上生物课了。。。。)
废话不多说接下去就是了解这些代码都写了什么对象!
RabbitMQ中的组成部分
Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。RabbitMQ有4种交换器(在后面会介绍比较常用的三种)
Binding:绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列
Channel:多路复用连接中的一条独立的双向数据流通道,可读可写。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。
Queue:消息队列,存储消息的队列
Producer:消息生产者。生产方客户端将消息同交换路由发送到队列中。
Consumer:消息消费者。消费队列中存储的消息。
流程图
消息生产者连接到RabbitMQ Broker,创建connection,开启channel
生产者声明交换机类型、名称、是否持久化等。
生产者发送消息,并指定消息是否持久化等属性和routing key。
exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的队列里面。
消费者监听接收到消息之后开始业务处理。
(好吧,英语不好好难受。。。。)
Rabbit MQ消息模型
Exchange
这个组件好像挺关键的,深入看看这么个事
消息发送到RabbitMQ后首先要经过Exchange路由才能对应的Queue
Direct Exchange
一对一的,点对点的发送。上面的代码就是例子
Fanout Exchange
发布订阅
Topic Exchange
通配符交换机
总结
比较常用的就是以上三种:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。
实际上稍微思考一下,可以发现通配符(TopicExchange)这种模式其实是可以达到直连(DirectExchange)和发布订阅(FanoutExchange)这两种的效果的。
FanoutExchange不需要绑定routingKey,所以性能相对TopicExchange会好一点。
版权归原作者 葡萄院下的lin 所有, 如有侵权,请联系我们删除。