0


RabbitMQ系列【16】AmqpTemplate接口详解

有道无术,术尚可求,有术无道,止于术。

文章目录

前言

RabbitTemplate

spring-amqp

提供的一个

 RabbitMQ

消息操作模板类,在之前我们使用它完成了简单的消息发送。

RabbitTemplate 

主要提供了发送消息、接收消息以及其他附加功能,内部封装了

RabbitMQ

原生

API

,大大简化了使用

 RabbitMQ

操作。

RabbitTemplate

主要实现了

AmqpTemplate

RabbitOperations

接口:
在这里插入图片描述

AmqpTemplate

AmqpTemplate

接口主要声明了三类方法:

publicinterfaceAmqpTemplate{// 发送消息voidsend(Message var1)throwsAmqpException;// 接收消息Messagereceive()throwsAmqpException;// 发送消息并接收回复MessagesendAndReceive(Message var1)throwsAmqpException;}

API

首先,我们看下

AmqpTemplate

中声明的各种方法。

send

send 

方法一共有三个,需要创建

Message

消息对象,将消息封装到该对象内发送,如果没有指定交换机、路由键,将使用默认值,也就是空字符串。

// 发送消息到默认交换机、默认路由KEYvoidsend(Message message)throwsAmqpException;// 发送消息到默认交换机、使用指定路由KEYvoidsend(String routingKey,Message message)throwsAmqpException;// 发送消息到指定交换机、使用指定路由KEYvoidsend(String exchange,String routingKey,Message message)throwsAmqpException;

示例:

Message message =newMessage("消息".getBytes());
        rabbitTemplate.send(message);
        rabbitTemplate.send("route.key", message);
        rabbitTemplate.send("exchange_name","route.key", message);

convertAndSend

convertAndSend

方法可以转换对象并发送,并可以添加一个消息处理器

MessagePostProcessor 

// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用默认路由KEYvoidconvertAndSend(Object message)throwsAmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEYvoidconvertAndSend(String routingKey,Object message)throwsAmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEYvoidconvertAndSend(String exchange,String routingKey,Object message)throwsAmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessor voidconvertAndSend(Object message,MessagePostProcessor messagePostProcessor)throwsAmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessor voidconvertAndSend(String routingKey,Object message,MessagePostProcessor messagePostProcessor)throwsAmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessorvoidconvertAndSend(String exchange,String routingKey,Object message,MessagePostProcessor messagePostProcessor)throwsAmqpException;
MessagePostProcessor 

是一个函数型接口,提供了一个

postProcessMessage

方法处理消息,由于直接发送的是对象,如果需要设置一些消息的属性,就需要使用该接口进行设置,例如:

MessagePostProcessor messagePostProcessor = message1 ->{MessageProperties messageProperties = message1.getMessageProperties();
            messageProperties.setExpiration("1000");return message1;};
        rabbitTemplate.convertAndSend("","","消息",messagePostProcessor);

receive

一般获取消息有两种处理模式:

  • push:由RabbitMQ主动将消息推送给订阅队列的消费者,调用channel.basicConsume方法。
  • pull:主动从指定队列中拉取消息,需要消费者调用channel.basicGet方法。
receive 

方法,就是从主动队列中获取消息。

// 如果默认队列中有消息,则接收消息。立即返回,可能有NULL值@NullableMessagereceive()throwsAmqpException;// 从指定队列中获取消息。立即返回,可能有NULL值@NullableMessagereceive(String queueName)throwsAmqpException;// 如果默认队列中有消息,则接收消息。可能有NULL值,并指定一个超时时间@NullableMessagereceive(long timeoutMillis)throwsAmqpException;// 从指定队列中获取消息。可能有NULL值,并指定一个超时时间@NullableMessagereceive(String queueName,long timeoutMillis)throwsAmqpException;

receiveAndConvert

receiveAndConvert

可以拉取消息并进行对象转换。

// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。@NullableObjectreceiveAndConvert()throwsAmqpException;// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。@NullableObjectreceiveAndConvert(String queueName)throwsAmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能有NULL值,并指定一个超时时间@NullableObjectreceiveAndConvert(long timeoutMillis)throwsAmqpException;// 从指定队列中接收消息并将其转换为Java对象,并指定一个超时时间,可能为null 值。@NullableObjectreceiveAndConvert(String queueName,long timeoutMillis)throwsAmqpException;//  如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。@Nullable<T>TreceiveAndConvert(ParameterizedTypeReference<T> type)throwsAmqpException;// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。@Nullable<T>TreceiveAndConvert(String queueName,ParameterizedTypeReference<T> type)throwsAmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。可能有NULL值,并指定一个超时时间及消息转换器SmartMessageConverter。@Nullable<T>TreceiveAndConvert(long timeoutMillis,ParameterizedTypeReference<T> type)throwsAmqpException;// 从指定队列中接收消息并将其转换为Java对象。可能为null 值。并指定一个超时时间及消息转换器SmartMessageConverter。@Nullable<T>TreceiveAndConvert(String queueName,long timeoutMillis,ParameterizedTypeReference<T> type)throwsAmqpException;

示例:

// 接收消息,队列存在是,会报错;队列中没有消息,返回NULLMessage bootQueue1= rabbitTemplate.receive("bizQueue");Message bootQueue2 = rabbitTemplate.receive("bizQueue",1000);Message bootQueue3= rabbitTemplate.receive("backupQueue");

使用消息转换器,可以直接发送、接收对象:

// User 需要实现SerializableUser user =newUser();
        user.setName("张三");
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());
        rabbitTemplate.convertAndSend(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,user);User receiveUser = rabbitTemplate.receiveAndConvert("bizQueue",newParameterizedTypeReference<User>(){});

