1. 基于 SpringBoot 进行 RabbitMQ 的开发
1.1 工作队列模式
1)引入依赖
2)添加配置
spring:
rabbitmq:
host: 44.34.51.65
port: 5672
username: lk
password: lk
virtual-host: study
3)编写生产者代码
public class Constants {
//工作队列模式
public static final String WORK_QUEUE = "work.queue";
}
声明队列
@Configuration
public class RabbitMQConfig {
//工作队列模式
@Bean("workQueue")
public Queue workQueue() {
return QueueBuilder.durable(Constants.WORK_QUEUE).build();
}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work() {
for (int i = 0; i < 10; i++) {
//使用内置交换机发送消息,routingKey 和队列名称保持一致
rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"hello queue.work" + i);
}
return "发送成功";
}
}
生产者测试
4)编写消费者代码
定义监听类
@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener1(Message message) {
System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到的消息:" + message);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(Message message) {
System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到的消息:" + message);
}
}
@RabbitListener 是 Spring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息,该注解支持多种参数类型
1)String:返回消息的内容
2)Message:Spring AMQP 的 Message 类,返回原始的消息体以及消息的属性,如消息 ID、内容、队列信息等
3)Channel:RabbitMQ 的通道对象,可以用于进行更高级的操作,如手动确认消息
消费者测试
2 Publish/Subscribe(发布订阅模式)
1)编写生产者代码
//发布订阅模式
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
//发布订阅模式
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
}
@Bean("fanoutQueueBinding1")
public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean("fanoutQueueBinding2")
public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
声明两个队列和一个交换机,然后绑定队列和交换机
@RequestMapping("/fanout")
public String fanout() {
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello queue.fanout");
return "发送成功";
}
2)编写消费者代码
由于交换机和队列的绑定关系已经由生产者写完,所以消费者不需要再写
定义监听类
@Component
public class FanoutListener {
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void queueListener1(String message) {
System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到的消息:" + message);
}
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void queueListener2(String message) {
System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到的消息:" + message);
}
}
运行程序,观察结果
1.3 Routing(路由模式)
交换机类型为 Direct 时,会把消息交给指定 routing key 的队列
1)编写生产者代码
//路由模式
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
声明队列,并和交换机绑定
//路由模式
@Bean("directQueue1")
public Queue directQueue1() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}@Bean("directQueue2")
public Queue directQueue2() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("black");
}
@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("green");
}
@RequestMapping("/direct/{routingKey}")
public String direct(@PathVariable("routingKey") String routingKey) {
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello queue.direct,my routingKey is :" + routingKey);
return "发送成功";
}
使用接口发送消息
2)编写消费者代码
定义监听类
@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String message) {
System.out.println("队列 [" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String message) {
System.out.println("队列 [" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);
}
}
运行程序,观察结果
1.4 Topics(通配符模式)
1)编写生产者代码
//通配符模式
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
声明队列和交换机,并绑定
//通配符模式
@Bean("topicQueue1")
public Queue topicQueue1() {
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2() {
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicExchange")
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
}
@Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
}
@Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
}@Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
}
@RequestMapping("/topic")
public String topic(String routingKey) {
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello queue.topic,my routingKey is :" + routingKey);
return "发送成功";
}
使用接口发送消息
2)编写消费者代码
@Component
public class TopicListener {
@RabbitListener(queues = Constants.TOPIC_QUEUE1)
public void queueListener1(String message) {
System.out.println("队列[" + Constants.TOPIC_QUEUE1 + "] 接收到的消息:" + message);
}
@RabbitListener(queues = Constants.TOPIC_QUEUE2)
public void queueListener2(String message) {
System.out.println("队列[" + Constants.TOPIC_QUEUE2 + "] 接收到的消息:" + message);
}
}
运行程序,观察结果
2. 基于 SpringBoot + RabbitMQ 完成应用通信
需求描述:
用户下单成功之后,通知物流系统,进行发货
订单系统作为一个生产者,物流系统作为一个消费者
2.1 创建项目
1)创建一个空的文件夹,将两个项目放在一起
2)在这个项目里面创建 Module
3)后续流程和创建 SpringBoot 项目一样
创建两个项目
logistics-service
order-service
2.2 订单系统(生产者)
1)完善配置信息
spring:
rabbitmq:
host: 44.34.51.65
port: 5672
username: lk
password: lk
virtual-host: order
2)声明队列
@Configuration
public class RabbitConfig {
@Bean("orderQueue")
public Queue orderQueue() {
return QueueBuilder.durable("order.create").build();
}
}
3)编写下单接口,下单成功之后,发送订单消息
@RequestMapping("/order")
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/createOrder")
public String createOrder() {
//发送消息通知
String orderID = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend("","order.create","下单成功,订单 ID:" + orderID);
return "下单成功";
}
}
4)启动服务,观察结果
2.3 物流系统(消费者)
1)完善配置信息
server:
port: 9090
spring:
rabbitmq:
host: 44.34.51.65
port: 5672
username: lk
password: lk
virtual-host: order
2)监听队列
@Component
public class OrderListener {
@RabbitListener(queues = "order.create")
public void ListenerQueue(String message) {
System.out.println("接收到消息:" + message);
}
}
@RabbitListener(queues = "order.create") 可以加在类上,也可以加在方法上,加在类上,如果消息是 String 类型就会执行 String 类型的方法,如果是对象类型,就会执行对象类型的方法
@RabbitHandler 是一个方法级别的注解,当使用 @RabbitHandler 注解时,这个方法被调用处理特定的消息
@Component
public class OrderCreateListener {
@RabbitHandler
@RabbitListener(queues = "order.create")
public void ListenerQueue(String message){
System.out.println("接收到消息:"+ message);
}
}
2.4 启动服务,观察结果
2.5 发送消息格式为对象
如果通过 RabbitTemplate 发送一个对象作为消息,需要对该对象进行序列化,Spring AMQP 推荐使用 JSON 序列化,Spring AMQP 提供了 Jackson2JsonMessageConverter 和 MappingJackson2MessageConverter 等转换器,需要把一个 MessageConverter 设置到 RabbitTemplate 中
@Configuration
public class RabbitConfig {
@Bean("orderQueue")
public Queue orderQueue() {
return QueueBuilder.durable("order.create").build();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return rabbitTemplate;
}
}
定义一个对象
@Data
public class OrderInfo {
private String orderId;
private String name;
}
生产者代码
@RequestMapping("/order")
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/createOrder2")
public String createOrder2() {
//发送消息通知
OrderInfo orderInfo = new OrderInfo();
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setName("商品" + new Random().nextInt(100));
rabbitTemplate.convertAndSend("","order.create",orderInfo);
return "下单成功";
}
}
这里注入进来的 RabbitTemplate 就会使用刚才自己创建的对象,此时运行程序,观察结果
此时发送的就是对象了
版权归原作者 Lky strive 所有, 如有侵权,请联系我们删除。