packagecom.cetcnav.nioWebSocket;importcom.cetcnav.base.utils.PropertiesUtils;importcn.hutool.core.lang.Console;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.Channel;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.http.HttpObjectAggregator;importio.netty.handler.codec.http.HttpServerCodec;importio.netty.handler.logging.LogLevel;importio.netty.handler.logging.LoggingHandler;importio.netty.handler.stream.ChunkedWriteHandler;/**
* @author wp
* 2021年4月14日下午3:43:54
* smartReceptionWS
* NioWebSocketServer.java
* 类说明:
*/publicclassNioWebSocketServerimplementsRunnable{@Overridepublicvoidrun(){init();}/**
* niowebsocket端口号
*/privatefinalstaticInteger port =PropertiesUtils.getInteger("nio.wensocket.port");publicvoidinit(){Console.log("正在启动websocket服务器");NioEventLoopGroup boss=newNioEventLoopGroup();NioEventLoopGroup work=newNioEventLoopGroup();try{ServerBootstrap bootstrap=newServerBootstrap();
bootstrap.option(ChannelOption.SO_BACKLOG,1024);// 防止tcp数据积攒大块发送;有数据了就发送
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(boss,work).localAddress(port);// 绑定监听端口;
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(newNioWebSocketChannelInitializer());// 服务器异步创建绑定Channel channel = bootstrap.bind().sync().channel();Console.log("webSocket服务器启动成功:"+channel);//关闭服务器通道
channel.closeFuture().sync();}catch(InterruptedException e){
e.printStackTrace();Console.error("运行出错:"+e);}finally{
boss.shutdownGracefully();
work.shutdownGracefully();Console.log("websocket服务器已关闭");}}publicstaticvoidmain(String[] args){newNioWebSocketServer().init();}}
packagecom.cetcnav.nioWebSocket;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.*;importio.netty.handler.codec.http.DefaultFullHttpResponse;importio.netty.handler.codec.http.FullHttpRequest;importio.netty.handler.codec.http.HttpResponseStatus;importio.netty.handler.codec.http.HttpVersion;importio.netty.handler.codec.http.websocketx.*;importio.netty.util.CharsetUtil;importjava.util.Date;importcn.hutool.core.lang.Console;/**
* @author wp
* 2021年4月14日下午3:49:10
* smartReceptionWS
* NioWebSocketHandler.java
* 类说明:
*/importstaticio.netty.handler.codec.http.HttpUtil.isKeepAlive;/**
* @author wp
* 2021年4月15日下午3:43:31
* smartReceptionWS
* NioWebSocketHandler.java
* 类说明:
*/publicclassNioWebSocketHandlerextendsSimpleChannelInboundHandler<Object>{privateWebSocketServerHandshaker handshaker;@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Object msg)throwsException{Console.log("收到消息:"+msg);if(msg instanceofFullHttpRequest){//以http请求形式接入,但是走的是websockethandleHttpRequest(ctx,(FullHttpRequest) msg);}elseif(msg instanceofWebSocketFrame){//处理websocket客户端的消息handlerWebSocketFrame(ctx,(WebSocketFrame) msg);}}@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{//添加连接Console.log("客户端加入连接:"+ctx.channel());ChannelHandlerPool.addChannel(ctx.channel());}@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{//断开连接Console.log("客户端断开连接:"+ctx.channel());ChannelHandlerPool.removeChannel(ctx.channel());}@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{
ctx.flush();}privatevoidhandlerWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){// 判断是否关闭链路的指令if(frame instanceofCloseWebSocketFrame){
handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame.retain());return;}// 判断是否ping消息if(frame instanceofPingWebSocketFrame){
ctx.channel().write(newPongWebSocketFrame(frame.content().retain()));return;}// 本例程仅支持文本消息,不支持二进制消息if(!(frame instanceofTextWebSocketFrame)){Console.log("本例程仅支持文本消息,不支持二进制消息");thrownewUnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));}// 返回应答消息String request =((TextWebSocketFrame) frame).text();Console.log("服务端收到:"+ request);TextWebSocketFrame tws =newTextWebSocketFrame(newDate().toString()+ ctx.channel().id()+":"+ request);// 群发// ChannelHandlerPool.send2All(tws);// 返回【谁发的发给谁】
ctx.channel().writeAndFlush(tws);}/**
* 唯一的一次http请求,用于创建websocket
*/privatevoidhandleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){//要求Upgrade为websocket,过滤掉get/Postif(!req.decoderResult().isSuccess()||(!"websocket".equals(req.headers().get("Upgrade")))){//若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端sendHttpResponse(ctx, req,newDefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory =newWebSocketServerHandshakerFactory("",null,false);
handshaker = wsFactory.newHandshaker(req);if(handshaker ==null){//版本不支持WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());}else{
handshaker.handshake(ctx.channel(), req);}}/**
* 拒绝不合法的请求,并返回错误信息
* */privatestaticvoidsendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){// 返回应答给客户端if(res.status().code()!=200){ByteBuf buf =Unpooled.copiedBuffer(res.status().toString(),CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();}ChannelFuture f = ctx.channel().writeAndFlush(res);// 如果是非Keep-Alive,关闭连接if(!isKeepAlive(req)|| res.status().code()!=200){
f.addListener(ChannelFutureListener.CLOSE);}}}
packagecom.cetcnav.nioWebSocket;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.Channel;importio.netty.channel.ChannelId;importio.netty.channel.group.ChannelGroup;importio.netty.channel.group.DefaultChannelGroup;importio.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;importio.netty.handler.codec.http.websocketx.TextWebSocketFrame;importio.netty.util.AttributeKey;importio.netty.util.concurrent.GlobalEventExecutor;importjava.util.Collections;importjava.util.HashSet;importjava.util.Iterator;importjava.util.List;importjava.util.Set;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.ConcurrentMap;importjava.util.stream.Collectors;/**
* @author wp
* 2021年4月14日下午3:50:52
* smartReceptionWS
* ChannelHandlerPool.java
* 类说明:通道组池,管理所有websocket连接
*/publicclassChannelHandlerPool{privatestaticChannelGroup group =newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);/**
*
*/privatestaticConcurrentMap<String,ChannelId>ChannelMap=newConcurrentHashMap<String,ChannelId>();/**
* 通道组池,管理所有websocket连接 含有具体的指定用户名称
*/publicstaticSet<Channel> channelGroup =Collections.synchronizedSet(newHashSet<>());publicstaticvoidaddChannel(Channel channel){
group.add(channel);ChannelMap.put(channel.id().asShortText(),channel.id());}publicstaticvoidremoveChannel(Channel channel){
group.remove(channel);ChannelMap.remove(channel.id().asShortText());/**
* 去除指定用户名称通道
*/String channelId = channel.id().asShortText();Iterator<Channel> it = channelGroup.iterator();while(it.hasNext()){Channel data = it.next();String removeString = data.id().asShortText();if(channelId.equals(removeString)){
it.remove();}}}publicstaticChannelfindChannel(String id){return group.find(ChannelMap.get(id));}publicstaticvoidsend2All(TextWebSocketFrame tws){
group.writeAndFlush(tws);}publicstaticStringsendMessage(String parkId,String message){List<Channel> channelList =getChannelByName(parkId);if(channelList.size()<=0){return"parkId"+ parkId +"不在线!";}ByteBuf byteBuf =Unpooled.copiedBuffer(message.getBytes());// channelList.forEach(channel -> channel.writeAndFlush(new BinaryWebSocketFrame(byteBuf)));
channelList.forEach(channel -> channel.writeAndFlush(newTextWebSocketFrame(message)));return"success";}/**
* 根据用户parkId查找channel
*
* @param parkId
* @return
*/publicstaticList<Channel>getChannelByName(String parkId){AttributeKey<String> key =AttributeKey.valueOf("parkId");returnChannelHandlerPool.channelGroup.stream().filter(channel -> channel.attr(key).get().equals(parkId)).collect(Collectors.toList());}}
packagecom.cetcnav.worker;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.cetcnav.cacheManage.MessageCahce;importcom.cetcnav.nioWebSocket.ChannelHandlerPool;importio.netty.handler.codec.http.websocketx.TextWebSocketFrame;/**
* @author wp
* @date 2021年6月3日下午8:11:47
* @projectName GRWebSocket
* @fileName SendMessage2WebWorker.java
* @tags 类说明: 主动向前端推送解析之后的数据信息
*/publicclassSendMessage2WebWorkerimplementsRunnable{privatestaticLogger log =LoggerFactory.getLogger(SendMessage2WebWorker.class);@Overridepublicvoidrun(){while(true){String json =MessageCahce.take();
log.info(json);ChannelHandlerPool.send2All(newTextWebSocketFrame(json));if(json.contains("0060")){
log.info("已推送干扰数据:");}elseif(json.contains("0061")){
log.info("已推送功放数据:");}}}}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Netty-Websocket</title>
    <!-- Scripts belong inside <head> (or <body>), not between </head> and <body>. -->
    <script src="webjars/jquery/3.4.1/jquery.js"></script>
    <script type="text/javascript">
        // Minimal test client: opens a WebSocket to the server and logs every event to the console.
        var socket;
        // Fall back to the prefixed implementation of old Firefox versions.
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            // NOTE(review): earlier comments here said a uid field (e.g. 55 as the user id) must
            // match paramMap.get("uid") in a backend WebSocketHandler class; the URL below
            // carries no such parameter — confirm whether one is still expected.
            socket = new WebSocket("ws://222.222.216.210:7777/websocket");
            socket.onmessage = function (event) {
                // Received messages are only logged; nothing is rendered into the page.
                console.log(event.data)
            };
            socket.onopen = function (event) {
                console.log("Netty-WebSocket服务器。。。。。。连接")
            };
            socket.onclose = function (event) {
                console.log("Netty-WebSocket服务器。。。。。。关闭")
            };
        } else {
            alert("您的浏览器不支持WebSocket协议!");
        }
    </script>
</head>
<body>
<div id="ws" class="ws"></div>
</body>
</html>
本文转载自: https://blog.csdn.net/qq_33651286/article/details/140638776
版权归原作者 Mr.4567 所有, 如有侵权,请联系我们删除。