0


RabbitMQ基础使用

1.MQ基础介绍

同步调用

OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才

能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用

异步调用

异步调用通常是基于消息通知的方式,包含三个角色:

    消息发送者:投递消息的人,就是原来的**调用者**

    消息接收者:接收和处理消息的人,就是原来的**服务提供者**

    消息代理:管理、暂存、转发消息,就是原来的服务提供方

不同的MQ介绍
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScala&Java协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
本文讲解RabbitMQ

2.RabbitMQ

1.安装部署

在docker安装rabbit,代码如下:

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。

2.收发消息

交换机

点开amq.fanout交换机,将交换机和队列做绑定

队列

指定队列的名字添加队列

3.数据隔离

问题:要实现不同虚拟主机之间不同的交换机之间的隔离,需要用到数据隔离的技术。

添加用户

添加自己的虚拟主机

3.SpringAMQP

RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。

而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且

还基于SpringBoot对其实现了自动装配,使用起来非常方便。

1.快速入门

导入依赖:

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置信息:

spring:
  rabbitmq:
    host: 192.168.145.129 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

编写消息发送者和接收者的代码

发送者:写了一个测试类,注入了RabbitTemplate对象调用convertAndSend方法把队列名和消息发送出去。

@SpringBootTest
public class amqp {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testsimplequeue(){
        //队列名
        String queue="simple.queue";
        //消息
        String message="hello";
        //发送消息
        rabbitTemplate.convertAndSend(queue,message);
    }
}

接收者:在方法上加上 @RabbitListener(queues = "simple.queue")即可实现对消息的接收。

@Component
public class listen {
    //队列的名称
    @RabbitListener(queues = "simple.queue")
    //发送者发送的什么类型,接收者用什么类型接收
    public void listen1(String message){
        System.out.println("接收到消息"+message);
    }
}

2.WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

模拟此环境再多加一个消费者即可。

AMQP默认是采用类似轮询的机制,性能慢的机器会拖慢速度,因此需做如下配置实现能者多劳:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.交换机

1.Fanout交换机

fanout交换机是将消息全部发送到绑定的队列中。

使用fanout交换机发送消息要使用如下函数:

    @Test
    public void testfanout(){
        //交换机名称
        String exchange="hmall.fanout";
        //消息
        String message="hello,everyone";
        //fanout交换机发送消息         交换机    队列   消息
        rabbitTemplate.convertAndSend(exchange,null,message);
    }

2.Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

    队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey

(路由key)

    消息的发送方在 向 Exchange发送消息时,也必须指定消息的 
RoutingKey

    Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key

进行判断,只有 队列的

Routingkey

与消息的

Routing key

完全一致,才会接收到消息

绑定路由key实例:

代码示例:

rabbitTemplate.convertAndSend(exchange,"white",message);指定路由key,发送到指定的队列。

    @Test
    public void testdirect(){
        //交换机名称
        String exchange="hmall.direct";
        //消息
        String message="hello,everyone,direct";
        //发送消息                             指定路由key
        rabbitTemplate.convertAndSend(exchange,"white",message);
    }

3.Topic交换机

Topic

类型的

Exchange

Direct

相比,都是可以根据

RoutingKey

把消息路由到不同的队列。

只不过

Topic

类型

Exchange

可以让队列在绑定

BindingKey

的时候使用通配符!

BindingKey

一般都是有一个或多个单词组成,多个单词之间以

.

分割,例如:

item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu
绑定示例:

代码示例:

    @Test
    public void testdirect(){
        //交换机名称
        String exchange="hmall.topic";
        //消息
        String message="hello,everyone,topic";
        //发送消息 topic
        rabbitTemplate.convertAndSend(exchange,"#.news",message);
    }

4.基于bean声明队列交换机

在java代码中注册交换机和队列

fanout交换机代码如下:

@Configuration
public class fanout {
    //注册交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        //return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
        return new FanoutExchange("hmall.fanout");
    }
    //注册队列
    @Bean
    public Queue queue1(){
        //return QueueBuilder.durable("fanout.queue1").build();
        return new Queue("fanout.queue1");
    }
    //绑定
    @Bean
    public Binding fanoutbindingqueue1(Queue queue1,FanoutExchange fanout){
        return BindingBuilder.bind(queue1).to(fanout);
    }
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding fanoutbindingqueue2(Queue queue2,FanoutExchange fanout){
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

direct交换机代码如下:

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

5.基于注解声明队列交换机

基于bean声明太过复杂,尤其是direct交换机绑定key时,因此要用到注解声明。

在注解下同时创建队列交换机并完成绑定,代码如下:

    @RabbitListener(bindings = @QueueBinding(
            //注册队列                  名称              持久化
            value = @Queue(name = "direct.queue",durable = "ture"),
            //注册交换机                 名称              类型为direct
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            //交换机绑定队列属性有red和white
            key = {"red","white"}
    ))
    public void listen(String message){
        System.out.println("接收到消息"+message);
    }

4.消息转化器

spring在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:(比如用rabbitmq发送map集合,数据被序列化后字节太多而且可读性差)

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

因此我们使用JSON方式来做序列化和反序列化。

导入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

配置消息转换器,在

publisher

consumer

两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

RabbitMQ高级

未完待续


本文转载自: https://blog.csdn.net/qq_65317837/article/details/142528613
版权归原作者 悲伤的创可贴 所有, 如有侵权,请联系我们删除。

“RabbitMQ基础使用”的评论:

还没有评论