1.1. MQ 的相关概念
1.1.1. 什么是 MQ
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
**1.1.2. ****为什么要用 MQ **
1.流量消峰
2.应用解耦
3.异步处理
**1.1.3. ****MQ 的分类 **
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是**当前最****主流的消息中间件之一**
优点:由于 erlang 语言的**高并发特性**,性能较好;**吞吐量到万级**,MQ 功能比较完备,健壮、稳定、易 用、跨平台、**支持多种语言 **如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,**社区活跃度高**;更新频率相当高 https://www.rabbitmq.com/news.html
**1.1.4. **MQ 的选择
RabbitMQ 结合 erlang 语言本身的并发优势,性能好**时效性微秒级**,**社区活跃度也比较高**,管理界面用起来十分方便,如果你的**数据量没有那么大**,中小型公司优先选择功能比较完备的 RabbitMQ。
**1.2.2.****四大核心概念 **
**生产者 **
产生数据发送消息的程序是生产者。
**交换机 **
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
**消费者 **
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
**1.2.3.****RabbitMQ 核心部分 **
**1.2.4.****各个名词介绍 **
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP
Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。
**Channel 作为轻量级的 **
**Connection ****极大减少了操作系统建立 ****TCP connection **的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发
消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout
(multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
1.2.5.安装Centos7
尝试网络连通性
配置静态ip地址
vi /etc/sysconfig/network-scripts/ifcfg-ens33
BROWSER_ONLY=no
BOOTPROTO=static #修改类型
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=bc21e331-eea2-4f5b-b229-c9794f1c5048
DEVICE=ens33
ONBOOT=yes #修改启动
IPADDR=192.168.200.10 #ip
NETMASK=255.255.255.0 #子网掩码
GATEWAY=192.168.200.2 #网关
DNS1=8.8.8.8
DNS2=114.114.114.114
systemctl restart network #重启网络
ping www.baidu.com
**1.2.5.**安装RabbitMQ
1.官网地址
https://www.rabbitmq.com/download.html
2.文件上传
上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
-rw-r--r--. 1 root root 43K Jun 15 2021 rabbitmq_delayed_message_exchange-3.8.0.ez
-rw-r--r--. 1 root root 15M Jun 15 2021 rabbitmq-server-3.8.8-1.el7.noarch.rpm
-rw-r--r--. 1 root root 18M Jun 15 2021 erlang-21.3-1.el7.x86_64.rpm
3.安装文件(分别按照以下顺序安装)
1.逐个安装软件
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
2.查看服务状态
/sbin/service rabbitmq-server status
3.启动服务
/sbin/service rabbitmq-server start
4.添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
5.查看服务状态
/sbin/service rabbitmq-server status
6.停止服务(选择执行)
/sbin/service rabbitmq-server stop
7.开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
8.启动服务
/sbin/service rabbitmq-server start
9.用默认账号密码(guest)访问地址 http://192.168.200.10:15672/出现权限问题
10.查看防火墙状态
systemctl status firewalld
11.关闭防火墙
systemctl stop firewalld
12.永久关闭防火墙
systemctl enable firewalld
13.控制台展示访问地址 http://192.168.200.10:15672/
4.添加一个新的用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
语法 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
当前用户和角色
rabbitmqctl list_users
5.再次利用 admin 用户登录
- 重置命令
关闭应用的命令为
rabbitmqctl stop_app
清除的命令为
rabbitmqctl reset
重新启动命令为
rabbitmqctl start_app
2. Hello World
“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代
表使用者保留的消息缓冲区
**2.1. **依赖
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
**2.2. ****消息生产者 **
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try(Connection connection = factory.newConnection();Channel channel =
connection.createChannel()) {
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
}
**2.3. ****消息消费者 **
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
版权归原作者 Sunshineㄙ 所有, 如有侵权,请联系我们删除。