0


springboot 整合 mqtt

springboot 整合 mqtt

最近由于iot越来越火, 物联网的需求越来越多, 那么理所当然的使用mqtt的场景也就越来越多,
接下来是我使用springboot整合mqtt的过程, 以及踩过的一些坑.

mqtt服务器使用的是 EMQX, 官网 : 这里

搭建的时候如果你使用的是集群 记得开放以下端口:

EMQX集群端口

好了, 搭建成功下一步就是我们的java程序要与mqtt连接, 这里有两种方式(其实不止两种)进行连接.
一是 直接使用 MQTT Java 客户端库,详情可以查看官方的例子: MQTT Java 客户端 我就跳过了

二是使用

spring integration mqtt

也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version></dependency>

第二步 添加配置

1 先写好一些基本配置

mqtt:username: test                        # 账号password:123456# 密码host-url: tcp://127.0.0.1:1883# mqtt连接tcp地址in-client-id: ${random.value}# 随机值,使出入站 client ID 不同out-client-id: ${random.value}client-id: ${random.int}# 客户端Id,不能相同,采用随机数 ${random.value}default-topic: test/#,topic/+/+/up         # 默认主题timeout:60# 超时时间keepalive:60# 保持连接clearSession:true# 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
  1. 然后写一个对应的类MqttProperties
importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;/**
 * MqttProperties 
 *
 * @author hengzi
 * @date 2022/8/23
 */@ComponentpublicclassMqttProperties{/**
     * 用户名
     */@Value("${mqtt.username}")privateString username;/**
     * 密码
     */@Value("${mqtt.password}")privateString password;/**
     * 连接地址
     */@Value("${mqtt.host-url}")privateString hostUrl;/**
     * 进-客户Id
     */@Value("${mqtt.in-client-id}")privateString inClientId;/**
     * 出-客户Id
     */@Value("${mqtt.out-client-id}")privateString outClientId;/**
     * 客户Id
     */@Value("${mqtt.client-id}")privateString clientId;/**
     * 默认连接话题
     */@Value("${mqtt.default-topic}")privateString defaultTopic;/**
     * 超时时间
     */@Value("${mqtt.timeout}")privateint timeout;/**
     * 保持连接数
     */@Value("${mqtt.keepalive}")privateint keepalive;/**是否清除session*/@Value("${mqtt.clearSession}")privateboolean clearSession;// ...getter and setter}

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道

channel

, 适配器

adapter

, 入站

Inbound

, 出站

Outbound

,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

@BeanpublicMqttPahoClientFactorymqttPahoClientFactory(){DefaultMqttPahoClientFactory factory =newDefaultMqttPahoClientFactory();MqttConnectOptions options =newMqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);return factory;}

然后再搞两根管子(

channel

),一个出站,一个入站

//出站消息管道,@BeanpublicMessageChannelmqttOutboundChannel(){returnnewDirectChannel();}// 入站消息管道@BeanpublicMessageChannelmqttInboundChannel(){returnnewDirectChannel();}

为了使这些管子能流通 就需要一个适配器(

adapter

)

// Mqtt 管道适配器@BeanpublicMqttPahoMessageDrivenChannelAdapteradapter(MqttPahoClientFactory factory){returnnewMqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}

然后定义消息生产者

// 消息生产者@BeanpublicMessageProducermqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(newDefaultPahoMessageConverter());//入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);return adapter;}

那我们收到消息去哪里处理呢,答案是这里:

@Bean//使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行@ServiceActivator(inputChannel ="mqttInboundChannel")publicMessageHandlerhandleMessage(){// 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)return mqttMessageHandle;// 你也可以这样写//        return new MessageHandler() {//            @Override//            public void handleMessage(Message<?> message) throws MessagingException {//                // do something//            }//        };}

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

// 出站处理器@Bean@ServiceActivator(inputChannel ="mqttOutboundChannel")publicMessageHandlermqttOutbound(MqttPahoClientFactory factory){MqttPahoMessageHandler handler =newMqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(newDefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return handler;}

这个 出站处理器 在我看来就是让别人 (

MqttPahoMessageHandler

)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由

MqttPahoMessageHandler

来完成

接下来我们定义一个接口即可

