1. 生产者与消费者关系
在RabbitMQ中,生产者(Producer)负责发送消息,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则负责处理接收到的这些消息。在RabbitMQ中,生产者和消费者之间使用交换器(Exchange)和队列(Queue)进行消息路由和存储。生产者将消息发送到交换器,交换器根据消息的路由键将其放入相应的队列中,最后消费者从队列中获取并处理这些消息。
2. 交换器与队列进行消息路由和存储
2.1 交换器与队列
交换器(Exchange)负责处理生产者发送的消息,并根据路由键(Routing Key)将消息分发到相应的队列(Queue)中。RabbitMQ中以下四种类型的交换器:
直接交换器(Direct)
分发交换器(Fanout)
主题交换器(Topic)
头部交换器(Headers)
队列是用于存储消息的数据结构,并为消费者准备好消息以进行消息消费。生产者发送的消息将被保存在一个或多个队列中,等待消费者进行消息处理。
2.2Java代码示例
以下是一个简单的Java代码示例,展示了如何在RabbitMQ中创建交换器、队列和绑定,并进行消息的发送与接收。
添加以下依赖到pom.xml文件
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
RabbitMqUtils代码:
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
// 创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
// 创建链接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2.2 Java代码示例与注释
创建一个生产者ProducerDemo
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建链接
Channel channel = RabbitMqUtils.getChannel();
// 创建交换器
String exchangeName = "test_exchange";
// 1队列名称 2队列类型 3 是否持久化交换机 true是 false否
channel.exchangeDeclare(exchangeName, "direct", true);
// 创建队列
String queueName = "test_queue";
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列与交换器
String routingKey = "test.routing.key";
// 1 绑定的队列 2交换机名称 3路由key
channel.queueBind(queueName, exchangeName, routingKey);
// 发送消息
String message = "Hello, RabbitMQ!";
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));
// 关闭资源
channel.close();
// 关闭连接
connection.close();
}
}
以上是在ProducerDemo类中,我们首先创建了一个交换器和一个队列。然后将队列与交换机进行绑定,并设置一个路由键。接着,我们将消息发送到交换器,并使用相同的路由键。
创建一个消费者ConsumerDemo
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException {
// 创建链接
Channel channel = RabbitMqUtils.getChannel();
// 创建队列
String queueName = "test_queue";
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(queueName, true, false, false, null);
// 创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received: " + message);
}
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(queueName, true, consumer);
}
}
2.3 交换器与队列总结
交换器与队列用于实现消息的路由和存储。在具体应用中,我们需要创建不同类型的交换器、队列,并使用路由键进行绑定。 Java代码示例展示了在RabbitMQ中如何进行简单的交换器、队列创建、绑定以及消息的发送和接收
3. 消息确认机制(ACK)
RabbitMQ中的消息确认机制,即ACK(Acknowledgement),是为了确保消息成功地从生产者传递到消费者。消费者处理完一个消息后,需要向RabbitMQ服务器发送一个ACK信号,告知服务器该消息已收到且处理完毕,允许服务器删除这个消息。根据确认时机的不同,ACK分为:
3.1 自动确认(Auto Ack)
自动确认是指,当消费者接收到消息后,会立即向服务器发送ACK信号,不管消息是否处理成功。自动确认的优点是速度快,但是可能导致消息丢失。如果在消费者处理消息过程中发生异常或宕机,由于已经发送了ACK信号,服务器将认为消息已被处理,从而导致消息丢失。
3.2 手动确认(Manual Ack)
手动确认是指,当消费者接收到消息后,可以选择在消息处理成功后再向服务器发送ACK信号。如果消费者处理消息过程中发生异常,可以选择不发送ACK信号,服务器将重新分发该消息给其他消费者。这样可以降低消息丢失的风险,但是相对于自动确认,速度较慢。
Java代码示例:
生产者代码:
public class producer {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 发布确认
channel.confirmSelect();
// 创建队列
// 持久化
boolean durable = true;
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 设置生产发送消息为持久化消息(要求保存到磁盘上)
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息:"+message);
}
}
}
消费者代码:
package cn.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.io.IOException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = "ack_queue";
// 创建队列
channel.queueDeclare(queueName, false, false, false, null);
channel.basicQos(1);
// 自动确认
boolean autoAck = false;
// 手动确认
// boolean autoAck = true;
// 创建一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!autoAck) {
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(queueName, autoAck, consumer);
}
}
3.3 消息确认机制总结
生产者和消费者通过交换器和队列进行消息的发送与接收。为了确保消息正常传递,RabbitMQ提供了消息确认机制。自动确认的速度较快,但存在消息丢失的风险;手动确认可以降低消息丢失风险,但速度较慢。开发者可根据实际应用场景选择合适的确认机制。
版权归原作者 亿滋 所有, 如有侵权,请联系我们删除。