0


rabbitMq(路由模式)

路由模式:

idea实现路由模式

package com.aaa.test.procedure;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//发布消息
public class MyProcedureExDirect {
@Test
public void procedure() throws IOException, TimeoutException {
//mq 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("cjh");
connectionFactory.setPassword("cjh");
connectionFactory.setHost("192.168.152.32");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/cjh");
// 创建管道
Connection connection = connectionFactory.newConnection();
// 管道
Channel channel = connection.createChannel();

/**

  • 发布订阅的时候
  • 交换机->direct
  • routingkey是一个具体的值
  • 队列

*/
// 创建交换机
channel.exchangeDeclare("exchange_direct_test", BuiltinExchangeType.DIRECT,false);
// 创建交换机
channel.queueDeclare("exchange_direct_queue_1",false,false,false,null);
channel.queueDeclare("exchange_direct_queue_2",false,false,false,null);
// 交换机绑定队列
channel.queueBind("exchange_direct_queue_1","exchange_direct_test","error");
channel.queueBind("exchange_direct_queue_2","exchange_direct_test","test");
channel.queueBind("exchange_direct_queue_2","exchange_direct_test","test2");
发布消息
channel.basicPublish("exchange_direct_test","error",null,"我就是测试一下路由模式".getBytes());
}
}

消费路由消息

package org.example;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**

  • Hello world!

*/
public class ConsumerAppExchangeDirect

{
public static void main( String[] args ) throws IOException, TimeoutException {
//mq 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("cjh");
connectionFactory.setPassword("cjh");
connectionFactory.setHost("192.168.152.32");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/cjh");
// 创建管道
Connection connection = connectionFactory.newConnection();
// 管道
Channel channel = connection.createChannel();
// 消费消息
Consumer consumer= new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 获取队列消息 body
String s = new String(body);
System.out.println("mq中的消息************* = " + s);
}
};
channel.basicConsume("exchange_direct_queue_1",true,consumer);
}
}

主题模式:

路由模式 角色是一样的

生产者 消费者 交换机 队列

交换机的类型topic 类型的

交换机和队列绑定的时候 routingkey的值属于通配符类型的

代表匹配一个或者多个单词 (多个单词之间是以.分割的)

Test.# test.aaa test.aaa.bbb

  • 代表的是匹配一个单词

. 两个单词 test.aaa test.test

package com.aaa.test.procedure;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//发布消息
public class MyProcedureExTopic {
@Test
public void procedure() throws IOException, TimeoutException {
//mq 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("cjh");
connectionFactory.setPassword("cjh");
connectionFactory.setHost("192.168.152.32");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/cjh");
// 创建管道
Connection connection = connectionFactory.newConnection();
// 管道
Channel channel = connection.createChannel();

/**

  • 发布订阅的时候
  • 交换机->topic
  • routingkey是一个具体的值
  • 队列

/
// 创建交换机
channel.exchangeDeclare("exchange_topic_test", BuiltinExchangeType.TOPIC,false);
// 创建交换机
channel.queueDeclare("exchange_topic_queue_1",false,false,false,null);
channel.queueDeclare("exchange_topic_queue_2",false,false,false,null);
// 交换机绑定队列
channel.queueBind("exchange_topic_queue_1","exchange_topic_test","test.#");
channel.queueBind("exchange_topic_queue_2","exchange_topic_test","
.aaa");
channel.queueBind("exchange_topic_queue_2","exchange_topic_test","test.*");
发布消息
channel.basicPublish("exchange_topic_test","test.aaa",null,"我就是测试一下路由模式".getBytes());
}
}

整合SpringBoot

加依赖

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency> <dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

</dependency>

配置文件

spring:
rabbitmq:
username: cjh
password: cjh
host: 192.168.152.32
virtual-host: /cjh #虚拟主机在设置的时候以/开头
port: 5672
mq:
exchange:
name: test_exchange_topic
queue:
name1: test_topic_exchange_queue_1
name2: test_topic_exchange_queue_2

配置类

声明交换机

声明队列

交换机和对垒进行绑定

package com.aaa.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicMqConfig {
// 1.交换机
@Value("${mq.exchange.name}")
private String EXCHANGENAME;

@Value("${mq.queue.name1}")
private String QUEUENAME1;

@Value("${mq.queue.name2}")
private String QUEUENAME2;
// 声明交换机
@Bean("ex1")
public Exchange getExChange(){
Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
return exchange;
}

// 2.队列
@Bean("queue1")
public Queue getQueue1(){
Queue queue = QueueBuilder.nonDurable(QUEUENAME1).build();
return queue;
}
@Bean("queue2")
public Queue getQueue2(){
Queue queue = QueueBuilder.nonDurable(QUEUENAME2).build();
return queue;
}
// 绑定交换机和队列
@Bean("binding1")
public Binding bindingExQueue1(@Qualifier("ex1")Exchange exchange,@Qualifier("queue1") Queue queue){
Binding binding = BindingBuilder.bind(queue).to(exchange).with(".").noargs();
return binding;
}

@Bean("binding2")
public Binding bindingExQueue2(@Qualifier("ex1")Exchange exchange,@Qualifier("queue2") Queue queue){
Binding binding = BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
return binding;
}

}

生产者发布消息

package com.aaa;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
public class MqTest {
@Value("${mq.exchange.name}")
private String EXCHANGENAME;
@Resource
public RabbitTemplate rabbitTemplate;
@Test
void sendMsg(){
rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","我就是测试一下springBoot和mq的整合");
}
}

消费消息

消费者的服务application.yml和生产者是一样的

只需要监听队列(队列里面只要有消息就开始接收)

消息的可靠性传递

开启确认模式

package com.aaa;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
public class MqTest {
@Value("${mq.exchange.name}")
private String EXCHANGENAME;
@Resource
public RabbitTemplate rabbitTemplate;
@Test
void sendMsg(){
// 消息 访问成功了没
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*

  • @param correlationData
  • @param b 消息是否发送成功
  • @param s

*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("发送成功");
}else {
System.out.println("发送失败,原因是:"+s);
}
}
});
rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","我就是测试一下springBoot和mq的整合");
}
}

Return(回退模式)

测试类

@Test
void sendMsgReturn() {
// 代表使用的是回退模式
rabbitTemplate.setMandatory(true);

// 消息
rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退了,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));
rabbitTemplate.convertAndSend(EXCHANGENAME, "aaa.topic", "我就是测试一下springBoot和mq的整合");
}

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/2302_79259015/article/details/134600461
版权归原作者 ෆ710 所有, 如有侵权,请联系我们删除。

“rabbitMq(路由模式)”的评论:

还没有评论