importorg.springframework.integration.annotation.MessagingGateway;importorg.springframework.integration.mqtt.support.MqttHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */@Component@MessagingGateway(defaultRequestChannel ="mqttOutboundChannel")publicinterfaceMqttGateway{voidsendToMqtt(@Header(MqttHeaders.TOPIC)String topic,String data);voidsendToMqtt(@Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS)IntegerQos,String data);}

我们直接调用这个接口就可以向mqtt 发送数据


到目前为止,整个配置文件长这样:

importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.annotation.ServiceActivator;importorg.springframework.integration.channel.DirectChannel;importorg.springframework.integration.core.MessageProducer;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.MessageHandler;importorg.springframework.messaging.MessagingException;/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */@ConfigurationpublicclassMqttConfig{/**
     *  以下属性将在配置文件中读取
     **/@AutowiredprivateMqttProperties mqttProperties;//Mqtt 客户端工厂@BeanpublicMqttPahoClientFactorymqttPahoClientFactory(){DefaultMqttPahoClientFactory factory =newDefaultMqttPahoClientFactory();MqttConnectOptions options =newMqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);return factory;}// Mqtt 管道适配器@BeanpublicMqttPahoMessageDrivenChannelAdapteradapter(MqttPahoClientFactory factory){returnnewMqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}// 消息生产者@BeanpublicMessageProducermqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(newDefaultPahoMessageConverter());//入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);return adapter;}// 出站处理器@Bean@ServiceActivator(inputChannel ="mqttOutboundChannel")publicMessageHandlermqttOutbound(MqttPahoClientFactory factory){MqttPahoMessageHandler handler =newMqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(newDefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return handler;}@Bean//使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行@ServiceActivator(inputChannel ="mqttInboundChannel")publicMessageHandlerhandleMessage(){return mqttMessageHandle;}//出站消息管道,@BeanpublicMessageChannelmqttOutboundChannel(){returnnewDirectChannel();}// 入站消息管道@BeanpublicMessageChannelmqttInboundChannel(){returnnewDirectChannel();}}

处理消息的

MqttMessageHandle 
@ComponentpublicclassMqttMessageHandleimplementsMessageHandler{@OverridepublicvoidhandleMessage(Message<?> message)throwsMessagingException{}}

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的

DirectChannel

,是

Spring Integration

默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站

channel

改成

ExecutorChannel

一个可以使用多线程的

channel
@BeanpublicThreadPoolTaskExecutormqttThreadPoolTaskExecutor(){ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor();// 最大可创建的线程数int maxPoolSize =200;
        executor.setMaxPoolSize(maxPoolSize);// 核心线程池大小int corePoolSize =50;
        executor.setCorePoolSize(corePoolSize);// 队列最大长度int queueCapacity =1000;
        executor.setQueueCapacity(queueCapacity);// 线程池维护线程所允许的空闲时间int keepAliveSeconds =300;
        executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());return executor;}// 入站消息管道@BeanpublicMessageChannelmqttInboundChannel(){// 用线程池returnnewExecutorChannel(mqttThreadPoolTaskExecutor());}

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫

Java DSL

, 官网连接: Configuring with the Java DSL

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.channel.ExecutorChannel;importorg.springframework.integration.dsl.IntegrationFlow;importorg.springframework.integration.dsl.IntegrationFlows;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;importjava.util.concurrent.ThreadPoolExecutor;/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */@ConfigurationpublicclassMqttConfigV2{@AutowiredprivateMqttProperties mqttProperties;@AutowiredprivateMqttMessageHandle mqttMessageHandle;//Mqtt 客户端工厂 所有客户端从这里产生@BeanpublicMqttPahoClientFactorymqttPahoClientFactory(){DefaultMqttPahoClientFactory factory =newDefaultMqttPahoClientFactory();MqttConnectOptions options =newMqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);return factory;}// Mqtt 管道适配器@BeanpublicMqttPahoMessageDrivenChannelAdapteradapter(MqttPahoClientFactory factory){returnnewMqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}// 消息生产者 (接收,处理来自mqtt的消息)@BeanpublicIntegrationFlowmqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);returnIntegrationFlows.from( adapter).channel(newExecutorChannel(mqttThreadPoolTaskExecutor())).handle(mqttMessageHandle).get();}@BeanpublicThreadPoolTaskExecutormqttThreadPoolTaskExecutor(){ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor();// 最大可创建的线程数int maxPoolSize =200;
        executor.setMaxPoolSize(maxPoolSize);// 核心线程池大小int corePoolSize =50;
        executor.setCorePoolSize(corePoolSize);// 队列最大长度int queueCapacity =1000;
        executor.setQueueCapacity(queueCapacity);// 线程池维护线程所允许的空闲时间int keepAliveSeconds =300;
        executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());return executor;}// 出站处理器 (向 mqtt 发送消息)@BeanpublicIntegrationFlowmqttOutboundFlow(MqttPahoClientFactory factory){MqttPahoMessageHandler handler =newMqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(newDefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);returnIntegrationFlows.from("mqttOutboundChannel").handle(handler).get();}}

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.


