0


【开源物联网】基于WebSocket进行MQTT通信

1、概述

MQTT是物联网主流通信协议,但是很多终端天然不具备Mqtt通信能力,比如Web H5、小程序等终端形式,这些终端提供更底层的WebSocket通信方式。因此,研究基于WebSocket进行Mqtt通信是非常普遍的需求。

2、基于WebSocket进行MQTT通信

2.1通信框架

基于WebSocket进行MQTT通信框架代码如下:

public void startup() {
        mainGroup = new NioEventLoopGroup();
        subGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            // 绑定两个线程组
            server.group(mainGroup, subGroup)
                    // 指定NIO的模式
                    .channel(NioServerSocketChannel.class)
                    // 子处理器,用于处理workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();

                            ch.pipeline().addLast("logging", new LoggingHandler("DEBUG"));// 设置log监听器,并且日志级别为debug,方便观察运行流程

                            // websocket 基于http协议,所以要有http编解码器 服务端用HttpServerCodec
                            pipeline.addLast(new HttpServerCodec());
                            // 对写大数据流的支持
                            pipeline.addLast(new ChunkedWriteHandler());

                            /**
                             * 我们通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,我们需要绑定HttpObjectAggregator,然后我们
                             * 就可以收到一个FullHttpRequest-是一个完整的请求信息。
                             * 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
                             * 几乎在netty中的编程,都会使用到此hanler
                             */
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));

                            // ====================== 以上是用于支持http协议 , 以下是支持httpWebsocket
                            // ======================

                            /**
                             * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws 本handler会帮你处理一些繁重的复杂的事 会帮你处理握手动作:
                             * handshaking(close, ping, pong) ping + pong = 心跳
                             * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
                             */
                            // pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

                            // 自定义的handler
                            pipeline.addLast(new WebsocketMqttHandler());
                            pipeline.addLast(new MqttMessageWebSocketFrameEncoder());
                            pipeline.addLast(MqttEncoder.INSTANCE);
                            pipeline.addLast(new MqttDecoder());
                            pipeline.addLast(new MqttHandler());
                        }
                    });

            // 启动server,并且设置8088为启动的端口号,同时启动方式为同步
            ChannelFuture future = server.bind(port).sync();
            // 监听关闭的channel,设置位同步方式
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            System.out.println("start exception" + e.toString());
        } finally {
            // 退出线程组
            mainGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }

框架基于Java netty库实现,本文关注基于WebSocket的MQTT通信,MQTT本身的Java实现不是本文分析重点, 详情请参考MQTT物联网网关Broker与Java开源实现 。第44-46行添加的MqttEncoder、MqttDecoder和MqttHandler和MQTT物联网网关Broker与Java开源实现 描述的功能相同,共同完成Mqtt协议的处理。第42行的WebsocketMqttHandler需要在Mqtt协议处理之前从WebSocket报文内容里面提取出Mqtt报文;第43行的MqttMessageWebSocketFrameEncoder用于将要发送出去的Mqtt报文编码成WebSocket报文。

WebsocketMqttHandler两大功能:建立连接、收发报文。

WebsocketMqttHandler的核心代码如下:

@Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取客户端传输过来的消息
        logger.info("收到消息:" + msg);
        if (msg instanceof FullHttpRequest) {
            // 以http请求形式接入,但是走的是websocket
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // 处理websocket客户端的消息
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }

    }

其主体功能包括对Http报文的处理和对WebSocket帧的处理。

  • Http报文处理:http报文用于客户端和Broker之间建立连接;
  • WebSocket帧处理:从WebSocket报文帧里面提取(组合)Mqtt报文。

2.2建立WebSocket连接

以微信小程序作为客户端,建立与服务端简的WebSocket连接,客户端操作详情请参考微信小程序MQTT通信及开源框架实现,本文关注基于WebSocket进行MQTT的流程。

建立WebSocket连接由两个步骤完成:

  • 客户端首先通过http协议发送一条协议升级(升级到websocket)请求报文;
  • 服务端进行握手成功后返回一条101 http协议报文表示WebSocket连接已建立。

其代码实现如下:

/**
     * 唯一的一次http请求,用于创建websocket
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // 要求Upgrade为websocket,过滤掉get/Post
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            // 若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        logger.info(req.content());
        // ctx.fireChannelRead(req.content());
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://localhost:1885/websocket",
                "mqtt", false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            req.headers().set("Sec-WebSocket-Protocol", "mqtt");
            handshaker.handshake(ctx.channel(), req);
        }
    }

识别出收到的报文是Http报文时在第22行进行握手完成协议升级成websocket协议建立连接。

建立连接过程跟踪

采用wireshark工具对连接过程进行跟踪可以看到如下报文信息:

WebSocket连接建立报文跟踪

从服务器端返回的101报文表示建立连接成功。

2.3基于WebSocket进行MQTT通信

建立websocket连接后,Mqtt报文基于WebSocket进行传输,结束WebSocket报文后进行如下处理:

接收报文

private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        ByteBuf echoMsg = frame.content();
        frame.retain();
        ctx.fireChannelRead(echoMsg);
    }

在第14行继续交给下一个handler(即Mqtt相关handler,包括MqttDecoder、MqttHandler)处理。

发送报文

在MqttMessageWebSocketFrameEncoder里对要发送的Mqtt报文进行WebSocket封装,如下第9行所示:

@Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        if (msg == null)
            return;

        // byte[] data = ;

        // out.add(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
        ctx.channel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
    }

3、更多

开源项目:Open-Api

更多信息:www.lokei.cn


本文转载自: https://blog.csdn.net/navlange/article/details/126675672
版权归原作者 navlange 所有, 如有侵权,请联系我们删除。

“【开源物联网】基于WebSocket进行MQTT通信”的评论:

还没有评论