0


Netty框架:Java实现WebSocket,向网页实时推送消息

packagecom.cetcnav.nioWebSocket;importcom.cetcnav.base.utils.PropertiesUtils;importcn.hutool.core.lang.Console;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.Channel;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.http.HttpObjectAggregator;importio.netty.handler.codec.http.HttpServerCodec;importio.netty.handler.logging.LogLevel;importio.netty.handler.logging.LoggingHandler;importio.netty.handler.stream.ChunkedWriteHandler;/**
 * @author wp
 * 2021年4月14日下午3:43:54
 * smartReceptionWS
 * NioWebSocketServer.java
 * 类说明:
 */publicclassNioWebSocketServerimplementsRunnable{@Overridepublicvoidrun(){init();}/**
     * niowebsocket端口号
     */privatefinalstaticInteger port =PropertiesUtils.getInteger("nio.wensocket.port");publicvoidinit(){Console.log("正在启动websocket服务器");NioEventLoopGroup boss=newNioEventLoopGroup();NioEventLoopGroup work=newNioEventLoopGroup();try{ServerBootstrap bootstrap=newServerBootstrap();
            bootstrap.option(ChannelOption.SO_BACKLOG,1024);// 防止tcp数据积攒大块发送;有数据了就发送
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
            bootstrap.group(boss,work).localAddress(port);// 绑定监听端口;
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(newNioWebSocketChannelInitializer());// 服务器异步创建绑定Channel channel = bootstrap.bind().sync().channel();Console.log("webSocket服务器启动成功:"+channel);//关闭服务器通道
            channel.closeFuture().sync();}catch(InterruptedException e){
            e.printStackTrace();Console.error("运行出错:"+e);}finally{
            boss.shutdownGracefully();
            work.shutdownGracefully();Console.log("websocket服务器已关闭");}}publicstaticvoidmain(String[] args){newNioWebSocketServer().init();}}
package com.cetcnav.nioWebSocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

import java.util.Date;

import cn.hutool.core.lang.Console;

import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;

/**
 * Channel handler that performs the WebSocket opening handshake for the initial HTTP request
 * and afterwards answers WebSocket frames on the same channel.
 *
 * <p>Every channel is registered with {@link ChannelHandlerPool} when it becomes active and
 * removed again when it becomes inactive.
 *
 * <p>Project: smartReceptionWS, file NioWebSocketHandler.java, created 2021-04-14,
 * revised 2021-04-15.
 *
 * @author wp
 */
