0


【系统开发】尚硅谷 - 谷粒商城项目笔记(七):消息队列

文章目录


消息队列

概述

image-20220210152321702

image-20220210152336379

image-20220210153053885

两大种类

image-20220130193309249

RabbitMQ

在这里插入图片描述

image-20220130194025240

安装及基操

Docker中安装

image-20220210161046651

**下载镜像:

docker pull rabbitmq:management

**

创建实例并启动:

docker run -d --name rabbitmq --publish 5671:5671 \
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management
  • 4369 – erlang发现口
  • 5672 --client端通信口
  • 15672 – 管理界面ui端口
  • 25672 – server间内部通信口

在web浏览器中输入地址:http://服务器ip:15672/

输入默认账号: guest : guest

image-20220210162234117

overview:概览

connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况

channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

Exchanges:交换机,用来实现消息的路由

Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

端口

5672: rabbitMq的编程语言客户端连接端口

15672:rabbitMq管理界面端口

25672:rabbitMq集群的端口

添加用户

如果不使用guest,我们也可以自己创建一个用户:

img

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

创建Virtual Hosts

虚拟主机:类似于mysql中的database。他们都是以“/”开头

img

设置权限

img

img

img

添加交换机

image-20220210164256701

image-20220210164319979

创建队列

image-20220210164502957

交换机绑定队列

image-20220210164559367

image-20220210164724392

绑定成功

image-20220210164744460

五种消息模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

img

  1. 点对点
  2. 多人监听
  3. 主题模式
  4. 字符串精确匹配传输消息的广播
  5. 字符串支持通配符匹配传输消息的广播

SpringBoot整合MQ

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

properties配置

# ip
spring.rabbitmq.host=192.168.11.130
# 端口
spring.rabbitmq.port=5672
# virtualHost
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

开启RabbitMQ

// 开启RabbitMQ支持@EnableRabbit@SpringBootApplicationpublicclassApplication{publicstaticvoidmain(String[] args){SpringApplication.run(Application.class, args);}}

API使用

创建交换机

image-20220210171016097

创建队列

image-20220210171256354

交换机绑定队列

image-20220210171556719

发送消息

image-20220210171959425

用JSON发送含有对象的消息,需要自定义配置类

@ConfigurationpublicclassMyRabbitConfig{@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}}

image-20220210172435362

接收消息

image-20220210173227353

image-20220210174110149

消息确认机制

image-20220210174326073

image-20220210174702392

image-20220210191109546

image-20220210191134176

RabbitMQ配置类

@ConfigurationpublicclassMyRabbitConfig{privateRabbitTemplate rabbitTemplate;@Primary@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}/**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */// @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法publicvoidinitRabbitTemplate(){/**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         *///设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");});/**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]"+"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");});}}

延时队列

相当于定时任务

image-20220302194504015

image-20220302194808124

image-20220302195209042

image-20220302200139566

@Configuration@Slf4jpublicclassMyMqConfig{/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 *//* RabbitMq里面如果有,即使属性发生变化,也不会覆盖
    /*
     * @Description 延时消息队列-死信队列
     * @Author WSKH
     */@BeanpublicQueueorderDelayQueue(){/*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */HashMap<String,Object> arguments =newHashMap<>();
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        arguments.put("x-dead-letter-routing-key","order.release.order");// 消息过期时间 1分钟
        arguments.put("x-message-ttl",60000);returnnewQueue(MqConstants.ORDER_DELAY_QUEUE,true,false,false, arguments);}/**
     * @Description 普通队列-延时队列死亡后,消息经过exchange送给普通队列,接收者负责删除订单
     * @Author WSKH
     */@BeanpublicQueueorderReleaseQueue(){returnnewQueue(MqConstants.ORDER_RELEASE_ORDER_QUEUE,true,false,false);}/**
    * @Description 创建order交换机
    * @Author WSKH
    */@BeanpublicExchangeorderEventExchange(){/*
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         * */returnnewTopicExchange(MqConstants.ORDER_EVENT_EXCHANGE,true,false);}/**
     * @Description 延时队列和order交换机的绑定(创建订单)
     * @Author WSKH
     */@BeanpublicBindingorderCreateBinding(){/*
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */returnnewBinding(MqConstants.ORDER_DELAY_QUEUE,Binding.DestinationType.QUEUE,MqConstants.ORDER_EVENT_EXCHANGE,"order.create.order",null);}/**
     * @Description 普通队列和order交换机的绑定(负责释放到时间还没有支付的订单)
     * @Author WSKH
     */@BeanpublicBindingorderReleaseBinding(){returnnewBinding(MqConstants.ORDER_RELEASE_ORDER_QUEUE,Binding.DestinationType.QUEUE,MqConstants.ORDER_EVENT_EXCHANGE,"order.release.order",null);}}

本文转载自: https://blog.csdn.net/weixin_51545953/article/details/131305188
版权归原作者 WSKH0929 所有, 如有侵权,请联系我们删除。

“【系统开发】尚硅谷 - 谷粒商城项目笔记(七):消息队列”的评论:

还没有评论