0


IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

本系列教程包括:
IOT云平台 simple(0)IOT云平台简介
IOT云平台 simple(1)netty入门
IOT云平台 simple(2)springboot入门
IOT云平台 simple(3)springboot netty实现TCP Server
IOT云平台 simple(4)springboot netty实现简单的mqtt broker
IOT云平台 simple(5)springboot netty实现modbus TCP Master
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

本章首先简单介绍了IOT云平台最基本的架构,然后基于springboot netty实现IOT Server;最后进行了测试验证。

测试环境:

  1. mqtt终端,这里用MQTT.fx 工具软件模拟;
  2. IOT server:基于springboot netty进行开发;
  3. Rabbitmq broker;本地安装Windows 64位环境;
  4. Rabbitmq consumer,订阅mq message的server,这里用MQTT Assistant工具软件模拟;

1 IOT云平台最基本的架构

本章涉及的IOT云平台最基本架构图:
在这里插入图片描述
说明
1)为了简单,这里只包括mqtt上行链路,即mqtt终端上传数据;
2)这里通过Rabbitmq 进行消息的分发,也可以用其他mq中间件,如kafka。
3)Mqtt terminal:mqtt终端,指的是具体的设备传感器或者mqtt协议网关。

具体流程
第1步,mqtt终端
实现mqtt协议或者其他协议(modbus、wifi、蓝牙)转换为mqtt协议。

第2步,mqtt终端->IOT server
mqtt终端publish message到IOT server;

第3步,IOT server->Rabbitmq broker
IOT Server中mqtt broker的模块收到mqtt message,进行解析,然后通过Rabbitmq producer模块,publish message到Rabbitmq broker;

第4步,Rabbitmq broker->不同server
Rabbitmq broker收到消息存入指定的queue,然后分发到订阅消息的不同server;

第5步,不同server
不同server监听收到消息进行相应的处理;如:存入到时序数据库、进行大数据流的计算、具体业务的处理。

2 集成开发

第1步:POM文件引入netty、Rabbitmq的依赖:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.63.Final</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>3.0.1</version></dependency>

2.1实现mqtt broker

创建主要的类:
1)TCPServer
server类,实现mqtt broker。
2 )TCPServerStartListener
监听到springboot启动后,启动server。
3)TCPServerChannelInitializer
server channel初始化的类

包括两个server channel处理的类:
1) MqttMessageChannelHandler:
server channel处理的类;实现mqtt消息的解析;

@Component@Slf4j@ChannelHandler.SharablepublicclassMqttMessageChannelHandlerextendsChannelInboundHandlerAdapter{@AutowiredMessageStrategyManager messageStrategyManager;@OverridepublicvoidchannelRead(ChannelHandlerContext channelHandlerContext,Object msg)throwsException{MqttMessage mqttMessage =(MqttMessage) msg;
        log.info("--------------------------channelRead begin---------------------------*");
        log.info("from client:"+ channelHandlerContext.channel().remoteAddress());
        log.info("receive message:"+ mqttMessage.toString());try{MqttMessageType type = mqttMessage.fixedHeader().messageType();MessageStrategy messageStrategy =  messageStrategyManager.getMessageStrategy(type);if(messageStrategy!=null){
                messageStrategy.sendResponseMessage(channelHandlerContext,mqttMessage);}}catch(Exception e){
            e.printStackTrace();}
        log.info("--------------------------channelRead end---------------------------*");}@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){// (4)// 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();}}

2) MqMessageChannelHandler:
server channel处理的类;实现mq消息发布到Rabbitmq broker;

@Component@Slf4jpublicclassMqMessageChannelHandlerextendsChannelInboundHandlerAdapter{@AutowiredProducerService producerService;@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{if(!(msg instanceofMqMessage)){return;}MqMessage mqMessage =(MqMessage) msg;
        log.info("转发到Rabbitmq Server:"+ mqMessage.data);
        producerService.sendData(mqMessage.data);}}

2.2实现Rabbitmq producer

1 定义配置类ProducerConfig:

exchange(交换机):topic_exchange
queue(队列):topic_queue
bindKey(绑定key):project1.station1.*

@Slf4j@ConfigurationpublicclassProducerConfig{String bindKey ="project1.station1.*";@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);//开启Mandatory,触发回调函数
        rabbitTemplate.setMandatory(true);//ack
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                log.info("-----------confirm begin---------------");
                log.info("data:"+ correlationData);if(ack){
                    log.info("Ack:true");}else{
                    log.info("Ack:false");}
                log.info("cause:"+ cause);
                log.info("-----------confirm end---------------");}});//return
        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
                log.info("-----------return begin---------------");
                log.info("message:"+returnedMessage.getMessage());
                log.info("reply code:"+returnedMessage.getReplyCode());
                log.info("reply text:"+returnedMessage.getReplyText());
                log.info("exchange:"+returnedMessage.getExchange());
                log.info("routeKey:"+returnedMessage.getRoutingKey());
                log.info("-----------return end---------------");}});return rabbitTemplate;}@Bean("topic_exchange")publicTopicExchangetopicExchange(){returnExchangeBuilder.topicExchange("topic_exchange").durable(true).build();}@Bean("topic_queue")publicQueuetopicQueue(){returnQueueBuilder.durable("topic_queue").build();}@BeanpublicBindingtopicBind(){returnBindingBuilder.bind(topicQueue()).to(topicExchange()).with(bindKey);}}

2 定义ProducerService类,实现发送消息的功能:
这里发送:
exchange(交换机):topic_exchange
routeKey (路由key):project1.station1.device1

@Slf4j@ComponentpublicclassProducerService{String exchange ="topic_exchange";String routeKey ="project1.station1.device1";@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsendData(String data){CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        log.info("------------topic producer begin------------");
        rabbitTemplate.convertAndSend(exchange, routeKey, data);
        log.info("exchange:"+exchange);
        log.info("routeKey:"+routeKey);
        log.info("send data:"+data);
        log.info("------------topic producer end------------");}}

3 测试验证

第1步:mqtt终端发送消息:temperature is 26, 2023.1.14 17:00。
在这里插入图片描述
第2步:IOT Server接收到mqtt终端发送的消息;然后转发到Rabbitmq Server;
在这里插入图片描述
第3步:Rabbitmq Server的topic_queue中有1条消息;
在这里插入图片描述
第4步:MQTT Assistant中从topic_queue中消费消息:
在这里插入图片描述
可见,对于物联网的数据,实现了从端到平台(解析、分发、存储)的整个流程。

代码详见:
https://gitee.com/linghufeixia/iot-simple
code5


本文转载自: https://blog.csdn.net/afei8080/article/details/128557674
版权归原作者 令狐飞侠 所有, 如有侵权,请联系我们删除。

“IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)”的评论:

还没有评论