0


[小白学微服务]消息队列RabbitMQ 的安装和基础使用

前言

我们为什么要用消息队列技术?

有些复杂的业务系统,一次用户请求可能会同步调用N个系统的接口,需要等待所有的接口都返回了,才能真正的获取执行结果。

这样就导致了:

1.系统之间耦合性太高,如果调用的任何一个子系统出现异常,整个请求都会异常,对系统的稳定性非常不利

2.这种同步接口调用的方式总耗时比较长,非常影响用户的体验,特别是在网络不稳定的情况下,极容易出现接口超时问题

3.果用户突增,一时间所有的请求都到数据库,可能会导致数据库无法承受这么大的压力,响应变慢或者直接挂掉。


一、都有什么消息队列,各有什么特色?

1.ActiveMQ:它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
2.Kafka:主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
3.RocketMQ:RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
4.RabbitMQ:RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

二、安装RabbitMQ(Linux环境)

官方教程,由此进入

1.使用docker引入RabbitMQ镜像

docker pull rabbitmq:3-management

:3-management 是指定的版本,如果不写默认引入最新版

我们已经成功引入了镜像

2.启动rabbitmq容器,

镜像只是相当于硬盘上的应用文件,容器则是相当于运行的应用程序

执行如下代码:

docker run \
 -e RABBITMQ_DEFAULT_USER=whr \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

解释下各项配置参数:

RABBITMQ_DEFAULT_USER=whr 是设置rabbitmq的默认用户名为whr

RABBITMQ_DEFAULT_PASS=123456 是设置abbitmq的默认用户密码为123456

--name mq 容器名字

--hostname mq1 主机名,集群部署要用

-p 15672:15672 UI界面端口
-p 5672:5672 连接端口 这两个都是开放端口

-d 后台运行

(ps: / 是换行符)

3.访问UI界面

上面我们完成了启动,可以去UI界面看看 访问方式就是 ip:端口号

输入上面配置的用户名,密码 即可登录

原版网页是英文的,我这里翻译为了中文

这样我们的Rabbit就已经完成安装和启动了

三、springbot整合RabbitMq

1. 引入所需依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.编写配置文件

在application.yml配置文件中 载入如下配置:

spring:
  rabbitmq:
    host: 47.96.73.173 #ip
    port: 5672 #连接端口
    username: whr #用户名
    password: 123456 #密码
    virtual-host: / #虚拟主机默认是 '/'
    listener:
      simple:
        prefetch: 1 #设置预取消息上限

这样整合就完成了,在正式开始使用之前我们先来了解下rabbitmq的组件和模式


四、RabbitMq的组成

1.Queue

队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列

2.Exchange

交换机,用来实现消息的路由

3.channels

通道,建立连接后,会形成通道,消息的投递获取依赖通道

4.connections

无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费

五、RabbitMq常用模式

1.简单模式

1个生产者 + 1个队列 + 1个消费者;

生产者代码:

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //simple
    @Test
    public void sendMsg_1(){
        String queueName = "simple.queue";
        String msg = "spring amqp 666";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

消费者代码:

@Component
@Slf4j
public class SoringRabbitListener {
//simple
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue (String msg){
        log.info("收到消息:{}", msg);
    }
}

运行结果:

2.工作者模式

1个生产者 + 1个队列 + 多个消费者,一条消息只能被消费一次;

生产者代码:

    @Test
    public void sendMsg_2() throws InterruptedException {
        String queueName = "simple.queue";
        String msg = "spring amqp 666 ";
        int i = 0;
        while (i < 50){
            rabbitTemplate.convertAndSend(queueName, "", msg + i);
            Thread.sleep(20);
            i++;
        }
    }

消费者代码:

//    work
@Component
@Slf4j
public class SoringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue_1 (String msg) throws InterruptedException {
        log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue_2 (String msg) throws InterruptedException {
        log.error("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
        Thread.sleep(100);
    }
}

运行结果:

3.订阅消息模型之 fanout

个生产者 + 1个交换机 + 多个队列 + 多个消费者,一条消息可以被多个消费者消费;

生产者:

    @Test
    public void sendToFanoutExchange(){
        String fanoutExchangeName = "itcast.fanout";
        String msg = "spring amqp 666";
        rabbitTemplate.convertAndSend(fanoutExchangeName,"", msg);
    }

消费者:

@Component
@Slf4j
public class SoringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue1"),
            exchange = @Exchange(name = "whr.fanout", type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueue1 (String msg){
      log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue2"),
            exchange = @Exchange(name = "whr.fanout", type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueue2 (String msg){
        log.info("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
    }
}

运行结果:

4.订阅消息模型之durect

1个生产者 + 1个交换机 + 多个队列 + 多个消费者 ,routingKey ,一条消息发送给符合 routingKey 的队列

生产者:

    @Test
    public void sendToDirectExchange(){
        String fanoutExchangeName = "whr.direct";
        String msg1 = "此消息发给category";
        String msg2 = "此消息发给goods";
        String msg3 = "此消息发给flavor";
        rabbitTemplate.convertAndSend(fanoutExchangeName,"category", msg1);
        rabbitTemplate.convertAndSend(fanoutExchangeName,"goods", msg2);
        rabbitTemplate.convertAndSend(fanoutExchangeName,"flavor", msg3);
    }

消费者:

@Component
@Slf4j
public class SoringRabbitListener { 
    //路由 direct
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "whr.direct"),
            key = {"goods", "category"}
    ))
    public void listenDirectQueue1 (String msg){
        log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "whr.direct"),
            key = {"flavor", "category"}
    ))
    public void listenDirectQueue2 (String msg){
        log.info("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
    }
}

结果:

5.订阅消息模型之topic

通配符,

#

:匹配一个或者多个

*

:一个词;

生产者:

    @Test
    public void sendToTopicExchange(){
        String fanoutExchangeName = "whr.topic";
        String msg1 = "此消息发给至:com.kk";
        rabbitTemplate.convertAndSend(fanoutExchangeName,"com.kk", msg1);
    }

消费者:

@Component
@Slf4j
public class SoringRabbitListener { 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "whr.topic", type = ExchangeTypes.TOPIC),
            key = "com.#"
    ))
    public void listenTopicQueue1 (String msg){
        log.info("消费者1收到消息:{} 时间是:{}", msg, LocalDateTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "whr.topic", type = ExchangeTypes.TOPIC),
            key = "#.kk"
    ))
    public void listenTopicQueue2 (String msg){
        log.info("消费者2收到消息:{} 时间是:{}", msg, LocalDateTime.now());
    }
}

结果:

结束语

以上就是RabbbitMq的基本使用了,新人作者,还有不足,欢迎大佬多多补充,多多关照哈~拜拜↑拜拜↓拜拜↑


本文转载自: https://blog.csdn.net/qq_62859700/article/details/136521808
版权归原作者 老公525 所有, 如有侵权,请联系我们删除。

“[小白学微服务]消息队列RabbitMQ 的安装和基础使用”的评论:

还没有评论