文章目录
AMQP核心概念
概念
Advanced Message Queueing Protocol 高级消息队列协议
协议模型
- Server: 又称Broker,接收客户端连接,实现AMQP实体服务
- Connection: 连接,应用程序与Broker的网络连接
- Channel: 网络信道,几乎所有操作都在Channel中进行,是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
- Message: 消息,服务器和应用程序之间传送的数据,由Properties和Body组成。- Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;- Body则是消息体内容
- Virtual host: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual host里面可以有若干个Exchange和Queue,同一个 Virtual host不能有相同名称的Exchange和Queue
- Exchange: 交换机,接收消息,根据路由键转发消息到绑定的队列
- Binding: Exchange和Queue的虚拟连接,binding中可以包含 routing key , 队列必须有绑定的交换机
- Routing key : 一个路由规则
- Queue: 消息队列,保存消息并将其转发给消费者
整体架构
传递的信息:
- 生产者只需要把消息投递到指定的交换机 。不需要关系发给哪个队列和消费者
- 消费者只需要监听从哪个队列获取消息。不需要关心消息从哪个交换机来的。
- 生产者和消费者是解耦合状态:因此需要 exchange 和 queue建立绑定关系
消息流转图
传递的信息:
- 消息发送端需要指定 exchange交换机
- 一个交换机可以绑定多个队列
- 需要指定消息发送的 routing key 保证消息发送到指定的队列
RabbitMQ
是一个开源的消息代理和队列服务器,是用Erlang语言编写的,基于AMQP协议
- Erlang语言使得RabbitMq性能很高,有着原生Socket一样的延迟
命令行
基础操作
- rabbitmqctl stop_app : 关闭应用
- rabbitmqctl start_app : 启动应用
- rabbitmqctl status : 节点状态
用户
- rabbbitmqctl add_user [username] [password] : 添加用户
- rabbbitmqctl list_users: 列出所有用户
- rabbbitmqctl delete_user [username] : 删除用户
- rabbbitmqctl clear_permissions -p [vhostpath] [username] : 清除用户权限
- rabbbitmqctl list_user_permissions [username] : 列出用户权限
- rabbbitmqctl change_password [username] [newpassword] : 修改密码
- rabbbitmqctl set_permissions -p [vhostpath] [username] “.*” “.*” … :设置用户权限
组件-虚拟主机
- rabbbitmqctl add_vhost vhostpath :创建虚拟主机
- rabbbitmqctl list_vhosts :列出所有虚拟主机
- rabbbitmqctl list_permissions -p [vhostpath] : 列出虚拟主机上所有权限
- rabbbitmqctl delete_vhost [vhostpath] :删除虚拟主机
组件-队列
- rabbbitmqctl list_queues : 查看所有队列信息
- rabbbitmqctl -p [vhostpath] purge_queue blue :清除队列里的消息
插件启动–可视化界面
- rabbitmq-plugins enable rabbitmq_management 启动
- rabbitmq-plugins disable rabbitmq_management 关闭
…
高级操作
- rabbbitmqctl reset :移除所有数据
- rabbbitmqctl join_cluster <clusternode> [–ram] :组成集群命令
- rabbbitmqctl cluster_status :查看集群状态
- rabbbitmqctl change_cluster_node_type [ disc |ram] :修改集群节点存储形式
- rabbbitmqctl forget_cluster_node [–offline] :忘记节点
- rabbbitmqctl rename_cluster_node [oldnode] [newnode] :修改节点名称
java 中 的消息生产与消费
ConnectionFactory :获取连接工厂
Connection :连接
Channel :数据通信信道,可以发送和接收消息
Queue :具体的消息存储队列
Producer & Consumer :生产和消费者
快速入门项目演示:https://gitee.com/isymikasan/rabbitmq-api/tree/master/src/main/java/com/mikasa/rabbitmq/quickstart
核心概念
Exchange 交换机
接收消息,并根据路由键转发消息所绑定的队列
交换机属性
- Name :交换机名称
- Type :交换机类型 - direct- topic- fanout- headers
- Durability :是否需要持久化
- Auto Delete :当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
- Internal :是否用于RabbitMq内部使用 ,默认false
- Arguments :扩展参数
交换机类型
Direct Exchange
所有发送到 Direct Exchange 的消息被转发到RouteKey中指定的 Queue
Topic Exchange
所有发送到 Topic Exchange 的消息被转发到所有关心RouteKey中指定的 Topic 的Queue上
Exchange 将 RouteKey 和某个 Topic 进行模糊匹配,一个队列绑定一个Topic
模糊匹配的通配符
:匹配一个或多个词
- :匹配不多不少一个词
- 例如 :- “log.#” 可以匹配到 “log.info.oa”- “log.*” 只能匹配到 “log.erro”
Fanout Exchange
- 不处理路由键,只需要简单的将队列绑定到交换机
- 发送到交换机的消息会被转发到与该交换机绑定的所有队列上
- fanout转发消息是最快的
Binding-绑定
- Exchange和Exchange、Queue之间的连接关系
- 绑定中可以包含 routing key 和其他参数
Queue-消息队列
- 消息队列,实际存储消息数据
- Durability :是否持久化 Durable-是 Transient-否
- Auto delete :最后一个监听被移除后队列自动删除
message-消息
服务器和应用程序之间传送的数据
由生产者创建
本质上是一段数据 有Properties和Paylad(Body)组成
- 常用属性:- delivery mode :2: 持久化 1:不是持久化- headers :自定义属性
- 其他属性:- content_type- content_encoding :字符集- priority- correlation_id- reply_to- expiration :过期时间ms 10s没被消费就过期了- message_id- timestamp- type user_id- app_id- cluster_id
代码示例:https://gitee.com/isymikasan/rabbitmq-api/tree/master/src/main/java/com/mikasa/rabbitmq/api/message
虚拟主机
虚拟地址,用于进行逻辑隔离,最上层的消息路由
一个Virtual host里面可以有若干个Exchange和Queue,同一个 Virtual host不能有相同名称的Exchange和Queue
消息确认机制
Confirm 确认消息
消息的确认
- 生成者投递消息后,如果Broker收到消息,则会给生产者一个应答
- 生产者接收应答,用来确认这条消息是否正常发送到Broker
确认消息的流程图
确认消息的实现
confirm
- 1.在生产端的channel上开启消息确认模式:channel.confirmSelect()
- 2.在生产端的channel上添加监听:addComfirmListener ,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、日志等后续处理
代码示例:https://gitee.com/isymikasan/rabbitmq-api/tree/master/src/main/java/com/mikasa/rabbitmq/api/confirm
return消息机制
Return Listener 用于处理一些不可路由的消息
某些情况下,如果我们在发送消息的时候,当前exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达的消息,就要使用 return Listener
- 生产者发送消息的基础API中有个配置项: - Mandatory: 如果为 ture,监听器会接收到路由不可达的消息进行后续处理,为false,broker端会自动丢弃
图示:
- 在声明exchange或者routing key时有误导致消息没有找到指定的交换机这种情况
RabbitMQ 消息限流原理
RabbitMQ 的消息限流是通过 QoS(Quality of Service)机制来实现的。在 RabbitMQ 中,生产者和消费者之间的通信是基于 AMQP(Advanced Message Queuing Protocol)协议的。AMQP 协议中定义了消息确认机制(ACK)和预取值(prefetch)机制,这两个机制是实现 QoS 的核心。
消息确认机制(ACK)
在 RabbitMQ 中,消息生产者发送消息到队列后,队列会发送一个确认消息(ACK)给生产者,表示已经成功接收到该消息。生产者在收到 ACK 后才会认为该消息已经被成功处理。
如果消费者没有发送 ACK 给 RabbitMQ,RabbitMQ 会认为该消息还未被处理,会将该消息重新发送给其他消费者进行处理。如果消息在队列中存在过长时间,可能会导致队列阻塞,造成消息丢失或者重复消费。
预取值(prefetch)机制
在 RabbitMQ 中,消费者可以通过 channel.basicQos() 方法来设置预取值(prefetch)机制。预取值机制是指消费者一次从队列中获取多少个消息,并将这些消息缓存在客户端的缓存区中,等待客户端逐个处理。在缓存区中的消息数量超过一定阈值时,RabbitMQ 会停止向该消费者发送消息,直到消费者处理完缓存区中的消息后再重新开始发送。
预取值机制可以防止消费者一次性从队列中获取过多的消息,导致消费者处理不过来,造成队列阻塞。预取值机制可以在一定程度上控制消息的处理速度,避免系统因消息处理速度不够跟上消息产生速度而崩溃。
消费端
自定义监听
一般是在监听中编写while循环通过consumer.nextDelivery获取下一条消息进行消费。
使用自定义的Consumer更加方便,解耦合
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o30WgiHA-1679235160541)(https://foruda.gitee.com/images/1679224992765938751/6f8811a4_9519077.png “屏幕截图”)]
- 继承 DefaultConsumer类重写 hanleDelivery方法
消费端限流
场景:
rabbitmq服务器上有万条未处理消息,随便打开一个消费者客户端会出现:巨量消息瞬间全部推送过来,单个客户端无法处理,服务器卡死或崩溃
qos:
Quality of Service 服务质量保证功能,在非自动确认消息的前提下(autoAck :false),如果一定数目的消息(通过基于consumer或channel设置的Qos值)未被确认前,不进行新的消费
- void BasicQos (uint prefetchSize, ushort prefetchCount, bool global) - prefetchSize :消息限制大小 0表示不限制- prefetchCount :一次最多处理多少消息- global :一个channel可以有好多consume监听,这个是设置管道级别还是消费者级别,false表示在consume级别做限制
代码示例:https://gitee.com/isymikasan/rabbitmq-api/tree/master/src/main/java/com/mikasa/rabbitmq/api/limit
消费端ACK与重回队列
ACK
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
如果由于服务器宕机等严重问题,我们需要手工进行ACK保障消费端消费成功
重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新投递给Broker
一般实际应用中会关闭重回队列
代码示例:https://gitee.com/isymikasan/rabbitmq-api/tree/master/src/main/java/com/mikasa/rabbitmq/api/ack
TTL(Time-To-Live)
RabbitMQ中的TTL指的是消息或队列的“存活时间”。当消息或队列的存活时间超过TTL时,它们会被自动删除。
TTL可以应用于队列和消息,这意味着您可以为队列或消息设置不同的存活时间。在实际应用中,TTL通常用于以下场景:
- 避免消息或队列一直存在,浪费存储空间和资源。
- 限制消息或队列的生命周期,确保数据的时效性和准确性。
TTL对队列的应用
- 当我们创建一个队列时,可以设置队列的TTL。这意味着当队列中的消息在规定的时间内没有被消费者消费,那么这个队列将被自动删除。
- 在RabbitMQ中,可以通过队列属性
x-expires
来设置队列的TTL。该属性的值为毫秒数。 - 以下是在Java中设置队列TTL的示例:
Map<String,Object> args =newHashMap<>();
args.put("x-expires",10000);// 设置队列TTL为10秒
channel.queueDeclare("my_queue",true,false,false, args);
TTL对消息的应用
- 当我们发送一条消息时,可以在消息属性中设置TTL,以确保消息在规定的时间内被消费者消费,否则该消息将被自动删除。
- 在RabbitMQ中,可以通过消息属性
expiration
来设置消息的TTL。该属性的值为毫秒数。 - 以下是在Java中设置消息TTL的示例:
AMQP.BasicProperties properties =newAMQP.BasicProperties.Builder().expiration("5000")// 设置消息TTL为5秒.build();channel.basicPublish(exchangeName, routingKey, properties, messageBody);
TTL对死信队列的应用
TTL还可以与死信队列(Dead Letter Queue,DLQ)结合使用。当消息的TTL过期时,消息将被发送到DLQ中,而不是直接被删除。能被用于后续发送给别的指定交换机
当消息过期时,它可能被视为“死亡消息”,并将被发送到队列的DLQ中。因此,TTL和DLQ的结合使用可以用于实现以下场景:
- 处理“过期”或“失效”的消息。
- 对于未能及时处理的消息,将其发送到一个专用的队列中,以便稍后进行处理。
- 将已经达到其生命周期的消息移动到一个专用的队列中,以便将它们存档或删除。
以下是在Java中设置队列的DLQ的示例:
Map<String,Object> args =newHashMap<>();
args.put("x-dead-letter-exchange","my_exchange");// 设置死信消息发送到的exchange
args.put("x-dead-letter-routing-key","my_queue.dead");// 设置死信消息的routing key
channel.queueDeclare("my_queue",true,false,false, args);
死信队列
DLX: Dead-Letter-Exchange
死信队列的概念
当消息无法被消费者成功消费时,这些消息就会变成死信消息。
消息变成死信的原因
- 消息被拒绝(前提是requeue=false)- basic.reject- basic.nack
- 消息TTL过期
- 消息无法路由到任何队列
- 队列达到最大长度
- 消息大小限制:当消息的大小超过了队列的限制时,消息将变成死信。
使用死信队列
在 Java 中使用死信队列,我们需要进行以下步骤:
- 首先,我们需要创建一个普通的队列,并设置其死信交换器和死信路由键:
Map<String,Object> args =newHashMap<String,Object>();
args.put("x-dead-letter-exchange","dlx.exchange");
args.put("x-dead-letter-routing-key","dlx.key");Channel channel = connection.createChannel();
channel.queueDeclare("test.queue",true,false,false, args);
在上面的代码中,我们创建了一个名为
test.queue
的队列,并设置了它的死信交换器为
dlx.exchange
,死信路由键为
dlx.key
。
- 接下来,我们需要创建一个死信交换器和一个死信队列:
channel.exchangeDeclare("dlx.exchange","direct",true);channel.queueDeclare("dlx.queue",true,false,false,null);channel.queueBind("dlx.queue","dlx.exchange","dlx.key");
在上面的代码中,我们创建了一个名为dlx.exchange
的死信交换器,类型为direct
,并创建了一个名为dlx.queue
的死信队列,并将其绑定到dlx.exchange
上,并使用路由键dlx.key
。 - 最后,我们需要在消费者中处理死信消息:
channel.basicConsume("dlx.queue",false,"my-consumer-tag",newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{// 处理死信消息 channel.basicAck(envelope.getDeliveryTag(),false);}});
在上面的代码中,我们创建了一个消费者,并将其绑定到dlx.queue
上。当消费者收到消息时,我们可以在handleDelivery
方法中对消息进行处理。在处理完成后,我们需要使用channel.basicAck
方法来确认消息。
代码示例:https://gitee.com/isymikasan/rabbitmq-api/tree/master/src/main/java/com/mikasa/rabbitmq/api/dlx
注意事项
通常情况下,死信队列需要重新处理死信消息,或将其存档或删除。
需要注意的是,RabbitMQ并不会自动将消息从队列中删除,只有当消费者成功地处理了消息后,消息才会从队列中删除。因此,当消息变成死信时,需要特别注意处理这些消息,以免占用过多的存储空间和资源。
死信队列是一种特殊的队列,用于接收所有变成死信消息的消息。一旦消息变成死信,它就会被路由到死信队列,以便我们可以对这些异常情况中的消息进行后续处理。
SpringAMQP
!Spring使用版本的区别,个别API使用可能有变化,请注意!
API在 com.mikasa.spring.RabbitMQConfig类中配合 com.mikasa.spring.ApplicationTests测试类进行演示
配合项目查看:
- 项目地址:https://gitee.com/isymikasan/rabbitmq-spring/tree/master/rabbitmq-spring
- 核心API演示地址: - 配置类:https://gitee.com/isymikasan/rabbitmq-spring/blob/master/rabbitmq-spring/src/main/java/com/mikasa/spring/RabbitMQConfig.java- 测试类:https://gitee.com/isymikasan/rabbitmq-spring/blob/master/rabbitmq-spring/src/test/java/com/mikasa/spring/ApplicationTests.java
ConnectionFactory
配置连接信息
@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();// ip+port
connectionFactory.setAddresses("39.108.108.234:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");return connectionFactory;}
Exchange、Queue、Binding的声明
/**
* SpringAMQP中的交换机的声明
* @return
*/@BeanpublicTopicExchangeexchange001(){returnnewTopicExchange("topic001",true,false);}/**
* SpringAMQP中的队列的声明
* @return
*/@BeanpublicQueuequeue001(){returnnewQueue("queue001",true);}/**
* SpringAMQP中绑定的声明
* @return
*/@BeanpublicBindingbinding001(){returnBindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");}
RabbitAdmin
创建Admin管理对象用于操作mq如声明交换机、队列等等
- 注入spring容器
@BeanpublicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);// 初始化,必须为true,在springbean加载后进行初始化,将所有类型为 exchange、queue、binging对象全部加载进指定的集合中
rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
- RabbitAdmin常见API使用
@TestpublicvoidtestAdmin()throwsException{ rabbitAdmin.declareExchange(newDirectExchange("test.direct",false,false)); rabbitAdmin.declareExchange(newTopicExchange("test.topic",false,false)); rabbitAdmin.declareExchange(newFanoutExchange("test.fanout",false,false)); rabbitAdmin.declareQueue(newQueue("test.direct.queue",false)); rabbitAdmin.declareQueue(newQueue("test.topic.queue",false)); rabbitAdmin.declareQueue(newQueue("test.fanout.queue",false)); rabbitAdmin.declareBinding(newBinding("test.direct.queue",// 声明绑定类型,这里是对队列的绑定Binding.DestinationType.QUEUE,"test.direct","direct",newHashMap<>())); rabbitAdmin.declareBinding(BindingBuilder.bind(newQueue("test.topic.queue",false))//直接创建队列.to(newTopicExchange("test.topic",false,false))//直接创建交换机 建立关联关系.with("user.#"));//指定路由Key rabbitAdmin.declareBinding(BindingBuilder.bind(newQueue("test.fanout.queue",false)).to(newFanoutExchange("test.fanout",false,false)));//清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue",false);}
RabbitTemplate
消息模板,SpringAMQP整合进行发送消息的关键类 (springboot中只需要在配置文件中配置信息即可)
- 注入spring容器
@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);return rabbitTemplate;}
- API使用示例
@AutowiredprivateRabbitTemplate rabbitTemplate;/** * 发送消息 * @throws Exception */@TestpublicvoidtestSendMessage()throwsException{// 消息配置MessageProperties messageProperties =newMessageProperties(); messageProperties.getHeaders().put("desc","信息描述.."); messageProperties.getHeaders().put("type","自定义消息类型..");// 创建消息Message message =newMessage("Hello RabbitMQ".getBytes(), messageProperties);// 发送消息// MessagePostProcessor 消息发送后进行额外的设置 rabbitTemplate.convertAndSend("topic001","spring.amqp", message,newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{System.err.println("------添加额外的设置---------"); message.getMessageProperties().getHeaders().put("desc","额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr","额外新加的属性");return message;}});}/** * 发送文本消息 * @throws Exception */@TestpublicvoidtestSendMessage2()throwsException{//1 创建消息MessageProperties messageProperties =newMessageProperties(); messageProperties.setContentType("text/plain");Message message =newMessage("mq 消息1234".getBytes(), messageProperties);// send方法只能传消息对象 rabbitTemplate.send("topic001","spring.abc", message);// convertAndSend会把数据转换为 message rabbitTemplate.convertAndSend("topic001","spring.amqp","hello object message send!"); rabbitTemplate.convertAndSend("topic002","rabbit.abc","hello object message send!");}
SimpleMessageListenerContainer
Spring AMQP 中提供的一个消息监听容器,用于在 RabbitMQ 中消费消息。它提供了一种方便的方式来配置和管理消息监听器以及相关的连接、通道和消费者等资源。
使用
在 Spring 中使用
SimpleMessageListenerContainer
,需要先将它作为一个 Bean 注册到 Spring 容器中,同时配置相关的属性,如连接工厂、监听的队列、消息监听器等。
可以对消费者进行丰富的配置
- 监听多个队列
- 设置事务特性、事务管理器、事务属性
- 设置消费者数量、最大最小数量、批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数等
- 设置消费者标签生产策略、是否独占模式、消费者属性等
- 设置具体监听器、转换器
- 在运行中可以动态的修改消费者数量的大小、接收消息的模式等
@BeanpublicSimpleMessageListenerContainermessageContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);// 监听队列
container.setQueues(queue001(),queue002(),queue003(),queue_image(),queue_pdf());// 当前消费者数量
container.setConcurrentConsumers(1);// 最大消费者数量
container.setMaxConcurrentConsumers(5);// 是否
container.setDefaultRequeueRejected(false);// 签收模式
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);// 消费端标签策略
container.setConsumerTagStrategy(newConsumerTagStrategy(){@OverridepublicStringcreateConsumerTag(String queue){return queue +"_"+ UUID.randomUUID().toString();}});// 设置消息监听器或适配器
container.setMessageListener(“消息监听器或适配器”)return container;
MessageListener 消息监听器
在 Spring 中使用 RabbitMQ,通常会使用
MessageListener
接口来监听队列中的消息。
MessageListener
接口定义了一个
onMessage
方法,用于接收队列中的消息,并进行后续处理。
监听器
需求:将消息的内容解析为字符串,并打印出来
- 使用spring提供的匿名内部类重写onMessage方法
container.setMessageListener(newChannelAwareMessageListener(){@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{String msg =newString(message.getBody());System.err.println("----------消费者: "+ msg);}});
- 定义了一个名为
RabbitMQListener
的 Spring 组件,并实现了MessageListener
接口中的onMessage
方法。在onMessage
方法中,我们将消息的内容解析为字符串,并打印出来。@ComponentpublicclassRabbitMQListenerimplementsMessageListener{@OverridepublicvoidonMessage(Message message){String body =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println("Received message: "+ body);}}``````@ConfigurationpublicclassRabbitMQConfig{@AutowiredprivateRabbitMQListener rabbitMQListener;@BeanpublicSimpleMessageListenerContainermessageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueues(testQueue()); container.setMessageListener(rabbitMQListener);return container;}@BeanpublicQueuetestQueue(){returnnewQueue("test.queue");}}
适配器:MessageListenerAdapter
Spring AMQP 中提供的一个适配器类,用于将一个普通的 Java 对象转换为 RabbitMQ 消息监听器,并且可以自动解析和处理消息。
MessageListenerAdapter类中
publicstaticfinalString ORIGINAL_DEFAULT_LISTENER_METHOD ="handleMessage";
规定了自定义适配器类必须要有 handleMessage方法,反射根据这个名字把监听的消息传进去。
所以自定义的适配器必须有 handleMessage这个方法或者重写指定默认方法
消息转换器:MessageConverter
正常情况下消息体为二进制的数据方式进行传输,如果希望内部进行转换,或者指定自定义的转换器,就需要用到MessageConverter
很强大!可以将接收的消息转换为java对象、json格式、图片、文档、视频等
- 自定义转换器需要实现 MessageConverter接口 - 重写 toMessage() fromMessage()两个方法
!适配器和消息转换器的演示和详细说明请进项目查看!
SpringBoot
有详细的中文注释说明
消费端:https://gitee.com/isymikasan/rabbitmq-spring/tree/master/rabbitmq-springboot-consumer
生产端:https://gitee.com/isymikasan/rabbitmq-spring/tree/master/rabbitmq-springboot-producer
Spring Cloud Stream
- 什么是Spring Cloud Stream- Spring Cloud Stream是Spring Cloud的子项目之一,它提供了一种基于Spring Boot的构建消息驱动微服务应用的框架。使用Spring Cloud Stream可以很方便地搭建一个消息驱动的微服务系统,实现各个微服务之间的通信。
- 为什么要用Spring Cloud Stream整合MQ- Spring Cloud Stream作为消息中间件的抽象层,可以屏蔽底层消息中间件的实现细节,降低了对消息中间件的依赖。同时,Spring Cloud Stream提供了一些常用的消息模式(如发布/订阅、请求/应答),可以大大简化代码的编写。- 使用Spring Cloud Stream还可以轻松切换底层消息中间件,不必因为更换消息中间件而修改应用程序代码。目前,Spring Cloud Stream支持RabbitMQ、Kafka、Amazon Kinesis等多个消息中间件。
消费端:https://gitee.com/isymikasan/rabbitmq-spring/tree/master/rabbitmq-springcloudstream-consumer
生产端:https://gitee.com/isymikasan/rabbitmq-spring/tree/master/rabbitmq-springcloudstream-producer
deliveryTag和correlationData
- deliveryTag- 是一个整数- 用于标识消息在channel中的传递顺序(请求级别),它是一个单调递增的数字,每次传递一个新的消息,deliveryTag 都会自增。- 当消费者确认消费某条消息时,需要将该消息的
deliveryTag
值发送给 RabbitMQ,RabbitMQ 就会认为该消息已经被消费成功,可以从队列中删除。在实现消息确认机制时,该值起着非常重要的作用。 - correlationData- 是一个字节数组,通常用于标识一个消息的唯一性- 应用程序级别(全局)- (生产者发送消息到Broker)消息发送失败时可以根据该值进行消息追踪。在 RabbitMQ 的生产者发送消息时,可以通过设置
correlationData
来唯一标识一条消息,一旦消息发送失败,可以根据correlationData
进行追踪。输入链接说明
版权归原作者 mikasa_akm 所有, 如有侵权,请联系我们删除。