public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> {

    /** Handshaker created by the upgrade request; {@code null} until a handshake was attempted. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Dispatches an inbound message: a full HTTP request goes to the handshake logic, a
     * WebSocket frame to the frame logic. Any other message type is only logged.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Console.log("收到消息:" + msg);
        if (msg instanceof FullHttpRequest) {
            // The client connects with a plain HTTP request that asks for a WebSocket upgrade.
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // Message sent by an already upgraded WebSocket client.
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Registers the new connection in the shared pool.
     * NOTE(review): this runs when the channel becomes active, i.e. before the WebSocket
     * handshake, so the pool can contain channels that are not upgraded yet.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Console.log("客户端加入连接:" + ctx.channel());
        ChannelHandlerPool.addChannel(ctx.channel());
    }

    /** Removes the closed connection from the shared pool. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Console.log("客户端断开连接:" + ctx.channel());
        ChannelHandlerPool.removeChannel(ctx.channel());
    }

    /** Flushes anything written without an explicit flush (e.g. the pong reply). */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Handles one WebSocket frame: completes the closing handshake, answers a ping with a pong,
     * or echoes a text message back to its sender.
     *
     * @throws UnsupportedOperationException for every frame type other than close, ping and text
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Close request: let the handshaker finish the closing handshake.
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping: reply with a pong that carries the same payload.
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // This example supports text messages only.
        if (!(frame instanceof TextWebSocketFrame)) {
            Console.log("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }

        // Build the reply: current date + channel id + ":" + received text.
        String request = ((TextWebSocketFrame) frame).text();
        Console.log("服务端收到:" + request);
        TextWebSocketFrame tws =
                new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);

        // Reply to the sender only; ChannelHandlerPool.send2All(tws) would broadcast instead.
        ctx.channel().writeAndFlush(tws);
    }

    /**
     * Handles the single HTTP request of a connection, which must ask for a WebSocket upgrade.
     * Undecodable requests and requests without {@code Upgrade: websocket} get a 400 response.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // RFC 6455 treats the Upgrade token as ASCII case-insensitive, so "WebSocket" is valid too.
        boolean upgradeRequested = "websocket".equalsIgnoreCase(req.headers().get("Upgrade"));
        if (!req.decoderResult().isSuccess() || !upgradeRequested) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory("", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // The factory returned no handshaker: the requested WebSocket version is unsupported.
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Sends an HTTP response to the client. For any status other than 200 the status text is
     * written as the body and the connection is closed after the write; a 200 response closes
     * the connection only when the request is not keep-alive.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
                                         DefaultFullHttpResponse res) {
        boolean ok = res.status().code() == 200;
        if (!ok) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            // The bytes were copied into the response content; release the temporary buffer.
            buf.release();
        }

        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || !ok) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
package com.cetcnav.nioWebSocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
 * Static registry of all WebSocket connections.
 *
 * <p>Two collections are maintained: a Netty {@link ChannelGroup} holding every channel added
 * through {@link #addChannel(Channel)} (used for broadcasts and id look-ups), and the public
 * {@link #channelGroup} set, whose members are looked up by their {@code parkId} attribute.
 * Nothing in this class adds to {@link #channelGroup}; it is populated elsewhere.
 *
 * <p>Project: smartReceptionWS, file ChannelHandlerPool.java, created 2021-04-14 15:50:52.
 *
 * @author wp
 */
public class ChannelHandlerPool {

    /** Every registered channel; target of {@link #send2All(TextWebSocketFrame)}. */
    private static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Maps a channel's short text id to its {@link ChannelId} for {@link #findChannel(String)}. */
    private static final ConcurrentMap<String, ChannelId> channelMap =
            new ConcurrentHashMap<String, ChannelId>();

    /** Channel attribute that carries the park id a connection belongs to. */
    private static final AttributeKey<String> PARK_ID_KEY = AttributeKey.valueOf("parkId");

    /**
     * Channels that are associated with a specific user/park. The set is a
     * {@link Collections#synchronizedSet(Set)}: callers must hold its monitor while iterating.
     */
    public static Set<Channel> channelGroup = Collections.synchronizedSet(new HashSet<>());

    /**
     * Registers a channel for broadcasts and id look-ups.
     *
     * @param channel the channel to register
     */
    public static void addChannel(Channel channel) {
        group.add(channel);
        channelMap.put(channel.id().asShortText(), channel.id());
    }

    /**
     * Removes a channel from the broadcast group, the id map and the named-channel set.
     *
     * @param channel the channel to remove
     */
    public static void removeChannel(Channel channel) {
        group.remove(channel);
        String channelId = channel.id().asShortText();
        channelMap.remove(channelId);
        // removeIf on a synchronized set runs under the set's own lock, unlike a bare iterator.
        channelGroup.removeIf(candidate -> channelId.equals(candidate.id().asShortText()));
    }

    /**
     * Looks up a registered channel by its short text id.
     *
     * @param id short text id of the channel
     * @return the channel, or {@code null} when {@code id} is {@code null} or not registered
     */
    public static Channel findChannel(String id) {
        if (id == null) {
            return null;
        }
        ChannelId channelId = channelMap.get(id);
        // Looking up a null ChannelId in the group would fail; treat an unknown id as "not found".
        return channelId == null ? null : group.find(channelId);
    }

    /**
     * Broadcasts a text frame to every registered channel.
     *
     * @param tws the frame to send
     */
    public static void send2All(TextWebSocketFrame tws) {
        group.writeAndFlush(tws);
    }

    /**
     * Sends a text message to every channel whose {@code parkId} attribute equals {@code parkId}.
     *
     * @param parkId  park id identifying the receivers
     * @param message text to send
     * @return {@code "success"} when at least one matching channel exists, otherwise a message
     *         saying that the park id is not online
     * @throws NullPointerException if receivers exist and {@code message} is {@code null}
     */
    public static String sendMessage(String parkId, String message) {
        List<Channel> channelList = getChannelByName(parkId);
        if (channelList.isEmpty()) {
            return "parkId" + parkId + "不在线!";
        }
        // Keep rejecting a null message, as the removed (unused) buffer allocation used to do.
        if (message == null) {
            throw new NullPointerException("message");
        }
        // Each receiver gets its own frame instance.
        channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(message)));
        return "success";
    }

    /**
     * Finds the named channels that belong to a park id.
     *
     * @param parkId park id to match against each channel's {@code parkId} attribute
     * @return matching channels; empty when none match or {@code parkId} is {@code null}.
     *         Channels without the attribute never match.
     */
    public static List<Channel> getChannelByName(String parkId) {
        Set<Channel> named = ChannelHandlerPool.channelGroup;
        // Streaming a synchronized set is an iteration and must be guarded by the set's lock.
        synchronized (named) {
            return named.stream()
                    .filter(channel -> parkId != null
                            && parkId.equals(channel.attr(PARK_ID_KEY).get()))
                    .collect(Collectors.toList());
        }
    }
}
packagecom.cetcnav.worker;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.cetcnav.cacheManage.MessageCahce;importcom.cetcnav.nioWebSocket.ChannelHandlerPool;importio.netty.handler.codec.http.websocketx.TextWebSocketFrame;/**
 * @author wp
 * @date 2021年6月3日下午8:11:47
 * @projectName GRWebSocket
 * @fileName SendMessage2WebWorker.java
 * @tags 类说明: 主动向前端推送解析之后的数据信息 
 */publicclassSendMessage2WebWorkerimplementsRunnable{privatestaticLogger log =LoggerFactory.getLogger(SendMessage2WebWorker.class);@Overridepublicvoidrun(){while(true){String json =MessageCahce.take();
            log.info(json);ChannelHandlerPool.send2All(newTextWebSocketFrame(json));if(json.contains("0060")){
                log.info("已推送干扰数据:");}elseif(json.contains("0061")){
                log.info("已推送功放数据:");}}}}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Netty-Websocket</title>
    <script src="webjars/jquery/3.4.1/jquery.js"></script>
    <script type="text/javascript">
        // Minimal test client: opens a WebSocket to the Netty server and logs every event.
        var socket;
        // Fall back to the legacy Mozilla-prefixed constructor when the standard one is missing.
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            // Original note: the uid field must match paramMap.get("uid") in the backend
            // WebSocketHandler class, and "55" stands for the user id.
            // NOTE(review): the URL below carries no uid parameter, so that note looks stale.
            socket = new WebSocket("ws://222.222.216.210:7777/websocket");
            socket.onmessage = function (event) {
                // To render the message in the page instead: $("#websocket").html(event.data)
                console.log(event.data)
            };
            socket.onopen = function (event) {
                console.log("Netty-WebSocket服务器。。。。。。连接")
            };
            socket.onclose = function (event) {
                console.log("Netty-WebSocket服务器。。。。。。关闭")
            };
        } else {
            alert("您的浏览器不支持WebSocket协议!");
        }
    </script>
</head>
<body>
<div id="ws" class="ws"></div>
</body>
</html>

本文转载自: https://blog.csdn.net/qq_33651286/article/details/140638776
版权归原作者 Mr.4567 所有, 如有侵权,请联系我们删除。

“Netty框架:Java实现WebSocket,向网页实时推送消息”的评论:

还没有评论