一、消息队列
什么是消息队列
消息队列,即MQ,Message Queue。
消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
二、RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统。
支持主流的操作系统,Linux、Windows、MacOX等。
支持多种开发语言,Java、Python、Ruby、.NET、PHP、C/C++、node.js等。
官网: Messaging that just works — RabbitMQ
官方教程:RabbitMQ Tutorials — RabbitMQ
RabbitMQ 基本概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。Connection
网络连接,比如一个TCP连接。无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费。Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。Broker
表示消息队列服务器实体。
三、简单消息模型使用
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
代码演示
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
** 获取连接**
package com.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.33.88");
//TCP端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("vhost_NetDataGather");
factory.setUsername("admin");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
生产者
import com.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 生产者
*/
public class Send {
//声明队列名称
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 生产者和Broker建立TCP连接。
Connection connection = ConnectionUtil.getConnection();
// 生产者和Broker建立通道。
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
for (int i = 0; i < 10; i++) {
// 向指定的队列中发送消息
message=message+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.util.ConnectionUtil;
/**
* 消费者
*/
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK。
手动ACK:消息接收后,不会发送ACK,需要手动调用。
手动ACK
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.util.ConnectionUtil;
/**
* 消费者,手动进行ACK
*/
public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
// 手动进行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列,第二个参数false,手动进行ACK
// 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
以上就是RabbitMQ的简单使用讲解,更多详细使用内容,请再多多学习。
版权归原作者 程序猿_liter 所有, 如有侵权,请联系我们删除。