0


Springboot 整合 阿里云消息队列RabbitMQ版服务

因为公司的需要服务都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开始对接阿里云的消息队列服务。

准备

本着学习的前提,寻找是否免费的或者做活动的服务,能白嫖的就白嫖,果然被我找到了。

  1. 进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入

2.进入页面搜索消息队列

    3.  具体队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云

    4. 本来Rocket版、Kafka版都想学习的,但只有rabbit版的免费,但也够意思了毕竟不要钱(**注**:虽然免费但后面还留了一个很大的坑等着踩呢)

开始

    1. 创建一个springboot项目 命名为:rabbitmq-aliyun

    2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)
server:
  port: 8080

aliyun:
  rabbitmq:
    accessKey: 密匙key
    accessKeySecret: 密匙密码
    username: 静态用户名
    password:  静态密码
    vHost: 虚拟机名称
    exchange: 交换机名称
    exType: 交换机类型
    queue: 队列名称
    BindingKey:  路由key
    host: 介入点(公网接入点)
    **注**:本地测试必须使用**公网接入点  ,**但是我们使用的免费rabbitMq服务并没有公网接入点,只有VPC接入点![](https://img-blog.csdnimg.cn/7c9413a029714abcbea5700a3acd1fda.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCb54eV5bC-,size_20,color_FFFFFF,t_70,g_se,x_16)

所以自己按照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有表明用哪一个接入点地址,进了这个大坑)

最后只能需求官方客户帮助:

本着,不花钱的原则,但是使用VPC接入点 还得购买 阿里云ecs服务,岂不是还得花更多的钱。

最后只能升级服务,并且选择支持公网

所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs服务,要么升配队列服务

3.创建配置数据映射对象 RabbitMqConfigDTO.class

@Configuration
@ConfigurationProperties("aliyun.rabbitmq")
@Data
public class RabbitMqConfigDTO {

    /**
     * 账户密匙key
     */
    private String accessKey;

    /**
     * 账户密匙
     */
    private String accessKeySecret;

    /**
     *  静态用户名
     */
    private String username;

    /**
     * 静态用户名密码
     */
    private String password;

    /**
     * 虚拟机名称
     */
    private String vHost;

    /**
     * 交换机名
     */
    private String exchange;

    /**
     * 交换机类型
     */
    private String exType;

    /**
     * 队列名
     */
    private String queue;

    /**
     * 绑定规则key
     */
    private String BindingKey;

    /**
     * 接入点地址
     */
    private String host;

}
    4. 创建spring工具类 SpringContextHolder.class 用于获取bean对象
public class SpringContextHolder implements ApplicationContextAware {

    @Autowired
    private static ApplicationContext applicationContext;

    public SpringContextHolder() {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextHolder.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        assertApplicationContext();
        return applicationContext;
    }

    public static <T> T getBean(String beanName) {
        assertApplicationContext();
        return (T) applicationContext.getBean(beanName);
    }

    public static <T> T getBean(Class<T> requiredType) {
        assertApplicationContext();
        return applicationContext.getBean(requiredType);
    }

    private static void assertApplicationContext() {
        if (applicationContext == null) {
            throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
        }
    }
    5. 创建rabbitMq工具类  RabbitMqUtil.class
@Slf4j
@Component
public class RabbitMqUtil {
    
    @Autowired
    private RabbitMqConfigDTO rabbitMqConfigDTO;

    //第三步 建一个静态的本类
    private static RabbitMqUtil rabbitMqUtil;

    //第四步 初始化
    @PostConstruct
    public void init() {
        rabbitMqUtil = this;
    }
    
    /**
     * 创建队列连接
     * @return
     */
    public static Connection getRabbitConnection(){

        ConnectionFactory factory = new ConnectionFactory();

        //公网接入点
        factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost());
        //静态用户名
        factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername());
        //静态密码
        factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword());

        //自动恢复
        factory.setAutomaticRecoveryEnabled(true);
        //网络恢复时间
        factory.setNetworkRecoveryInterval(5000);
        //虚拟机名称
        factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost());
        //端口
        factory.setPort(5672);
        //连接超时时间
        factory.setConnectionTimeout(30*100);
        //设置握手超时时间
        factory.setHandshakeTimeout(300000000);
        factory.setShutdownTimeout(0);

        //创建连接
        Connection connection = null;
        try {
            connection =factory.newConnection();

        }catch (Exception e){
            log.error("rabbitMq连接异常", e);
        }

        return connection;
    }

    /**
     * 创建队列通道
     * @param connection
     * @return
     */
    public static Channel getRabbitChannel(Connection connection){

        Channel channel = null;
        try {
            channel = connection.createChannel();
            String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange();
            channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null);
            channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
            channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey());

        }catch (Exception e){
            log.error("创建rabbitMq通道异常", e);
        }

        return channel;
    }

}
    6.创建server接口类
public interface RabbitMqService {

    /**
     * 发送mq消息
     * @return
     */
    String sendMessage() throws IOException, TimeoutException;

    /**
     * 消费消息
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    String consumeMessage() throws IOException, TimeoutException;
}
    7.创建实现类
@Service
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private RabbitMqConfigDTO rabbitMqConfigDTO;

    @Override
    public String sendMessage() throws IOException {

        Connection connection = RabbitMqUtil.getRabbitConnection();
        Channel channel = RabbitMqUtil.getRabbitChannel(connection);
        //开始发送消息
        for(int i=0; i< 10 ; i++){
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));

        }
        connection.close();
        return "消息发送成功";

    }

    @Override
    public String consumeMessage() throws IOException, TimeoutException {

        Connection connection = RabbitMqUtil.getRabbitConnection();
        Channel channel = RabbitMqUtil.getRabbitChannel(connection);

        String exchange = rabbitMqConfigDTO.getExchange();
        channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null);
        channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
        channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey());

        // 开始消费消息。
        channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        connection.close();

        return "消费成功";
    }
    
}
     8.创建控制层
@RestController
public class RabbitMqController {

    @Autowired
    private RabbitMqService rabbitMqService;

    @GetMapping("/sendMessage")
    public String sendMessage() throws IOException, TimeoutException {

        return rabbitMqService.sendMessage();
    }

    @GetMapping("/consumeMessage")
    public String consumeMessage() throws IOException, TimeoutException {

        return rabbitMqService.consumeMessage();
    }
}
    9.项目整体结构

    ![](https://img-blog.csdnimg.cn/8c1b733a48314c928857cce5b0dd22c7.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA5ZCb54eV5bC-,size_18,color_FFFFFF,t_70,g_se,x_16)

 10.完成启动项目

 11.点击获取源码

测试

  1. 发送消息

    1. 进入控制台查看

     此时可以看到堆积10条消息,说明消息发送成功了

    3. 消费消息

     4.再次进入控制台查看

            堆积的消息已变为0说明消息已经被全部消费了

后序

自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经解决方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所帮助。

标签: java 阿里云 rabbitmq

本文转载自: https://blog.csdn.net/qq_39818325/article/details/124391558
版权归原作者 君燕尾 所有, 如有侵权,请联系我们删除。

“Springboot 整合 阿里云消息队列RabbitMQ版服务”的评论:

还没有评论