RabbitMQ简介:
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。
你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。
主要流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。
这里总结了rabbitmq的五种模式代码案例,全部手敲测试通过,简单记录
准备环节:
rabbitmq,创建用户,分配主机
创建工程并引入依赖
<dependencies>
<!-- rabbitmq 依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
1. 简单模式:Hello_world
单个生产者生产消息,直接通过队列进行发送至单个消费者,消费者消费
生产者代码
package com.yutao.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*
* @author yt
* @create 2022/10/14 13:40
*/
public class Producer_HelloWorld{
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost("192.168.149.129");
//端口
factory.setPort(5672);
//自己在网页上创建的虚拟机
factory.setVirtualHost("/itcast");
//账号密码
factory.setUsername("yutao");
factory.setPassword("yutao");
//创建连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/**
* 创建队列Queue
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后是否还在
* 3.exclusive:是否独占,只能有一个消费者监听这个队列 当connection关闭时是否=删除队列
* 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
* 5.arguments:参数
*/
//如果没有hello_world的队列会自动创建,有就不会创建
channel.queueDeclare("hello_world",true,false,false,null);
//发送消息
/**
* 1.exchange:交换机名称。简单模式用默认的“”
* 2.routingket:路由名称
* 3.props:配置信息
* 4.body:字节数组 发送的数据
*/
String body = "你好呀,我是于涛";
channel.basicPublish("","hello_world",null,body.getBytes());
channel.close();
connection.close();
}
}
消费者代码
package com.yutao.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_HelloWorld{
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost("192.168.149.129");
//端口
factory.setPort(5672);
//自己在网页上创建的虚拟机
factory.setVirtualHost("/itcast");
//账号密码
factory.setUsername("yutao");
factory.setPassword("yutao");
//创建连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/**
* 创建队列Queue
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后是否还在
* 3.exclusive:是否独占,只能有一个消费者监听这个队列 当connection关闭时是否=删除队列
* 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
* 5.arguments:参数
*/
//如果没有hello_world的队列会自动创建,有就不会创建
channel.queueDeclare("hello_world",true,false,false,null);
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("Exchange = " + envelope.getExchange());
System.out.println("RoutingKey = " + envelope.getRoutingKey());
System.out.println("properties = " + properties);
System.out.println("body = " + new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
}
}
抽取工具类
package com.yutao.producer.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author yt
* @create 2022/10/14 14:30
*/
public class RabbitUtils {
public static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
//设置参数
connectionFactory.setHost("192.168.149.129");
//端口
connectionFactory.setPort(5672);
//自己在网页上创建的虚拟机
connectionFactory.setVirtualHost("/itcast");
//账号密码
connectionFactory.setUsername("yutao");
connectionFactory.setPassword("yutao");
}
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndchannel(Channel channel,Connection connection){
try {
if (channel!=null){
channel.close();
}
if (connection!=null){
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
2.工作模式:work_queues
多个消费者争抢一条消息消费,一条消息只能被一个消费者消费,相当于对简单模式的拓展
让多个消费者绑定到一个队列共同消费队列中的消息。默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息
生产者代码:发送10条消息
package com.yutao.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
* 工作队列发送消息,一个发送者发送消息 n个消费者监听 只能有一个消费者消费
*
* @author yt
* @create 2022/10/14 13:40
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
/**
* 创建队列Queue
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后是否还在
* 3.exclusive:是否独占,只能有一个消费者监听这个队列 当connection关闭时是否=删除队列
* 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
* 5.arguments:参数
*/
//如果没有hello_world的队列会自动创建,有就不会创建
channel.queueDeclare("work_queues", true, false, false, null);
//发送消息
/**
* 1.exchange:交换机名称。简单模式用默认的“”
* 2.routingket:路由名称
* 3.props:配置信息
* 4.body:字节数组 发送的数据
*/
for (int i = 0; i < 10; i++) {
String body = i + "你好呀,我是于涛";
channel.basicPublish("", "work_queues", null, body.getBytes());
}
channel.close();
connection.close();
}
}
创建两个消费者(代码相同):
package com.yutao.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
/**
* 创建队列Queue
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后是否还在
* 3.exclusive:是否独占,只能有一个消费者监听这个队列 当connection关闭时是否=删除队列
* 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
* 5.arguments:参数
*/
//如果没有hello_world的队列会自动创建,有就不会创建
channel.queueDeclare("work_queues",true,false,false,null);
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag = " + consumerTag);
System.out.println("Exchange = " + envelope.getExchange());
System.out.println("RoutingKey = " + envelope.getRoutingKey());
System.out.println("properties = " + properties);*/
System.out.println("body = " + new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
消费结果
消费者1:
消费者2
3.订阅模式:pub/sub
生产者将消息发送给交换机,交换机将消息分散到队列中,然后消费者在对应的队列中消费
交换机(Exchange)有三种:
Fanout 广播:将消息交给所有绑定到交换机的队列
Diect 定向:把消息交给符合指定的key的队列
Topic 通配符:把消息交给符合routing pattern(路由模式的队列)
交换机只负责转发消息,不具备存储消息的能力,没有队列绑定交换机,或者没有路由规则的队列,消息将丢失!!!
生产者代码:
创建交换机
创建两个队列
将两个队列绑定至交换机上
发送消息到交换机上
关闭资源
package com.yutao.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
* 生产者将消息发送给交换机,交换机将消息分散到队列中,然后消费者在对应的队列中消费
*
* 创建交换机
* 创建两个队列
* 将两个队列绑定至交换机上
* 发送消息到交换机上
* 关闭资源
*
* @author yt
* @create 2022/10/14 13:40
*/
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建channel 信道
Channel channel = connection.createChannel();
//创建交换机
/**
* String exchange,交换机名称
* BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
* boolean durable, 是否持久化
* boolean autoDelete, 是否自动删除
* internal 内部使用 一般是false
* Map<String, Object> arguments 参数列表
*/
String ExchangeName = "test_fanout";
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/**
* String queue, 队列名称
* String exchange, 交换机名称
* String routingKey 路由key
*/
channel.queueBind(queue1Name,ExchangeName,"");
channel.queueBind(queue2Name,ExchangeName,"");
//发送消息
String body = "日志信息:张三调用了findAll方法 日志级别为:info";
channel.basicPublish(ExchangeName,"",null,body.getBytes());
//释放资源
RabbitUtils.closeConnectionAndchannel(channel,connection);
}
}
两个消费者分别监听两个队列即可
消费者一:接收消息保存至数据库
package com.yutao.consumer;
import com.rabbitmq.client.*;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* 接收队列一消息
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息保存至数据库");
}
};
//接收队列一
channel.basicConsume(queue1Name, true, consumer);
}
}
消费者二:接收消息打印至控制台
package com.yutao.consumer;
import com.rabbitmq.client.*;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* 接收队列二消息
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息打印至控制台");
}
};
//接收队列二
channel.basicConsume(queue2Name,true,consumer);
}
}
4. 路由模式:Routing
生产者将消息发送至交换机,不同级别的日志走不同的队列,区分不同的消息,消费者通过订阅不同的队列来处理自己的逻辑
生产者代码:
队列一绑定error级别的日志
队列二绑定info,error,warning级别的日志
队列一只能接收到error的消息
队列二能收到info,error,warning的消息
package com.yutao.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*
* 队列一绑定error级别的日志
* 队列二绑定info,error,warning级别的日志
*
* 队列一只能接收到error的消息
* 队列二能收到info,error,warning的消息
* @author yt
* @create 2022/10/14 13:40
*/
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建channel 信道
Channel channel = connection.createChannel();
//创建交换机
/**
* String exchange,交换机名称
* BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
* boolean durable, 是否持久化
* boolean autoDelete, 是否自动删除
* internal 内部使用 一般是false
* Map<String, Object> arguments 参数列表
*/
String ExchangeName = "test_direct";
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/**
* String queue, 队列名称
* String exchange, 交换机名称
* String routingKey 路由key
*/
//队列一绑定error级别的日志
channel.queueBind(queue1Name,ExchangeName,"error");
//队列二绑定info,error,warning级别的日志
channel.queueBind(queue2Name,ExchangeName,"info");
channel.queueBind(queue2Name,ExchangeName,"error");
channel.queueBind(queue2Name,ExchangeName,"warning");
//发送消息
String body = "日志信息:张三调用了findAll方法 日志级别为:info";
//参数二指定绑定的队列
channel.basicPublish(ExchangeName,"info",null,body.getBytes());
//释放资源
RabbitUtils.closeConnectionAndchannel(channel,connection);
}
}
消费队列一(error)
package com.yutao.consumer;
import com.rabbitmq.client.*;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* 接收队列一消息
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息保存至数据库");
}
};
//接收队列一
channel.basicConsume(queue1Name, true, consumer);
}
}
无法收到info的消息 ,因为绑定的是error
消费者二(info,error,warning)
package com.yutao.consumer;
import com.rabbitmq.client.*;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* 接收队列一消息
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息打印至控制台");
}
};
//接收队列一
channel.basicConsume(queue2Name, true, consumer);
}
}
可以收到
当生产者发送error级别的消息时
队列一能收到
队列二能收到
Routing模式要求队列在绑定交换机时需要指定routing key,消费会转发到符合routing key的队列
5.通配符模式:Topics
生产者生产消息发送至交换机,交换机通过通配符将消息发送给能匹配上的队列,再由订阅队列的消费者进行消费
生产者代码:
package com.yutao.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*根据通配符发送指定队列
* @author yt
* @create 2022/10/14 13:40
*/
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建channel 信道
Channel channel = connection.createChannel();
//创建交换机
/**
* String exchange,交换机名称
* BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
* boolean durable, 是否持久化
* boolean autoDelete, 是否自动删除
* internal 内部使用 一般是false
* Map<String, Object> arguments 参数列表
*/
String ExchangeName = "test_topic";
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/**
* String queue, 队列名称
* String exchange, 交换机名称
* String routingKey 路由key
*/
//队列一绑定所有以error为结尾的日志,和绑定order后面所有的日志
channel.queueBind(queue1Name,ExchangeName,"#.error");
channel.queueBind(queue1Name,ExchangeName,"order.*");
//队列二绑定所有级别的日志
channel.queueBind(queue2Name,ExchangeName,"*.*");
//发送消息
String body = "日志信息:张三调用了delete方法 日志级别为:order.info";
//参数二指定绑定的队列
channel.basicPublish(ExchangeName,"order.info",null,body.getBytes());
//释放资源
RabbitUtils.closeConnectionAndchannel(channel,connection);
}
}
消费者一(队列:test_topic_queue1)
package com.yutao.consumer;
import com.rabbitmq.client.*;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* 接收队列一消息
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息保存至数据库");
}
};
//接收队列一
channel.basicConsume(queue1Name, true, consumer);
}
}
消费者二(队列:test_topic_queue2)
package com.yutao.consumer;
import com.rabbitmq.client.*;
import util.RabbitUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* 接收队列一消息
*
* @author yt
* @create 2022/10/14 13:39
*/
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
String queue2Name = "test_topic_queue2";
//接收消息
/**
* 1.queue:队列名称
* 2.aotoACK:是否自动确认
* 3.callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法。当收到消息之后会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由Key
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = " + new String(body));
System.out.println("将日志信息保存至数据库");
}
};
//接收队列一
channel.basicConsume(queue2Name, true, consumer);
}
}
通配符和定向相比,通配符更加灵活!
SpringBoot整合RabbitMq
生产者:
创建工程,导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
在application.yml中导入配置信息
#基本信息
spring:
rabbitmq:
#ip
host: 192.168.149.129
#端口
port: 5672
username: guest
password: guest
#虚拟机
virtual-host: /
创建rabbitmq的配置,创建交换机,并绑定队列,这里演示的是通配符的配置
package com.yutao.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主要是创建一个交换机,创建一个队列,将二者绑定上即可,这里只简单列出方法,后续可以在此基础上进行拓展
* @author yt
* @create 2022/10/14 16:44
*/
@Configuration
public class RabbitMQConfig {
//交换机名称
public static final String EXCANGE_NAME = "boot_topic_echange";
//队列名称
public static final String QUEUE_NAME = "boot_queue";
//交换机
@Bean("bootExchange")
public Exchange bootExchange() {
//创建通配符交换机
return ExchangeBuilder.topicExchange(EXCANGE_NAME).durable(true).build();
}
//队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
//绑定关系
/**
* 那个队列和那个交换机进行绑定.最后with的是routing Key 这里展示的是通配符的配置
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
创建启动类
package com.yutao;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author yt
* @create 2022/10/14 16:43
*/
@SpringBootApplication
public class ProducerAppliction {
public static void main(String[] args) {
SpringApplication.run(ProducerAppliction.class,args);
}
}
创建测试类,导入rabbitTemplate,进行消息发送
convertAndSend参数分别是:交换机名称,routingkey,发送的消息
package com.yutao;
import com.yutao.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author yt
* @create 2022/10/14 16:55
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
//convertAndSend参数分别是:交换机名称,routingkey,发送的消息
String msg = "springboot 整合 rabbitmq";
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCANGE_NAME, "boot.haha", msg);
}
}
消息发送成功!
消费者:
创建工程,引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
编写application.yml文件
#基本信息
spring:
rabbitmq:
#ip
host: 192.168.149.129
#端口
port: 5672
username: guest
password: guest
#虚拟机
virtual-host: /
创建启动类
package com.yutao;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author yt
* @create 2022/10/14 17:13
*/
@SpringBootApplication
public class ConsumerAppliction {
public static void main(String[] args) {
SpringApplication.run(ConsumerAppliction.class,args);
}
}
创建监听队列的方法,注意要注入spring容器
package com.yutao.rabbitmq.consumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author yt
* @create 2022/10/14 17:15
*/
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message) {
System.out.println("message = " + message);
System.out.println("收到消息为:" + new String(message.getBody()));
}
}
启动之后即可收到消息
消息丢失:
在异常情况下,比如交换机挂了,消费者挂了,都会导致消息丢失的情况
1.生产者到rabbitmq消息丢失,这个可以用手动ack确认来解决
2.消息到达rabbitmq中丢失,这个可以用rabbit中的持久化来解决
3.rabbitmq到消费者过程丢失,不能判断是否被消费,这个也可以用回调的方式来解决
消息生产者到rabbitMq手动ACK确认
在原有的生产者代码的application.yml文件中增加如下配置
spring:
rabbitmq:
#ip
host: 120.27.224.146 #mq服务器的ip地址
port: 5678
virtual-host: /sbzq
username: kaying
password: hs3PGVzZD1rmrdSx
connection-timeout: 15000
//发送消息到交换机后会触发回调方法
publisher-confirm-type: correlated
添加回调方法即可显示成功与失败
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 实现 RabbitTemplate.ConfirmCallback 接口 实现里面的回调方法
* 将 rabbitTemplate 用init方法注入到 ConfirmCallback中即可使用 @PostConstruct注解是将rabbitTemplate注入
* @author yt
* @create 2022/12/29 15:09
*/
@Component
public class Callback implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
*
* @param correlationData 保存了回调信息的id及相关信息(可以自己将发送的消息放到里面)
* @param ack 交换机是否收到消息true=收到了,false=没收到
* @param cause 失败原因,成功=null,失败会有失败原因,可以作为日志保存
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) { // 消息投递到broker 的状态,true表示成功
System.out.println("消息发送到Broker成功!");
System.out.println("correlationData = " + correlationData);
System.out.println("ack = " + ack);
System.out.println("cause = " + cause);
} else { // 发送异常
System.out.println("发送失败");
System.out.println("correlationData = " + correlationData);
System.out.println("ack = " + ack);
System.out.println("cause = " + cause);
}
}
}
结果:
如果交换机到队列的时候找不到队列,将会删除消息造成消息丢失
配置文件中增加
#基本信息
spring:
rabbitmq:
#ip
host: 120.27.224.146 #mq服务器的ip地址
port: 5678
virtual-host: /sbzq
username: kaying
password: hs3PGVzZD1rmrdSx
connection-timeout: 15000
publisher-confirm-type: correlated
//回退消息
publisher-returns: true
实现回退接口并注入
package com.yutao.rabbitmq.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 实现 RabbitTemplate.ConfirmCallback 接口 实现里面的回调方法
* 将 rabbitTemplate 用init方法注入到 ConfirmCallback中即可使用 @PostConstruct注解是将rabbitTemplate注入
* @author yt
* @create 2022/12/29 15:09
*/
@Component
public class Callback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init(){
//注入回调
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
*
* @param correlationData 保存了回调信息的id及相关信息(可以自己将发送的消息放到里面)
* @param ack 交换机是否收到消息true=收到了,false=没收到
* @param cause 失败原因,成功=null,失败会有失败原因,可以作为日志保存
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) { // 消息投递到broker 的状态,true表示成功
System.out.println("消息发送到Broker成功!");
System.out.println("correlationData = " + correlationData);
System.out.println("ack = " + ack);
System.out.println("cause = " + cause);
} else { // 发送异常
System.out.println("发送失败");
System.out.println("correlationData = " + correlationData);
System.out.println("ack = " + ack);
System.out.println("cause = " + cause);
}
}
//在消息不可达消费者时将会退给生产者,只有不可达目的地时才会回退
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("returnedMessage = " + returnedMessage);
}
}
结果
版权归原作者 于京京9909 所有, 如有侵权,请联系我们删除。