0


RabbitMq------初级+高级(一)

总括执行流程:生产根据routingkey发送给交换机,而交换机通过routingkey来寻找对应的bindingkey从而把消息发送到队列里面去。

如果只有队列没有交换机的话,routingkey也就是队列名!

一、队列(Queue)

队列,也就是存放消息的主要位置。AMQP里面提供了只属于RabbitMq的Queue

1-1、new方式创建:

我们来看以下Queue的构造方法里面有那些参数:

(1)queue——队列名字
(2)durable——持久化(true持久化,false非持久化)

(3)exclusive——是否排外(true 排外,false不排外)
(4)autoDelete——是否自动删除(消费者断开之后:true删除,false不删除)
(5)其他条件:红框为经常使用!就是map里面的数据,直接用键值对的形式来注入信息。

当然也有其他方式来创建Queue:

1-2、使用创建类QueueBuilder

QueueBuilder.builld——Queue的另一种创建方式,并且相比new queue就是更加的细致化,快捷一点。

我们来看一下QueueBuilder里面的代码:

(1)是否持久化以及队列名称(使用静态方法)

二、交换机(Exchange)

交换机:我们通俗理解就是一个分发机。也就是用来把消息分发给对应的队列里面去。

2-1.使用new FanoutExchange/DirectExchange/TopicExchange()

(1)name——交换机名。
(2)durable——持久化(true持久化,false非持久化)
(3)autoDelete——是否自动删除(没有队列与交换机绑定之后:true删除,false不删除)
(4) 其他条件:后面添加!

2-2、使用ExchangeBuilder.build

三、绑定(Binding)——使用BindingBuilder里面静态方法。

3-1、FanoutExchange——广播交换机

bind对应的new queue创建的队列对象,而to就是对应的交换机对象。因为这是广播交换机,没有涉及到bindingkey这个数值。

3-2、DirectExchange——直连交换机

这里就是与 FanoutExchange不同的点就是可以顺便定义routingkey也就是bindingkey的这个数值.

3-3、TopicExchange——主题交换机(routingkey可以使用通配符)

四、消息(Message)

4-1使用new的方式去创建

