一、RabbitMQ入门
浏览器访问RabbitMQ的官网RabbitMQ官网
我们点击上方的菜单Features查看RabbitMQ的主要特性
上面介绍了一下RabbitMQ的特性:
1、异步消息传递:
支持多种消息传递协议、消息队列、传递确认、灵活的队列路由、多种交换类型。
2、开发人员经验:
使用 Kubernetes、BOSH、Chef、Docker 和 Puppet 进行部署。使用最喜欢的编程语言开发跨语言消息传递,如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等: Java、.NET、PHP、Python、JavaScript、Ruby、Go 等。
3、分布式部署:
以集群形式部署,实现高可用性和高吞吐量;跨多个可用区和区域联合部署。
4、企业和云就绪:
可插入式身份验证和授权,支持 TLS 和 LDAP。轻量级,易于在公共云和私有云中部署。
5、工具和插件:
各种工具和插件支持持续集成、运营指标以及与其他企业系统的集成。灵活的插件方法可扩展 RabbitMQ 功能。
6、管理和监控
用于管理和监控 RabbitMQ 的 HTTPAPI、命令行工具和用户界面。
二、windows中安装RabbitMQ
简单的了解RabbitMQ的特性后,点击右上角的菜单Docs开始学习如何使用RabbitMQ吧。
点击红框内的红色字体下载链接 Downloads and Installation下载RabbitMQ;
点击下图中蓝框内的链接分别下载RabbitMQ和erlang,安装RabbitMQ之前需要先完成erlang的安装。
安装过程就不介绍了,一直下一步、下一步就行了。
安装完成后,可以点开我们的安装目录RabbitMQ Server看一下
有个sbin目录,存放的是一些命令工具,etc点开看一下,有一个readme
In this directory you can find an example configuration file for RabbitMQ.
Note that this directory is not where the real RabbitMQ configuration lives.
The default location for the real configuration file is %APPDATA%\RabbitMQ\rabbitmq.config.
%APPDATA% usually expands to C:\Users%USERNAME%\AppData\Roaming or similar.
这个文件内容告诉了我们RabbitMQ的配置文件的位置
C:\Users%USERNAME%\AppData\Roaming\RabbitMQ\rabbitmq.config
我们访问这个路径C:\Users%USERNAME%\AppData\Roaming,发现这个目录下确实存放了一个RabbitMQ目录,目录下有一个advanced.config文件。
三、在windows启动RabbitMQ
通过命令启动RabbitMQ,在这之前,需要把erlang的安装目录添加到系统的Path环境变量中。
然后进到RabbitMQ的安装目录下的sbin目录下,在地址栏输入cmd,打开命令窗口
rabbitmq-plugins enable rabbitmq_management
如图,就是启动成功了,注意,启动这个插件的同时rabbitmq也已经启动了
四、RabbitMQ Management
在经过上一个步骤之后,通过http://localhost:15672 访问RabbitMQ的控制台管理工具RabbitMQ Management,这里的用户名/密码都是guest
登陆进去之后,就能看到这样一个页面
接下来,详细介绍页面各个菜单的功能。
1、队列操作
在RabbitMQ Management提供的页面中,可以进行一些操作,首先点击上方导航栏的Queues,来对基本的队列进行操作。
创建队列
点击红框内的Add a new queue展开添加队列的表单
作为初学者,只需要填写队列名称即可,其他使用默认值即可。填完名称之后,点击【Add queue】按钮。如图创建了一个test队列。
添加之后,刚刚添加的test会显示在上面的Queues-All queues的表格中。
然后点击表格中的队列名,这是一个链接,点击之后会进入队列的详情页面。
队列详情页面
如图,这就是点击队列名之后进入的队列详情页面,图片中已经标记了页面各个地方的功能。
查看消费者
点击左边的箭头展开,因为现在还没有消费者,所以没有显示。
绑定交换机
如图,在这个部分可以绑定一个交换机,这里的交换机和现实的交换机类似,功能都是转发,只不过这里是转发消息到绑定的队列。
上面的图片中,可以输入交换机名称绑定,但是因为现在还没有创建交换机,所以暂时不做操作。
发布消息
在这个部分,可以发布消息,第一个下拉框选择转发的消息类型,是否持久化,Headers和Properties可以点击右边的?查看具体的说明,在Payload后面的textarea中输入要发送的消息内容,最后点击Publish message。
获取消息
如图,博主在上面的界面发送了一条Hello world!消息,点击Get Message(s)就会查询到发送的消息的详情,显示在下方。
移动消息
在这个部分,可以把消息移动到指定的队列,输入目标队列名称,点击Move messages。
比如,我们创建一个队列hello,把test队列的消息移动到hello队列里。
移动消息
消息已经从test队列移动到了hello队列
原来的test队列已经没有消息了
删除队列
这个功能非常简单,点击删除按钮就可以删除当前的test队列了。
清空队列
这个操作会清空队列中所有消息。
为了测试这个功能,在test队列中发布了一条消息hello
然后清空一下消息
刷新页面,然后再查询一下,发现当前队列已经没有消息了
2、交换机操作
点击上方导航栏的Exchanges,对交换机进行操作。
添加交换机
输入交换机名称,点击Add exchange
如图,添加了一个交换机test_exchange
交换机详情页面
点击表格中的test_exchange进到交换机详情界面
绑定交换机/队列
如图,当前交换机又可以绑定其他交换机或者队列,由此可见,这里的交换机和网络设备的交换机功能一样,queue相当于网络中的计算机,多台计算机通过交换机exchange连接形成一个网络。
同样的,这里的queue通过exchange连接起来,形成一个复杂queue的网络架构。
同样的,也可以绑定路由键,这不就是对应网络设备的路由器吗。
总结,消息队列采用了现实生活中的网络模型,多个queue通过exchange和route相互连接,形成一个复杂的队列结构。
发布消息
这里发布消息和发布消息到队列中是一样的
如图,在交换机test_exchange上绑定了队列test,并发送了一条消息,test队列中成功获取到了这条消息。
删除交换机
简单的删除功能,不介绍了。
五、在Java中使用RabbitMQ
springboot整合RabbitMQ
1、创建一个maven项目
在idea中创建一个maven项目,命名为rabbitmq-maven。
2、添加RabbitMQ的依赖
在pom.xml中添加RabbitMQ的依赖
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>3.0.8</version>
</dependency>
</dependencies>
3、使用RabbitMQ客户端API
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author heyunlin
* @version 1.0
*/
public class RabbitExample {
private static Connection getConn() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("localhost");
factory.setPort(5672);
return factory.newConnection();
}
public static void main(String[] args) {
testGet();
}
static void test() {
String queue = "test_queue";
String exchange = "test_exchange";
try (Connection conn = getConn(); Channel channel = conn.createChannel()) {
// 创建队列
AMQP.Queue.DeclareOk testQueue = channel.queueDeclare(queue, true, false, false, null);
System.out.println(testQueue.getQueue());
// 队列和交换机绑定
channel.queueBind(queue, exchange, "");
// 删除队列
channel.queueDelete(queue);
// 清空队列
channel.queuePurge(queue);
// 创建交换机
channel.exchangeDeclare(exchange, "direct");
// 取消交换机和队列的绑定关系
channel.queueUnbind(queue, exchange, "");
// 发布消息
channel.basicPublish(exchange, "", null, "send message from exchange test_exchange.".getBytes());
// 交换机和交换机/队列绑定
channel.exchangeBind(queue, exchange, "");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
static void testGet() {
Connection conn = null;
Channel channel = null;
try {
conn = getConn();
channel = conn.createChannel();
// 获取队列信息
AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("test");
// 队列名称
String queue = declareOk.getQueue();
// 消息数量
int messageCount = declareOk.getMessageCount();
// 消费者数量
int consumerCount = declareOk.getConsumerCount();
System.out.println(queue);
System.out.println(messageCount);
System.out.println(consumerCount);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
六、springboot整合RabbitMQ
创建一个springboot项目,命名为rabbitmq-springboot
添加整合了RabbitMQ的starter依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
完整的pom.xml如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<lombok.version>1.18.22</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
修改application.yml,添加以下内容。
spring:
application:
name: rabbitmq
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
server:
port: 8088
logging:
level:
com.example.rabbitmq: debug
新建RabbitMQ的配置类
项目根目录下创建config包,在config包下创建一个类RabbitMQConfig.java
package com.example.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author heyunlin
* @version 1.0
*/
@Configuration
public class RabbitMQConfig {
// 声明需要使用的交换机/路由Key/队列的名称
public static final String DEFAULT_EXCHANGE = "exchange";
public static final String DEFAULT_ROUTE = "route";
public static final String DEFAULT_QUEUE = "queue";
// 声明交换机,需要几个声明几个,这里就一个
@Bean
public DirectExchange exchange(){
return new DirectExchange(DEFAULT_EXCHANGE);
}
// 声明队列,需要几个声明几个,这里就一个
@Bean
public Queue queue(){
return new Queue(DEFAULT_QUEUE);
}
// 声明路由Key(交换机和队列的关系),需要几个声明几个,这里就一个
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange())
.with(DEFAULT_ROUTE);
}
}
消息生产者
项目根目录下创建producer包,在producer包下创建一个类RabbitProducer.java
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author heyunlin
* @version 1.0
*/
@Component
public class RabbitProducer {
private final RabbitTemplate rabbitTemplate;
@Autowired
public RabbitProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 发送消息
* @param message 消息内容
*/
public void sendMessage(Object message) {
rabbitTemplate.convertAndSend(RabbitMQConfig.DEFAULT_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTE, message);
}
}
消息消费者
项目根目录下创建consumer包,在producer包下创建一个类RabbitConsumer.java
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author heyunlin
* @version 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = RabbitMQConfig.DEFAULT_QUEUE)
public class RabbitConsumer {
@RabbitHandler
public void receive(Object message) {
log.debug("收到一条消息:{}", message);
}
}
创建控制器类
项目根目录下创建controller包,在controller包下创建一个类MessageController.java,把我们刚刚创建的消息生产者依赖进来。
package com.example.rabbitmq.controller;
import com.example.rabbitmq.producer.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @author heyunlin
* @version 1.0
*/
@RestController
@RequestMapping(path = "/message", produces = "application/json;charset=utf-8")
public class MessageController {
private final RabbitProducer producer;
@Autowired
public MessageController(RabbitProducer producer) {
this.producer = producer;
}
@RequestMapping(value = "/send", method = RequestMethod.POST)
public void sendMessage(Object message) {
producer.sendMessage(message);
}
}
测试发送消息
这里可以使用多种方式测试,为了方便起见,使用postman
一测试居然报错了,查看后台报错信息,说是简单的消息转换器只支持String类型和字节数组的消息
我们回到上面,把消息类型的参数都改成String的,改完再测试一下
这一次控制台打印出来了收到消息的日志,至此,springboot整合RabbitMQ完美完成。
版权归原作者 Gin--- 所有, 如有侵权,请联系我们删除。