文章目录
消息队列
概述
两大种类
RabbitMQ
安装及基操
Docker中安装
**下载镜像:
docker pull rabbitmq:management
**
创建实例并启动:
docker run -d --name rabbitmq --publish 5671:5671 \
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management
- 4369 – erlang发现口
- 5672 --client端通信口
- 15672 – 管理界面ui端口
- 25672 – server间内部通信口
在web浏览器中输入地址:http://服务器ip:15672/
输入默认账号: guest : guest
overview:概览
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
端口:
5672: rabbitMq的编程语言客户端连接端口
15672:rabbitMq管理界面端口
25672:rabbitMq集群的端口
添加用户
如果不使用guest,我们也可以自己创建一个用户:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
创建Virtual Hosts
虚拟主机:类似于mysql中的database。他们都是以“/”开头
设置权限
添加交换机
创建队列
交换机绑定队列
绑定成功
五种消息模型
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
- 点对点
- 多人监听
- 主题模式
- 字符串精确匹配传输消息的广播
- 字符串支持通配符匹配传输消息的广播
SpringBoot整合MQ
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
properties配置
# ip
spring.rabbitmq.host=192.168.11.130
# 端口
spring.rabbitmq.port=5672
# virtualHost
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
开启RabbitMQ
// 开启RabbitMQ支持@EnableRabbit@SpringBootApplicationpublicclassApplication{publicstaticvoidmain(String[] args){SpringApplication.run(Application.class, args);}}
API使用
创建交换机
创建队列
交换机绑定队列
发送消息
用JSON发送含有对象的消息,需要自定义配置类
@ConfigurationpublicclassMyRabbitConfig{@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}}
接收消息
消息确认机制
RabbitMQ配置类
@ConfigurationpublicclassMyRabbitConfig{privateRabbitTemplate rabbitTemplate;@Primary@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*/// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法publicvoidinitRabbitTemplate(){/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*///设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");});/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]"+"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");});}}
延时队列
相当于定时任务
@Configuration@Slf4jpublicclassMyMqConfig{/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 *//* RabbitMq里面如果有,即使属性发生变化,也不会覆盖
/*
* @Description 延时消息队列-死信队列
* @Author WSKH
*/@BeanpublicQueueorderDelayQueue(){/*
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/HashMap<String,Object> arguments =newHashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");// 消息过期时间 1分钟
arguments.put("x-message-ttl",60000);returnnewQueue(MqConstants.ORDER_DELAY_QUEUE,true,false,false, arguments);}/**
* @Description 普通队列-延时队列死亡后,消息经过exchange送给普通队列,接收者负责删除订单
* @Author WSKH
*/@BeanpublicQueueorderReleaseQueue(){returnnewQueue(MqConstants.ORDER_RELEASE_ORDER_QUEUE,true,false,false);}/**
* @Description 创建order交换机
* @Author WSKH
*/@BeanpublicExchangeorderEventExchange(){/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */returnnewTopicExchange(MqConstants.ORDER_EVENT_EXCHANGE,true,false);}/**
* @Description 延时队列和order交换机的绑定(创建订单)
* @Author WSKH
*/@BeanpublicBindingorderCreateBinding(){/*
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */returnnewBinding(MqConstants.ORDER_DELAY_QUEUE,Binding.DestinationType.QUEUE,MqConstants.ORDER_EVENT_EXCHANGE,"order.create.order",null);}/**
* @Description 普通队列和order交换机的绑定(负责释放到时间还没有支付的订单)
* @Author WSKH
*/@BeanpublicBindingorderReleaseBinding(){returnnewBinding(MqConstants.ORDER_RELEASE_ORDER_QUEUE,Binding.DestinationType.QUEUE,MqConstants.ORDER_EVENT_EXCHANGE,"order.release.order",null);}}
版权归原作者 WSKH0929 所有, 如有侵权,请联系我们删除。