0


RabbitMq笔记


1.1. MQ 的相关概念

1.1.1. 什么是 MQ

    MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

**1.1.2. ****为什么要用 MQ **

    1.流量消峰 

    2.应用解耦

    3.异步处理

**1.1.3. ****MQ 的分类 **

    2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是**当前最****主流的消息中间件之一**

    优点:由于 erlang 语言的**高并发特性**,性能较好;**吞吐量到万级**,MQ 功能比较完备,健壮、稳定、易 用、跨平台、**支持多种语言 **如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,**社区活跃度高**;更新频率相当高 https://www.rabbitmq.com/news.html 

**1.1.4. **MQ 的选择

    RabbitMQ 结合 erlang 语言本身的并发优势,性能好**时效性微秒级**,**社区活跃度也比较高**,管理界面用起来十分方便,如果你的**数据量没有那么大**,中小型公司优先选择功能比较完备的 RabbitMQ。 

**1.2.2.****四大核心概念 **

**生产者 **

    产生数据发送消息的程序是生产者。

**交换机 **

    交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

队列

    队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。

**消费者 **

    消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

**1.2.3.****RabbitMQ 核心部分 **

**1.2.4.****各个名词介绍 **

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP

Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。

**Channel 作为轻量级的 **

**Connection ****极大减少了操作系统建立 ****TCP connection **的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发

消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout

(multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

1.2.5.安装Centos7

尝试网络连通性

配置静态ip地址

vi /etc/sysconfig/network-scripts/ifcfg-ens33

BROWSER_ONLY=no
BOOTPROTO=static    #修改类型
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=bc21e331-eea2-4f5b-b229-c9794f1c5048
DEVICE=ens33
ONBOOT=yes   #修改启动

IPADDR=192.168.200.10        #ip
NETMASK=255.255.255.0        #子网掩码
GATEWAY=192.168.200.2        #网关
DNS1=8.8.8.8                 
DNS2=114.114.114.114 

     
systemctl restart network  #重启网络   

ping www.baidu.com

**1.2.5.**安装RabbitMQ

1.官网地址

https://www.rabbitmq.com/download.html

2.文件上传

上传到/usr/local/software 目录下(如果没有 software 需要自己创建)

-rw-r--r--. 1 root root 43K Jun 15  2021 rabbitmq_delayed_message_exchange-3.8.0.ez
-rw-r--r--. 1 root root 15M Jun 15  2021 rabbitmq-server-3.8.8-1.el7.noarch.rpm
-rw-r--r--. 1 root root 18M Jun 15  2021 erlang-21.3-1.el7.x86_64.rpm

3.安装文件(分别按照以下顺序安装)

1.逐个安装软件
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

2.查看服务状态
/sbin/service rabbitmq-server status

3.启动服务
/sbin/service rabbitmq-server start

4.添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on

5.查看服务状态
/sbin/service rabbitmq-server status

6.停止服务(选择执行)
/sbin/service rabbitmq-server stop

7.开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management

8.启动服务
/sbin/service rabbitmq-server start

9.用默认账号密码(guest)访问地址 http://192.168.200.10:15672/出现权限问题

10.查看防火墙状态
 systemctl status firewalld

11.关闭防火墙
 systemctl stop firewalld

12.永久关闭防火墙
systemctl enable firewalld

13.控制台展示访问地址 http://192.168.200.10:15672/

4.添加一个新的用户

创建账号
rabbitmqctl add_user admin 123

设置用户角色
rabbitmqctl set_user_tags admin administrator

设置用户权限
    语法 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

当前用户和角色
rabbitmqctl list_users

5.再次利用 admin 用户登录

  1. 重置命令
关闭应用的命令为
rabbitmqctl stop_app

清除的命令为
rabbitmqctl reset

重新启动命令为
rabbitmqctl start_app

2. Hello World

“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代

表使用者保留的消息缓冲区

**2.1. **依赖

<!--指定 jdk 编译版本-->
<build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
 <source>8</source>
 <target>8</target>
 </configuration>
 </plugin>
 </plugins>
</build>
<dependencies>
 <!--rabbitmq 依赖客户端-->
 <dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.8.0</version>
 </dependency>
 <!--操作文件流的一个依赖-->
 <dependency>
 <groupId>commons-io</groupId>
 <artifactId>commons-io</artifactId>
 <version>2.6</version>
 </dependency>
</dependencies>

**2.2. ****消息生产者 **

public class Producer {
 private final static String QUEUE_NAME = "hello";
 public static void main(String[] args) throws Exception {
 //创建一个连接工厂
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("182.92.234.71");
 factory.setUsername("admin");
 factory.setPassword("123");
 //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
 try(Connection connection = factory.newConnection();Channel channel = 
connection.createChannel()) {
 /**
 * 生成一个队列
 * 1.队列名称
 * 2.队列里面的消息是否持久化 默认消息存储在内存中
 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
 * 5.其他参数
 */
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 String message="hello world";
 /**
 * 发送一个消息
 * 1.发送到那个交换机
 * 2.路由的 key 是哪个
 * 3.其他的参数信息
 * 4.发送消息的消息体
 */
 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 System.out.println("消息发送完毕");
 }
 }
}

**2.3. ****消息消费者 **

public class Consumer {
 private final static String QUEUE_NAME = "hello";
 public static void main(String[] args) throws Exception {
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("182.92.234.71");
 factory.setUsername("admin");
 factory.setPassword("123");
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 System.out.println("等待接收消息....");
 //推送的消息如何进行消费的接口回调
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 System.out.println(message);
 };
 //取消消费的一个回调接口 如在消费的时候队列被删除掉了
 CancelCallback cancelCallback=(consumerTag)->{
 System.out.println("消息消费被中断");
 };
/**
 * 消费者消费消息
 * 1.消费哪个队列
 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
 * 3.消费者未成功消费的回调
 */
 channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
 }
}
标签: rabbitmq 笔记

本文转载自: https://blog.csdn.net/qinqi7697497/article/details/139370507
版权归原作者 Sunshineㄙ 所有, 如有侵权,请联系我们删除。

“RabbitMq笔记”的评论:

还没有评论