0


【消息队列】RabbitMQ五种消息模式

RabbitMQ

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/

RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  1. 下载镜像,在线拉取docker pull rabbitmq
  2. 安装MQ
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
--name mq \   # 队列名称
--hostname mq1 \  #配置主机名
-p 15672:15672 \  # MQ管理端口
-p 5672:5672 \   #MQ消息传输端口
-d \   # 后台运行
rabbitmq

在这里插入图片描述
在这里插入图片描述

交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的

在RabbitMQ中:

channel

:操作MQ的工具

exchange

:路由消息到队列中

queue

:缓存消息

virtual host:

虚拟主机,是对queue,exchange等资源的逻辑分组

常见的消息模型

  1. 基本消息队列
  2. 工作消息队列

这两种并没有用到交换机,而是直接到达队列

  1. 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种:Fanout Exchange:广播Direct Exchange:路由Topic Exchange:主题

基本消息队列

publisher:

消息发布者,将消息发送到队列queue

queue:

消息队列,负责接收并缓存消息

consumer:

订阅队列,处理队列中的消息

java模型(消息发布者)

@Testpublicvoidtest()throwsIOException,TimeoutException{//1.建立连接,与消息队列进行连接ConnetionFactory factory =newConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码
    factory.setHost(192.168.75.136);
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";
    channel.queueDeclare(queuename,false,false,false,null);//发送消息String message="hello";
    channel.basicPublish("",queuename,null,message.getBytes());//关闭通道和连接
    channel.close();
    connection.close();}

java模型(消息消费者)

//1.建立连接,与消息队列进行连接ConnetionFactory factory =newConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码
    factory.setHost(192.168.75.136);
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";
    channel.queueDeclare(queuename,false,false,false,null);//订阅消息
    channel.basicConsume(queuename,true,newDefaultConsumer(channel){@Override//处理消息的代码,绑定函数,有了消息才执行publicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//处理消息String message=newString(body);}})

注意:上边生产者消费者都创建了队列:

这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复

SpringAMQP

  • AMQP

是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求

  • Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现

  • 特征:
  1. 监听器容器,用于异步处理入站消息
  2. 用于发送和接收消息的RabbitTemplate
  3. Rabbitadmin用于自动声明队列,交换和绑定
  • 使用:
  1. 引入spring-amqp的依赖在这里插入图片描述 在yml中配置mq连接信息:
spring:rabbitmq:host: 192.168.75.136 #主机名port:5672#端口virtual-host: / #虚拟主机username: itcast #用户名password:#密码
  1. 在生产者服务中利用RabbitTemplate发送消息到hlh.queue这个队列
publicclass springamqptest{@AutowiredprivateRabbitTemplate rabbittemplate;@Testpublicvoidtest(){String queuename="hlh.queue";String message="hello";
     rabbittemplate.convertAndSend(queuename,message);}}
  1. 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
@ComponentpublicclassSpringrabbitListener{@RabbitListener(queues="hlh.queue")publicvoidlistenSimple(String msg)throwsInterruptedException{//消费逻辑代码}}

注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

WorkQueue

Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积

一个消息队列绑定多个消费者

假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:

@ComponentpublicclassSpringrabbitListener{@RabbitListener(queues="hlh.queue")publicvoidlistenSimple(String msg)throwsInterruptedException{//消费逻辑代码}@RabbitListener(queues="hlh.queue")publicvoidlistenSimple2(String msg)throwsInterruptedException{//消费逻辑代码}}

通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除

消息预取

指的每个消费者每次取多少条消息:
可以通过配置进行配置:

spring:rabbitmq:host: 192.168.75.136
      port:5672virtual-host: /
      username: itcast
      password:listener:simple:prefecth:1#每次只能获取一条消息,处理完才能获得下一个消息

发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
在这里插入图片描述

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

交换机的作用:

  1. 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  2. 不能缓存消息,路由失败,消息丢失
  3. FanoutExchange的会将消息路由到每个绑定的队列

SpringAMQP提过了声明交换机,队列,绑定关系的API:
在这里插入图片描述

Fanout Exchange

Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机

@ConfigurationpublicclassFanoutConfig{// 声明FanoutExchange交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("itcast.fanout");}//声明一个队列@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}// 绑定队列跟交换机@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}}

此时的生产者如何发送消息:

publicvoidtest(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息
  rabbitTemplate.convertAndSend(exchangeName,"",message);}

监听者如何收到消息

@RabbitListener(queues="fanout.queue1")publicvoidlistener(String msg){//处理得到的消息}

DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

一个队列可以指定多个Key

我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))publicvoidListener(String msg){//进行消息的处理}

在生产者生产时:

publicvoidtest(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息
  rabbitTemplate.convertAndSend(exchangeName,"blue",message);}

TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以

.

分隔
Queue与Exchange指定BindingKey时可以使用通配符:

#:

代指0个或多个单词

*:

代指一个单词
同样也是使用 @RabbitListener进行声明

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))publicvoidListener(String msg){//进行消息的处理}

生产者生产消息:

publicvoidtest(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息
  rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化

  1. 引入jackson的依赖在这里插入图片描述
  2. 声明MessageConverter:
@BeanpublicMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}

这样发送的消息就会使用自定义的转换类型

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_62513677/article/details/138325496
版权归原作者 轻舟慢行. 所有, 如有侵权,请联系我们删除。

“【消息队列】RabbitMQ五种消息模式”的评论:

还没有评论