序
因为公司的需要服务都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开始对接阿里云的消息队列服务。
准备
本着学习的前提,寻找是否免费的或者做活动的服务,能白嫖的就白嫖,果然被我找到了。
- 进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入
2.进入页面搜索消息队列
3. 具体队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云
4. 本来Rocket版、Kafka版都想学习的,但只有rabbit版的免费,但也够意思了毕竟不要钱(**注**:虽然免费但后面还留了一个很大的坑等着踩呢)
开始
1. 创建一个springboot项目 命名为:rabbitmq-aliyun
2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)
server:
port: 8080
aliyun:
rabbitmq:
accessKey: 密匙key
accessKeySecret: 密匙密码
username: 静态用户名
password: 静态密码
vHost: 虚拟机名称
exchange: 交换机名称
exType: 交换机类型
queue: 队列名称
BindingKey: 路由key
host: 介入点(公网接入点)
**注**:本地测试必须使用**公网接入点 ,**但是我们使用的免费rabbitMq服务并没有公网接入点,只有VPC接入点![](https://img-blog.csdnimg.cn/7c9413a029714abcbea5700a3acd1fda.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCb54eV5bC-,size_20,color_FFFFFF,t_70,g_se,x_16)
所以自己按照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有表明用哪一个接入点地址,进了这个大坑)
最后只能需求官方客户帮助:
本着,不花钱的原则,但是使用VPC接入点 还得购买 阿里云ecs服务,岂不是还得花更多的钱。
最后只能升级服务,并且选择支持公网
所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs服务,要么升配队列服务
3.创建配置数据映射对象 RabbitMqConfigDTO.class
@Configuration
@ConfigurationProperties("aliyun.rabbitmq")
@Data
public class RabbitMqConfigDTO {
/**
* 账户密匙key
*/
private String accessKey;
/**
* 账户密匙
*/
private String accessKeySecret;
/**
* 静态用户名
*/
private String username;
/**
* 静态用户名密码
*/
private String password;
/**
* 虚拟机名称
*/
private String vHost;
/**
* 交换机名
*/
private String exchange;
/**
* 交换机类型
*/
private String exType;
/**
* 队列名
*/
private String queue;
/**
* 绑定规则key
*/
private String BindingKey;
/**
* 接入点地址
*/
private String host;
}
4. 创建spring工具类 SpringContextHolder.class 用于获取bean对象
public class SpringContextHolder implements ApplicationContextAware {
@Autowired
private static ApplicationContext applicationContext;
public SpringContextHolder() {
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextHolder.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
assertApplicationContext();
return applicationContext;
}
public static <T> T getBean(String beanName) {
assertApplicationContext();
return (T) applicationContext.getBean(beanName);
}
public static <T> T getBean(Class<T> requiredType) {
assertApplicationContext();
return applicationContext.getBean(requiredType);
}
private static void assertApplicationContext() {
if (applicationContext == null) {
throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
}
}
5. 创建rabbitMq工具类 RabbitMqUtil.class
@Slf4j
@Component
public class RabbitMqUtil {
@Autowired
private RabbitMqConfigDTO rabbitMqConfigDTO;
//第三步 建一个静态的本类
private static RabbitMqUtil rabbitMqUtil;
//第四步 初始化
@PostConstruct
public void init() {
rabbitMqUtil = this;
}
/**
* 创建队列连接
* @return
*/
public static Connection getRabbitConnection(){
ConnectionFactory factory = new ConnectionFactory();
//公网接入点
factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost());
//静态用户名
factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername());
//静态密码
factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword());
//自动恢复
factory.setAutomaticRecoveryEnabled(true);
//网络恢复时间
factory.setNetworkRecoveryInterval(5000);
//虚拟机名称
factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost());
//端口
factory.setPort(5672);
//连接超时时间
factory.setConnectionTimeout(30*100);
//设置握手超时时间
factory.setHandshakeTimeout(300000000);
factory.setShutdownTimeout(0);
//创建连接
Connection connection = null;
try {
connection =factory.newConnection();
}catch (Exception e){
log.error("rabbitMq连接异常", e);
}
return connection;
}
/**
* 创建队列通道
* @param connection
* @return
*/
public static Channel getRabbitChannel(Connection connection){
Channel channel = null;
try {
channel = connection.createChannel();
String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange();
channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null);
channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey());
}catch (Exception e){
log.error("创建rabbitMq通道异常", e);
}
return channel;
}
}
6.创建server接口类
public interface RabbitMqService {
/**
* 发送mq消息
* @return
*/
String sendMessage() throws IOException, TimeoutException;
/**
* 消费消息
* @return
* @throws IOException
* @throws TimeoutException
*/
String consumeMessage() throws IOException, TimeoutException;
}
7.创建实现类
@Service
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private RabbitMqConfigDTO rabbitMqConfigDTO;
@Override
public String sendMessage() throws IOException {
Connection connection = RabbitMqUtil.getRabbitConnection();
Channel channel = RabbitMqUtil.getRabbitChannel(connection);
//开始发送消息
for(int i=0; i< 10 ; i++){
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props,
("消息发送Body" + i).getBytes(StandardCharsets.UTF_8));
}
connection.close();
return "消息发送成功";
}
@Override
public String consumeMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtil.getRabbitConnection();
Channel channel = RabbitMqUtil.getRabbitChannel(connection);
String exchange = rabbitMqConfigDTO.getExchange();
channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null);
channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey());
// 开始消费消息。
channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
connection.close();
return "消费成功";
}
}
8.创建控制层
@RestController
public class RabbitMqController {
@Autowired
private RabbitMqService rabbitMqService;
@GetMapping("/sendMessage")
public String sendMessage() throws IOException, TimeoutException {
return rabbitMqService.sendMessage();
}
@GetMapping("/consumeMessage")
public String consumeMessage() throws IOException, TimeoutException {
return rabbitMqService.consumeMessage();
}
}
9.项目整体结构
![](https://img-blog.csdnimg.cn/8c1b733a48314c928857cce5b0dd22c7.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCb54eV5bC-,size_18,color_FFFFFF,t_70,g_se,x_16)
10.完成启动项目
11.点击获取源码
测试
发送消息
- 进入控制台查看
此时可以看到堆积10条消息,说明消息发送成功了
3. 消费消息
4.再次进入控制台查看
堆积的消息已变为0说明消息已经被全部消费了
后序
自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经解决方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所帮助。
版权归原作者 君燕尾 所有, 如有侵权,请联系我们删除。