1、概述
MQTT是物联网主流通信协议,但是很多终端天然不具备Mqtt通信能力,比如Web H5、小程序等终端形式,这些终端提供更底层的WebSocket通信方式。因此,研究基于WebSocket进行Mqtt通信是非常普遍的需求。
2、基于WebSocket进行MQTT通信
2.1通信框架
基于WebSocket进行MQTT通信框架代码如下:
public void startup() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
// 绑定两个线程组
server.group(mainGroup, subGroup)
// 指定NIO的模式
.channel(NioServerSocketChannel.class)
// 子处理器,用于处理workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ch.pipeline().addLast("logging", new LoggingHandler("DEBUG"));// 设置log监听器,并且日志级别为debug,方便观察运行流程
// websocket 基于http协议,所以要有http编解码器 服务端用HttpServerCodec
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
/**
* 我们通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,我们需要绑定HttpObjectAggregator,然后我们
* 就可以收到一个FullHttpRequest-是一个完整的请求信息。
* 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
* 几乎在netty中的编程,都会使用到此hanler
*/
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
// ====================== 以上是用于支持http协议 , 以下是支持httpWebsocket
// ======================
/**
* websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws 本handler会帮你处理一些繁重的复杂的事 会帮你处理握手动作:
* handshaking(close, ping, pong) ping + pong = 心跳
* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
*/
// pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的handler
pipeline.addLast(new WebsocketMqttHandler());
pipeline.addLast(new MqttMessageWebSocketFrameEncoder());
pipeline.addLast(MqttEncoder.INSTANCE);
pipeline.addLast(new MqttDecoder());
pipeline.addLast(new MqttHandler());
}
});
// 启动server,并且设置8088为启动的端口号,同时启动方式为同步
ChannelFuture future = server.bind(port).sync();
// 监听关闭的channel,设置位同步方式
future.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println("start exception" + e.toString());
} finally {
// 退出线程组
mainGroup.shutdownGracefully();
subGroup.shutdownGracefully();
}
}
框架基于Java netty库实现,本文关注基于WebSocket的MQTT通信,MQTT本身的Java实现不是本文分析重点, 详情请参考MQTT物联网网关Broker与Java开源实现 。第44-46行添加的MqttEncoder、MqttDecoder和MqttHandler和MQTT物联网网关Broker与Java开源实现 描述的功能相同,共同完成Mqtt协议的处理。第42行的WebsocketMqttHandler需要在Mqtt协议处理之前从WebSocket报文内容里面提取出Mqtt报文;第43行的MqttMessageWebSocketFrameEncoder用于将要发送出去的Mqtt报文编码成WebSocket报文。
WebsocketMqttHandler两大功能:建立连接、收发报文。
WebsocketMqttHandler的核心代码如下:
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 获取客户端传输过来的消息
logger.info("收到消息:" + msg);
if (msg instanceof FullHttpRequest) {
// 以http请求形式接入,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
// 处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
其主体功能包括对Http报文的处理和对WebSocket帧的处理。
- Http报文处理:http报文用于客户端和Broker之间建立连接;
- WebSocket帧处理:从WebSocket报文帧里面提取(组合)Mqtt报文。
2.2建立WebSocket连接
以微信小程序作为客户端,建立与服务端简的WebSocket连接,客户端操作详情请参考微信小程序MQTT通信及开源框架实现,本文关注基于WebSocket进行MQTT的流程。
建立WebSocket连接由两个步骤完成:
- 客户端首先通过http协议发送一条协议升级(升级到websocket)请求报文;
- 服务端进行握手成功后返回一条101 http协议报文表示WebSocket连接已建立。
其代码实现如下:
/**
* 唯一的一次http请求,用于创建websocket
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// 要求Upgrade为websocket,过滤掉get/Post
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
// 若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
logger.info(req.content());
// ctx.fireChannelRead(req.content());
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:1885/websocket",
"mqtt", false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
req.headers().set("Sec-WebSocket-Protocol", "mqtt");
handshaker.handshake(ctx.channel(), req);
}
}
识别出收到的报文是Http报文时在第22行进行握手完成协议升级成websocket协议建立连接。
建立连接过程跟踪
采用wireshark工具对连接过程进行跟踪可以看到如下报文信息:
WebSocket连接建立报文跟踪
从服务器端返回的101报文表示建立连接成功。
2.3基于WebSocket进行MQTT通信
建立websocket连接后,Mqtt报文基于WebSocket进行传输,结束WebSocket报文后进行如下处理:
接收报文
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
ByteBuf echoMsg = frame.content();
frame.retain();
ctx.fireChannelRead(echoMsg);
}
在第14行继续交给下一个handler(即Mqtt相关handler,包括MqttDecoder、MqttHandler)处理。
发送报文
在MqttMessageWebSocketFrameEncoder里对要发送的Mqtt报文进行WebSocket封装,如下第9行所示:
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
if (msg == null)
return;
// byte[] data = ;
// out.add(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
ctx.channel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
}
3、更多
开源项目:Open-Api
更多信息:www.lokei.cn
版权归原作者 navlange 所有, 如有侵权,请联系我们删除。