但其实我一直有个想法, 就是我们接收的消息 都是在

handleMessage

这个方法里面执行的,

@OverridepublicvoidhandleMessage(Message<?> message)throwsMessagingException{}

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用

if ... else ...

也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加

Spring Integration

的路由

router

,根据不同topic路由到不同的

channel

, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了

spring

@Controller

的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加

@Controller

这个注解, 就代表这个类是 http 接口, 再在方法加上

@RequestMapping

就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加

@MqttService

就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上

@MqttTopic

就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

importorg.springframework.core.annotation.AliasFor;importorg.springframework.stereotype.Component;importjava.lang.annotation.*;@Documented@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Componentpublic@interfaceMqttService{@AliasFor(
            annotation =Component.class)Stringvalue()default"";}

加上

@Component

注解 spring就会扫描, 并注册到IOC容器里

importjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public@interfaceMqttTopic{/**
     * 主题名字
     */Stringvalue()default"";}

参考

@RequestMapping

我们使用起来应该是这样的:

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.messaging.Message;/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */@MqttServicepublicclassMqttTopicHandle{publicstaticfinalLogger log =LoggerFactory.getLogger(MqttTopicHandle.class);// 这里的 # 号是通配符@MqttTopic("test/#")publicvoidtest(Message<?> message){
        log.info("test="+message.getPayload());}// 这里的 + 号是通配符@MqttTopic("topic/+/+/up")publicvoidup(Message<?> message){
        log.info("up="+message.getPayload());}// 注意 你必须先订阅@MqttTopic("topic/1/2/down")publicvoiddown(Message<?> message){
        log.info("down="+message.getPayload());}}

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带

@MqttService

注解的类

然后 遍历这些类, 找到带有

@MqttTopic

的方法

接着 把

@MqttTopic