MessageProperties的作用:


  public class MessageProperties implements Serializable {

    private static final int INT_MASK = 32;
    private static final long serialVersionUID = 1619000546531112290L;
    public static final String CONTENT_TYPE_BYTES = "application/octet-stream";
    public static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
    public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object";
    public static final String CONTENT_TYPE_JSON = "application/json";
    public static final String CONTENT_TYPE_JSON_ALT = "text/x-json";
    public static final String CONTENT_TYPE_XML = "application/xml";
    public static final String SPRING_BATCH_FORMAT = "springBatchFormat";
    public static final String BATCH_FORMAT_LENGTH_HEADER4 = "lengthHeader4";
    public static final String SPRING_AUTO_DECOMPRESS = "springAutoDecompress";
    public static final String X_DELAY = "x-delay";
    public static final String DEFAULT_CONTENT_TYPE = CONTENT_TYPE_BYTES;
    public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    public static final Integer DEFAULT_PRIORITY = 0;
    private final Map<String, Object> headers = new HashMap<>();
    
    private Date timestamp;          //消息的发送时间
    private String messageId;        //消息id
    private String userId;          //用户id
    private String appId;           //应用程序id
    private String clusterId;       //集群id
    private String type;            //类型
    private String correlationId;    //关联id
    private String replyTo;         //用于指定回复队列的名称
    private String contentType = DEFAULT_CONTENT_TYPE;
    private String contentEncoding;  //消息内容的编码格式
    private long contentLength;      //消息内容的长度
    private boolean contentLengthSet;   //是否限制消息内容的长度
    private MessageDeliveryMode deliveryMode = DEFAULT_DELIVERY_MODE;
    private String expiration;    //消息的失效时间
    private Integer priority = DEFAULT_PRIORITY;
    private Boolean redelivered;           //是否重新提交消息
    private String receivedExchange;       //接收消息的交换机
    private String receivedRoutingKey;     //接收消息的交换机
    private String receivedUserId;        //接收消息的用户id 
    private long deliveryTag;             //消息的编号
    private boolean deliveryTagSet;       //是否设置消息编码
    private Integer messageCount;        //消息的数量
    private String consumerTag;          //指定哪个消费者消费
    private String consumerQueue;       //指定消费此消息的队列
    private Integer receivedDelay;      //延迟发送
    private MessageDeliveryMode receivedDeliveryMode;  //消息是否持久化
    

我们之需要最后这个参数MessageDeliverMode,这个是做消息是否持久化的。

4-2、使用MessgaeBuilder

这里涉及到一个点就是消息id的问题,这里使用到了uuid,注意uuid是一个唯一生成的唯一数值。而消息也是有id值的。因为当拿去的时候也是会通过id来进行幂等性!

注意:

我们可以通过rabbitTemplate封装类的convertAndSend里面的方法可以知道可以使用

五、RabbitAdmin:

RabbitAdmin是RabbitMq用来对Mq里面创建队列以及虚拟机的封装工具类,他可以进行自动创建队列以及虚拟机,以及手动创建!

(1)自动创建:

RabbitAdmin里面有一个initialize()方法(是因为RabbitAdmin实现了InitializingBean接口)——这是一个初始化方法。也就是在生成RabbitAdmin的时候就会执行该操作。

把spring容器里面的交换机,队列,binding的bean全部拿出来

然后进行在rabbitmq系统里面去创建:

所以我们只需要定义一个类:

此时里面的所有虚拟机,队列以及binding就会被自动创建在rabbitmq服务器里面去。

(2)手动创建:

直接使用RabbitAdmin来调用方法:

案例:

注意:

一个细节,就是RabbitAdmin是需要注入spring容器里面才会执行初始化方法的!

此时,生产者相比消费者需要去创建一个@Bean来记录RabbitAdmin,在启动类里面创建。

因为消费者在调用@RabbitListener的时候,会触发SimpleMessageListenerContainer对RabbitAdmin进行初始化注入到spring容器里面去。

六、RabbitTemplate:

是spring-boot-amqp提供的一个封装类属性。主要用来发送消息给rabbitmq。

七、消息的可靠性:

因为在使用rabbit的过程中,可能会出现以下这些情况,导致消息丢失或者消息发送不了。并且在但队列或者交换机不存在的时候,我们直接使用rabbitTemplate在发送消息给队列与交换机是,是不会提示报错信息的。所以我们需要进行消息可靠性的操作。

故障情景:

7-1、发送时丢失:

(1)、生产者消息未送达到exchange。
(2)、消息送达到exchange但是未到队列。
7-2、MQ宕机:

交换机与队列如果不做持久化的话,是会丢失的。

queue消息会丢失,并且无法发送消息,以及无法接收消息。

7-3、消费者接收到消息之后,还没有消费就宕机:

解决方案:

(7.1)生产者确认机制(publisher confirm/return 机制):
主要解决——发送时丢失!
原理:

其实也就是生产者在发送给mq的之后,会给消息指定一个唯一id值,然后如果发送成功,会返回一个结果给生产者!

ack是判断是否已经发送给交换机,而要判断是否发送到队列要看失败原因

java层实现(在生产者层的配置文件进行配置):

这里涉及到两个方法:一个是ConfirmCallback,另一个是ReturnCallback

1、ConfirmCallback(交换机回调)

这里涉及到一个新的api:CorrelationData,他是一个实现ConfirmCallback接口的实现类,以及他还是给消息定义唯一id的数值。我们来看以下CorrelationData的源代码:

此时我们来尝试对其进行一个书写。

因为现在one.Demo交换机并不存在,所以此时:

** 2、ReturnCallback(队列回调)**

RabbitTemplate里面只有一个ReturnCallback方法,与ConfirmCallback不同,ConfirmCallback根据业务场景不同而不同的。

而创建ReturnCallback方法有两种:

1、实现ApplicationContextAware接口:

直接修改ioc容器里面的rabbitmqtemplate的bean

2、自定义rabbitmqtemplate放入ioc容器里面去:

(7.2)持久化():
主要解决——MQ宕机!

看上面做持久化的操作。

(7.3)消费者消息确认():

主要解决——消费者接收消息!

与生产者确认机制有点相识,也是通过ack来判断消息是否被发送到了消费者(并不是消费了喔。)

对于如何发送ack,springamqp对于消费者的配置有三种方式去进行:

1、nono模式:

2、auto模式:

因为mq对于nack的操作是比较迅速的,所以需要进行断电来观察情况。

而默认情况下,消费者调用失败之后,消息会重新进入requeue队列里面去,然后重新发送给消费者去进行消费(失败重试机制)因为本来消费者里面的方法的方法体本来就是错误的,就会不断的报错。

那我们如何停止这种情况咧?

7-3-2-1、本地重试:

也就是如果进入了失败重试机制之后,先进行我们本地定义重试机制,去规定只重试多少次,以及等待时长之类的信息。

需要注意的是: 当失败超过4次之后,就会抛出异常AmqpRejectAndDontRequeueException,也就不再重试了。此时消费者会把ack返回给mq,让mq清除消息。

注意:此时的重试并不会进入到mqrequeue队列里面去,而是消耗本地重试。

7-3-2-2、失败策略:

那我们有没有办法让其不要抛出异常,而是执行其他的操作。

(1)全局异常处理器:

可以使用springboot里面的全局异常处理器 @RestControllerAdvice来对当前的异常进行处理。

(2)实现MessageRecovery接口:

三种实现方式:

RejectAndDontRequeueRecoverer:重试耗尽之后,直接reject,丢弃消息。默认就是这种方式。

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

其实与修改消息转换器的原理有点相识。都是创建实例,然后翻入ioc容器里面去。

八、死信机制:

我们来假设以下,就是我们以往丢弃掉的消息,假设我们想把其保存起来,也就是做一个数据备份操作的时候,我们就需要使用死信机制。

死信:在原本队列上已经没有使用功能的信息。

执行流程图:

死信交换机:本质是一个DirectExchange

死信队列:本质就是一个Queue。

注意了:死信交换机与死信队列的绑定和之前的交换机与队列绑定是一样的。

但是如果是一个队列添加死信机制的话(simple.queue添加dl.direct)的话,不是基于binding的。而是在创建队列的时候进行配置(也就是类似做持久化的操作)

其实原理就是在队列的基础上把消息根据routingkey发送给死信交换机以及死信队列!

创建死信交换机以及死信队列,并建立binding:

交换机与队列都持久化了!

添加死信机制:

1、基于QueueBuilder:

2、new Queue:

九、TTL(对消息进行设置存活超时时间!——超时死信机制):

我们知道死信除了失败重试机制结束后还没有发送的消息,还有一种就是在队列里面的超时没有消费的消息!

注意了:这里对消息进行存活时间的判断,有两种方式去进行:

9-1、修改队列参数:

第一个是通过队列来对消息进行时间管控的。也就是在队列里面添加消息时间管控机制。(并非对消息进行非持久化操作。

1、基于QueueBuilder:

2、基于new Queue:

9-2、修改信息参数:

1、基于MessagesBuilder:

而这种方式消息超时变成死信的过程是不一样的:

队列的话是进入队列后,而消息的话是发送到队列接收消息加上在队列后存放的时间。

问:假设队列与消息都设置了超时死信的时间,以那个为准?

答:以最短时间为准!


本文转载自: https://blog.csdn.net/weixin_54406708/article/details/132000486
版权归原作者 秃头小秃鹫 所有, 如有侵权,请联系我们删除。

“RabbitMq------初级+高级(一)”的评论:

还没有评论