1.docker 安装
阿里云镜像存储服务
registry.cn-beijing.aliyuncs.com/xxkapp/rebbitmq:3.10
2.Docker 运行 并且设置开启自启
docker run -d --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 registry.cn-beijing.aliyuncs.com/xxkapp/rebbitmq:3.10
3. SpringBoot使用MQ 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.DirectConsumer
@Configuration
public class DirectConsumer {
/*注册一个队列*/
@Bean
public Queue queue(){
return QueueBuilder.durable("q01").maxLength(10).build();
}
/*注册交换机*/
@Bean
public DirectExchange exchange(){
return ExchangeBuilder.directExchange("d_ex01").durable(true).build();
}
/*绑定队列与交换机*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("rk01");
}
@RabbitListener(queues = "q01")
public void consume(String msg){
System.out.println("Consume1"+msg);
}
}
5.DirectProvider
@Service
public class DirectProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String routeKey,String msg){
rabbitTemplate.convertAndSend("d_ex01",routeKey,msg);
}
}
6.测试 DirectTests
@SpringBootTest
class DirectTests {
@Autowired
private DirectProvider directProvider;
@Test
void contextLoads() throws IOException {
for (int i = 0; i < 5; i++) {
directProvider.send("rk01","测试 你好");
}
System.in.read();
}
}
7. 新增一个消费者
@Configuration
public class DirectConsumer2 {
@RabbitListener(queues = "q01")
public void consume(String msg){
System.out.println("Consume2"+msg);
}
}
结果:
Consume1测试 你好
Consume2测试 你好
Consume1测试 你好
Consume2测试 你好
Consume1测试 你好
当一个队列有多个消费者的时候 队列会把消息均发给消费者 平均消费
7 新开一个队列 一个消费者
@Configuration
public class DirectConsumer2 {
/*注册一个队列*/
@Bean
public Queue queue2(){
return QueueBuilder.durable("q02").maxLength(10).build();
}
/*注册交换机*/
@Bean
public DirectExchange exchange2(){
return ExchangeBuilder.directExchange("d_ex01").durable(true).build();
}
/*绑定队列与交换机*/
@Bean
public Binding binding2(){
return BindingBuilder.bind(queue2()).to(exchange2()).with("rk01");
}
@RabbitListener(queues = "q02")
public void consume(String msg){
System.out.println("Consume2"+msg);
}
}
结果:
Consume1测试 你好
Consume2测试 你好
Consume2测试 你好
Consume1测试 你好
Consume2测试 你好
Consume2测试 你好
Consume1测试 你好
Consume2测试 你好
Consume1测试 你好
Consume1测试 你好
多个队列绑定同一个交换机 交换机会复制消息投递到不同的队列 换句话说 每个队列拿取的都是完整消息
8. FanoutExchange 广播模式
@Configuration
public class FanoutConsumer {
/*注册一个队列*/
@Bean
public Queue fanoutQueue1(){
return QueueBuilder.durable("Fanout_Q01").build();
}
/*交换机*/
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("Fanout_E01").durable(true).build();
}
/*交换机与队列关系*/
@Bean
public Binding fanoutBinding(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/*消费者*/
@RabbitListener(queues = "Fanout_Q01")
public void receiveMessage(String msg){
System.out.println("FanoutConsumer1 消费者1 收到消息"+msg);
}
@RabbitListener(queues = "Fanout_Q01")
public void receiveMessage2(String msg){
System.out.println("FanoutConsumer2 消费者2 收到消息"+msg);
}
}
队列1 交换机1 消费者1 与队列2 交换机1 消费者2
@Configuration
public class FanoutConsumer2 {
/*注册一个队列*/
@Bean
public Queue fanoutQueue2(){
return QueueBuilder.durable("Fanout_Q02").maxLength(100).build();
}
/*交换机*/
@Bean
public FanoutExchange fanoutExchange2(){
return ExchangeBuilder.fanoutExchange("Fanout_E01").durable(true).build();
}
/*交换机与队列关系*/
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange2());
}
@RabbitListener(queues = "Fanout_Q02")
public void receiveMessage2(String msg){
System.out.println("FanoutConsumer2 消费者2 收到消息"+msg);
}
}
执行:
@SpringBootTest
class FanoutTests {
@Autowired
private FanoutProvider fanoutProvider;
@Test
void contextLoads() throws IOException {
for (int i = 0; i < 5; i++) {
fanoutProvider.send("广播模式");
}
System.in.read();
}
}
调用发送信息方法:
@Service
public class FanoutProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
rabbitTemplate.convertAndSend("Fanout_E01","",msg);
}
}
结果 每个消费者都会接收到所有的消息:
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer1 消费者1 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer1 消费者1 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
FanoutConsumer2 消费者2 收到消息广播模式
9 主题模式 TopicConsumer
主题模式是可以根据我们所设置的规则进行匹配
//@Configuration
public class TopicConsumer {
/*交换机*/
@Bean
public TopicExchange exchange(){
return ExchangeBuilder.topicExchange("t_ex01").durable(true).build();
}
/*队列*/
@Bean
public Queue queue(){
return QueueBuilder.durable("队列1").maxLength(100).build();
}
/*队列1绑定交换机1 #号匹配*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("#");
}
/*队列2*/
@Bean
public Queue queue2(){
return QueueBuilder.durable("队列2").maxLength(100).build();
}
/*绑定队列2*/
@Bean
public Binding binding2(){
return BindingBuilder.bind(queue2()).to(exchange()).with("1.6.*");
}
@Bean
public Queue queue3() {
return QueueBuilder.durable("队列3").maxLength(100).build();
}
@Bean //绑定佩奇
public Binding binding3(){
return BindingBuilder.bind(queue3()).to(exchange()).with("1.8.*");
}
@RabbitListener(queues = "队列1")
public void consume(String msg) {
System.out.println("队列1 :"+msg);
}
@RabbitListener(queues = "队列2")
public void consume2(String msg) {
System.out.println("队列2 :"+msg);
}
@RabbitListener(queues = "队列3")
public void consume3(String msg) {
System.out.println("队列3 :"+msg);
}
}
provider
@Service
public class TopicProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String routeKey, RegisterOk msg)
{
rabbitTemplate.convertAndSend("t_ex01",routeKey,msg);
}
}
@SpringBootTest
class TopicTests {
@Autowired
private TopicProvider topicProvider;
@Test
void test() throws IOException {
RegisterOk registerOk = new RegisterOk().setId(1).setNickName("Rose").setTel("123456789");
topicProvider.send("2.8.9",registerOk);
System.in.read();
}
}
@Data
@Accessors(chain = true)
public class RegisterOk implements Serializable {
private Integer id;
private String nickName;
private String tel;
}
死信队列:
死信是消息在特定一种场景下的表现形式
1.消息被拒绝访问
2.消费者发生异常 超过重试次数
3. 消息的Expiration 过期时长或队列TTL的过期时间
4.超过消息队列的最大容量
关于死信队列 在mq中往往不会单独存在需要绑定一个普通队列
当所绑定的交换机中有消息变成了死信 那么这个消息就会被死信交换机路由到指定的死心队列中 我们可以通过这个死信队列进行监控 进行人工干预
@Bean //死信交换机
public DirectExchange deadExchange() {
return ExchangeBuilder.directExchange("dead_ex").durable(true).build();
}
@Bean //死信队列
public Queue deadQueue() {
return QueueBuilder.durable("dead_ordering_ok_wms").build();
}
@Bean //绑定死信队列与死信交换机的关系
public Binding bindingDead(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_ordering_ok_wms");
}
@Bean //业务交换机
public FanoutExchange exchange() {
return ExchangeBuilder.fanoutExchange("ordering_ok").durable(true).build();
}
@Bean //业务队列
public Queue queue() {
return QueueBuilder
.durable("ordering_ok_wms")
.deadLetterExchange("dead_ex")
.deadLetterRoutingKey("dead_ordering_ok_wms")
//.ttl(20*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
//.maxLength(5) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
.build();
}
@Bean //业务绑定队列与交换机的关系
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange());
}
// @RabbitListener(queues = "ordering_ok_wms")
public void consume(OrderingOk msg) throws IOException {
log.debug("wms处理订单->{}",msg);
int i = 1/0;
}
自动应答死信配置:
#-------------MQ 高级配置---------
#预抓取数量
spring.rabbitmq.listener.simple.prefetch=250
#设置消费者手动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = auto
#开启自动应答重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
#默认重试3次
spring.rabbitmq.listener.simple.retry.max-attempts=3
#重试间隔时间 单位ms
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#时间间隔倍数,默认是1倍
spring.rabbitmq.listener.simple.retry.multiplier=2
#最大间隔时间
spring.rabbitmq.listener.simple.retry.max-interval=5000ms
10. 延迟队列
使用rabbitmq的延时队列插件
,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队 MQ 的版本是 3.10.0 现在去 GitHub 上根据版本号下载插件
//Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
安装插件并启用
docker cp /opt/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
进入 Docker 容器
docker exec -it rabbitmq /bin/bash
在plugins内启用插件
#先执行,解除防火墙限制,增加文件权限
umask 0022
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重启 RabbitMQ
docker restart rabbitmq
通过UI查看
延迟队列的使用
消费者
@Configuration
@Slf4j
public class DlelayConsumer {
@Bean //定义延迟交换机
public CustomExchange exchange() {
//arguments ,指定消息到期后,以什么方式投递到队列
Map<String, Object> arguments = Collections.singletonMap("x-delayed-type", "fanout");
CustomExchange exchange = new CustomExchange("ordering_ok", "x-delayed-message", true, false,arguments);
return exchange ;
}
@Bean //业务队列
public Queue queue() {
return QueueBuilder
.durable("ordering_ok_wms")
.build();
}
@Bean //业务绑定队列与交换机的关系
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("").noargs();
}
@RabbitListener(queues = "ordering_ok_wms")
public void consume(OrderingOk msg) throws IOException {
log.debug("wms处理订单->{}",msg);
}
}
生产者:
@Service
public class DelayProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send( OrderingOk msg)
{
System.out.println(msg);
rabbitTemplate.convertAndSend("ordering_ok","",msg, message -> {
Long id = msg.getId();
int delay = 0;
switch (id.intValue()){
case 1:
delay = 50*1000;
break;
case 2:
delay = 40*1000;
break;
case 3:
delay = 30*1000;
break;
case 4:
delay = 20*1000;
break;
case 5:
delay = 10*1000;
break;
}
message.getMessageProperties().setDelay(delay); //延迟时间
//设置消息持久化,默认是PERSISTENT
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
}
}
版权归原作者 小小康(●ˇ∀ˇ●) 所有, 如有侵权,请联系我们删除。