receiveAndReply

receiveAndReply

支持在获取消息时传入一个回调函数

ReceiveAndReplyCallback

,处理接收到消息和回复消息的业务逻辑。

receiveAndReply

应用于

RPC

模式

Server

端,

Server

收到消息,并回复消息给客户端:
在这里插入图片描述
该模式用的比较少,实现起来也比较麻烦,这里就不演示了。

// 收到消息并回复,R:接收到的消息 S: 返回的消息<R,S>booleanreceiveAndReply(ReceiveAndReplyCallback<R,S> callback)throwsAmqpException;<R,S>booleanreceiveAndReply(String queueName,ReceiveAndReplyCallback<R,S> callback)throwsAmqpException;<R,S>booleanreceiveAndReply(ReceiveAndReplyCallback<R,S> callback,String replyExchange,String replyRoutingKey)throwsAmqpException;<R,S>booleanreceiveAndReply(String queueName,ReceiveAndReplyCallback<R,S> callback,String replyExchange,String replyRoutingKey)throwsAmqpException;<R,S>booleanreceiveAndReply(ReceiveAndReplyCallback<R,S> callback,ReplyToAddressCallback<S> replyToAddressCallback)throwsAmqpException;<R,S>booleanreceiveAndReply(String queueName,ReceiveAndReplyCallback<R,S> callback,ReplyToAddressCallback<S> replyToAddressCallback)throwsAmqpException;

sendAndReceive

sendAndReceive

也属于

RPC

模式,发送消息并接收回复消息,属于

Client

端。

@NullableMessagesendAndReceive(Message message)throwsAmqpException;@NullableMessagesendAndReceive(String routingKey,Message message)throwsAmqpException;@NullableMessagesendAndReceive(String exchange,String routingKey,Message message)throwsAmqpException;

convertSendAndReceive

convertSendAndReceive

以及

convertSendAndReceiveAsType

是对

sendAndReceive

的扩展,可以直接发送对象消息,并可以设置类型转换器:

@NullableObjectconvertSendAndReceive(Object message)throwsAmqpException;@NullableObjectconvertSendAndReceive(String routingKey,Object message)throwsAmqpException;@NullableObjectconvertSendAndReceive(String exchange,String routingKey,Object message)throwsAmqpException;@NullableObjectconvertSendAndReceive(Object message,MessagePostProcessor messagePostProcessor)throwsAmqpException;@NullableObjectconvertSendAndReceive(String routingKey,Object message,MessagePostProcessor messagePostProcessor)throwsAmqpException;@NullableObjectconvertSendAndReceive(String exchange,String routingKey,Object message,MessagePostProcessor messagePostProcessor)throwsAmqpException;@Nullable<T>TconvertSendAndReceiveAsType(Object message,ParameterizedTypeReference<T> responseType)throwsAmqpException;@Nullable<T>TconvertSendAndReceiveAsType(String routingKey,Object message,ParameterizedTypeReference<T> responseType)throwsAmqpException;@Nullable<T>TconvertSendAndReceiveAsType(String exchange,String routingKey,Object message,ParameterizedTypeReference<T> responseType)throwsAmqpException;@Nullable<T>TconvertSendAndReceiveAsType(Object message,MessagePostProcessor messagePostProcessor,ParameterizedTypeReference<T> responseType)throwsAmqpException;@Nullable<T>TconvertSendAndReceiveAsType(String routingKey,Object message,MessagePostProcessor messagePostProcessor,ParameterizedTypeReference<T> responseType)throwsAmqpException;@Nullable<T>TconvertSendAndReceiveAsType(String exchange,String routingKey,Object message,MessagePostProcessor messagePostProcessor,ParameterizedTypeReference<T> responseType)throwsAmqpException;

本文转载自: https://blog.csdn.net/qq_43437874/article/details/128057272
版权归原作者 云烟成雨TD 所有, 如有侵权,请联系我们删除。

“RabbitMQ系列【16】AmqpTemplate接口详解”的评论:

还没有评论