注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageHandler;importorg.springframework.messaging.MessagingException;importorg.springframework.stereotype.Component;importjava.lang.reflect.InvocationTargetException;importjava.lang.reflect.Method;importjava.util.Map;/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */@ComponentpublicclassMqttMessageHandleimplementsMessageHandler{publicstaticfinalLogger log =LoggerFactory.getLogger(MqttMessageHandle.class);// 包含 @MqttService注解 的类(Component)publicstaticMap<String,Object> mqttServices;/**
     * 所有mqtt到达的消息都会在这里处理
     * 要注意这个方法是在线程池里面运行的
     * @param message message
     */@OverridepublicvoidhandleMessage(Message<?> message)throwsMessagingException{getMqttTopicService(message);}publicMap<String,Object>getMqttServices(){if(mqttServices==null){
            mqttServices =SpringUtils.getBeansByAnnotation(MqttService.class);}return mqttServices;}publicvoidgetMqttTopicService(Message<?> message){// 在这里 我们根据不同的 主题 分发不同的消息String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);if(receivedTopic==null||"".equals(receivedTopic)){return;}for(Map.Entry<String,Object> entry :getMqttServices().entrySet()){// 把所有带有 @MqttService 的类遍历Class<?> clazz = entry.getValue().getClass();// 获取他所有方法Method[] methods = clazz.getDeclaredMethods();for(Method method: methods ){if(method.isAnnotationPresent(MqttTopic.class)){// 如果这个方法有 这个注解MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);if(isMatch(receivedTopic,handleTopic.value())){// 并且 这个 topic 匹配成功try{
                            method.invoke(SpringUtils.getBean(clazz),message);return;}catch(IllegalAccessException e){
                            e.printStackTrace();
                            log.error("代理炸了");}catch(InvocationTargetException e){
                            log.error("执行 {} 方法出现错误",handleTopic.value(),e);}}}}}}/**
     * mqtt 订阅的主题与我实际的主题是否匹配
     * @param topic 是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */publicstaticbooleanisMatch(String topic,String pattern){if((topic==null)||(pattern==null)){returnfalse;}if(topic.equals(pattern)){// 完全相等是肯定匹配的returntrue;}if("#".equals(pattern)){// # 号代表所有主题  肯定匹配的returntrue;}String[] splitTopic = topic.split("/");String[] splitPattern = pattern.split("/");boolean match =true;// 如果包含 # 则只需要判断 # 前面的for(int i =0; i < splitPattern.length; i++){if(!"#".equals(splitPattern[i])){// 不是# 号 正常判断if(i>=splitTopic.length){// 此时长度不相等 不匹配
                    match =false;break;}if(!splitTopic[i].equals(splitPattern[i])&&!"+".equals(splitPattern[i])){// 不相等 且不等于 +
                    match =false;break;}}else{// 是# 号  肯定匹配的break;}}return match;}}

工具类

SpringUtils 
importorg.springframework.aop.framework.AopContext;importorg.springframework.beans.BeansException;importorg.springframework.beans.factory.NoSuchBeanDefinitionException;importorg.springframework.beans.factory.config.BeanFactoryPostProcessor;importorg.springframework.beans.factory.config.ConfigurableListableBeanFactory;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.stereotype.Component;importjava.util.Map;/**
 * spring工具类 方便在非spring管理环境中获取bean
 * 
 */@ComponentpublicfinalclassSpringUtilsimplementsBeanFactoryPostProcessor,ApplicationContextAware{/** Spring应用上下文环境 */privatestaticConfigurableListableBeanFactory beanFactory;privatestaticApplicationContext applicationContext;publicstaticMap<String,Object>getBeansByAnnotation(Class clsName)throwsBeansException{return beanFactory.getBeansWithAnnotation(clsName);}@OverridepublicvoidpostProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)throwsBeansException{SpringUtils.beanFactory = beanFactory;}@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{SpringUtils.applicationContext = applicationContext;}/**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */@SuppressWarnings("unchecked")publicstatic<T>TgetBean(String name)throwsBeansException{return(T) beanFactory.getBean(name);}/**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */publicstatic<T>TgetBean(Class<T> clz)throwsBeansException{T result =(T) beanFactory.getBean(clz);return result;}/**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */publicstaticbooleancontainsBean(String name){return beanFactory.containsBean(name);}/**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */publicstaticbooleanisSingleton(String name)throwsNoSuchBeanDefinitionException{return beanFactory.isSingleton(name);}/**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */publicstaticClass<?>getType(String name)throwsNoSuchBeanDefinitionException{return beanFactory.getType(name);}/**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */publicstaticString[]getAliases(String name)throwsNoSuchBeanDefinitionException{return beanFactory.getAliases(name);}/**
     * 获取aop代理对象
     * 
     * @param invoker
     * @return
     */@SuppressWarnings("unchecked")publicstatic<T>TgetAopProxy(T invoker){return(T)AopContext.currentProxy();}/**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     */publicstaticString[]getActiveProfiles(){return applicationContext.getEnvironment().getActiveProfiles();}}

OK, 大功告成. 终于舒服了, 终于不用写

if...else...

了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:
使用 Spring integration 在Springboot中集成Mqtt
Spring Integration(一)概述

附:
动态添加主题方式:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.stereotype.Service;importjava.util.Arrays;/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */@ServicepublicclassMqttService{@AutowiredprivateMqttPahoMessageDrivenChannelAdapter adapter;publicvoidaddTopic(String topic){addTopic(topic,1);}publicvoidaddTopic(String topic,int qos){String[] topics = adapter.getTopic();if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);}}publicvoidremoveTopic(String topic){
        adapter.removeTopic(topic);}}

直接调用就行

标签: spring boot iot java

本文转载自: https://blog.csdn.net/weixin_42230797/article/details/126507310
版权归原作者 i小喇叭 所有, 如有侵权,请联系我们删除。

“springboot 整合 mqtt”的评论:

还没有评论