0


Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统

📢📢📢📣📣📣

哈喽!大家好,我是【Bug 终结者,【CSDN新星创作者】🏆,阿里云技术博主🏆,51CTO人气博主🏆,INfoQ写作专家🏆

一位上进心十足,拥有极强学习力的【Java领域博主】😜😜😜

🏅【Bug 终结者】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 偶尔会分享些前端基础知识,会更新实战项目,面向企业级开发应用
🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞

❤️❤️❤️ 感谢各位大可爱小可爱! ❤️❤️❤️

文章目录

Netty系列文章

Netty入门 – 什么是Netty?

一、需求说明

使用Netty实现群聊+私聊系统

  1. 编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞形式)
  2. 实现多人群聊
  3. 实现单人私聊
  4. 利用心跳检测机制监听客户端是否存在连接(是否存在读、写、读写操作
  5. 服务器端:可以检测用户上线,离线,并且实现消息转发功能
  6. 客户端:通过channel可以无阻塞发送消息给其它所有在线用户,同时可以接受所有在线用户发送的消息(由服务器转发消息得到

二、什么是心跳检测机制?

心跳检测机制就是在一定的时间范围内客户端与服务器之间没有发生读、写、读写操作,那么就认定客户端与服务器无连接,这样就节省了服务器的资源

❤️Netty实现心跳检测机制

服务器启动前添加前置处理器

//添加心跳检测
pipeline.addLast(newIdleStateHandler(3,5,7,TimeUnit.SECONDS));//添加自定义心跳处理器
pipeline.addLast(newHeartbeatServerHandler());

IdleStateHandler是Netty 提供的处理空闲状态的处理器

参数说明

// long readerIdleTime: 表示多长时间没有读,就会发送一个心跳检测包,检测是否还处于连接状态// long writerIdleTime: 表示多长时间没有写,就会发送一个心跳检测包,检测是否还处于连接状态// long allIdleTime:    表示多长时间没有读写操作,就会发送一个心跳检测包,检测是否处于连接状态// 最后一个参数是当前时间的单位,秒或分钟或小时。

源码表示当前处理器类是表示多长时间内没有读、没有写、或者没有读写操作,就会触发IdleStateEvent事件
Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.

IdleStateEvent事件 触发后, 就会传递给管道的下一个handler处理
通过调用(触发)handleruserEventTiggered 在该方法中处理 当IdleStateEvent事件

HeartbeatServerHandler自定义心跳处理器

publicclassHeartbeatServerHandlerextendsChannelInboundHandlerAdapter{/**
     * 客户端在指定时间内未触发相应操作执行此方法,即认为与客户端断开连接
     * @param ctx   全局上下文对象
     * @param evt   事件
     * @throws Exception    发生异常时抛出
     */@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx,Object evt)throwsException{//判断当前事件是否为IdleStateEventif(evt instanceofIdleStateEvent){//将evt强转为IdleStateEventIdleStateEvent event =(IdleStateEvent) evt;//判断到底发生的事件是什么String eventType =null;//由于IdleStateEvent底层判断事件是根据枚举类型来的,所以直接判断即可switch(event.state()){case READER_IDLE:
                    eventType ="读空闲";break;case WRITER_IDLE:
                    eventType ="写空闲";break;case ALL_IDLE:
                    eventType ="读写空闲";break;}System.out.println(ctx.channel().remoteAddress()+"发生超时事件,事件类型为:"+ eventType);System.out.println("服务器做相应处理");}}

心跳检测机制就是这样,简单来说,就是每隔一段时间去检测客户端是否与服务器连接,如果无连接,那么就断开,从而节省服务器的资源

三、需求分析

🚝多人群聊

利用map集合,Map<String, Channel> 里面存入当前在线的所有用户,继承 SimpleChannelInboundHandler 处理器 并在对应的处理器进行添加通道到map

然后实现处理器的channelRead0方法进行转发数据,这就简单的实现了多人群聊
在这里插入图片描述

🚝单人私聊

单人私聊与多人群聊类似,也是在channelRead0方法内进行判断是否为私聊用户,私聊用户输入#端口号#要发送的内容,即可简单检测到本次消息为私聊,并从map中取出对应的key,拿出key对应的channel,进行转发,即可完成私聊

在这里插入图片描述

接受消息,其它用户不会看到此私聊消息

🚝服务器检测用户上线、离线

服务器端检测用户当前的状态,实现对应的方法进行相应的提示即可

  • 实现handlerAdded检测某个用户加入聊天,
  • 实现channelActive表示channel处于活跃状态,即上线
  • 实现channelInactive表示channel处于非活跃状态,即离线,
  • 实现 handlerRemoved 表示离线

在这里插入图片描述

四、效果图

在这里插入图片描述

五、核心源码

GroupChatServer服务器端代码

packagecom.wanshi.netty.groupchat;importcom.wanshi.netty.heartbeat.HeartbeatServerHandler;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importio.netty.handler.timeout.IdleStateHandler;importjava.util.concurrent.TimeUnit;publicclassGroupChatServer{// 监听端口privateint port;publicGroupChatServer(int port){this.port = port;}//编写run方法,处理客户端的请求publicvoidrun(){//创建两个线程组EventLoopGroup bossGroup =newNioEventLoopGroup(1);//Nio核数 * 2EventLoopGroup workerGroup =newNioEventLoopGroup();ServerBootstrap bootstrap =newServerBootstrap();try{
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{//获取pipelineChannelPipeline pipeline = socketChannel.pipeline();//向pipeline加入解码器
                            pipeline.addLast("decoder",newStringDecoder());//向pipeline加入编码器
                            pipeline.addLast("encoder",newStringEncoder());//加入自己的业务处理handler
                            pipeline.addLast(newGroupChatServerHandler());//加入心跳检测机制
                            pipeline.addLast(newIdleStateHandler(3,5,7,TimeUnit.SECONDS));
                            pipeline.addLast(newHeartbeatServerHandler());}});System.out.println("netty 服务器启动");ChannelFuture future = bootstrap.bind(port).sync();//监听关闭事件
            future.channel().closeFuture().sync();}catch(Exception e){
            e.printStackTrace();}finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}}publicstaticvoidmain(String[] args){GroupChatServer groupChatServer =newGroupChatServer(7000);
        groupChatServer.run();}}

GroupChatServerHandler 服务器自定义handler

packagecom.wanshi.netty.groupchat;importio.netty.channel.Channel;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;publicclassGroupChatServerHandlerextendsSimpleChannelInboundHandler<String>{//所有的channel存入map集合中,目的是为了私聊好获取用户privatestaticMap<String,Channel> allChannels =newHashMap<String,Channel>();//格式化所有日期时间privateSimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");//转化日期privateString currentDate = sdf.format(newDate());/**
     * handlerAdded 表示连接建立,一旦连接建立,第一个被执行
     * 将当前channel加入到map集合
     */@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{//获取当前channelChannel channel = ctx.channel();//推送客户加入聊天的信息推送给其它在线的客户端//该方法会将channelGroup中所有的channel遍历并发送消息
        allChannels.forEach((k, ch)->{
            ch.writeAndFlush(currentDate+" \n [客户端]"+ channel.remoteAddress()+"加入聊天\n");});//获取端口号String key = channel.remoteAddress().toString().split(":")[1];
        allChannels.put(key, channel);}/**
     * 表示断开连接了,将xx客户离开信息推送给当前在线的客户
     * @param ctx
     * @throws Exception
     */@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{//获取当前channelChannel channel = ctx.channel();//推送客户加入聊天的信息推送给其它在线的客户端//该方法会将map中所有的channel遍历并发送消息
        allChannels.forEach((k, ch)->{
            ch.writeAndFlush(currentDate+" \n [客户端]"+ channel.remoteAddress()+"离线\n");});System.out.println("当前在线人数:"+ allChannels.size());}/**
     * 读取数据并将数据转发给在线的客户端
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */@OverrideprotectedvoidchannelRead0(ChannelHandlerContext channelHandlerContext,String s)throwsException{//获取到当前channelChannel channel = channelHandlerContext.channel();//私聊用户发送消息if(s.contains("#")){String id = s.split("#")[1];String body = s.split("#")[2];Channel userChannel = allChannels.get(id);String key = channel.remoteAddress().toString().split(":")[1];
            userChannel.writeAndFlush(currentDate+"\n "+key+"【私聊】 [用户] "+id+" 说 : "+body);return;}//循环遍历hashmap集合进行转发消息
        allChannels.forEach((k, ch)->{if(channel != ch){
                ch.writeAndFlush(currentDate +" \n [客户端]"+ channel.remoteAddress()+":"+ s +"\n");}else{// 发送消息给自己,回显自己发送的消息
                channel.writeAndFlush(currentDate +" \n [我]:"+ s +"\n");}});}/**
     * 表示channel处于活动状态
     * @param ctx
     * @throws Exception
     */@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{System.out.println(currentDate +" -- "+ ctx.channel().remoteAddress()+"上线~");}/**
     * 失去连接时会触发此方法
     * @param ctx 全局上下文对象
     * @throws Exception
     */@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{Channel channel = ctx.channel();String key = channel.remoteAddress().toString().split(":")[1];
        allChannels.remove(key);System.out.println(currentDate +" -- "+ ctx.channel().remoteAddress()+"离线");}@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause)throwsException{//关闭
        ctx.close();}}

自定义心跳处理器 – HeartbeatServerHandler

packagecom.wanshi.netty.heartbeat;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.handler.timeout.IdleStateEvent;publicclassHeartbeatServerHandlerextendsChannelInboundHandlerAdapter{/**
     * 客户端在指定时间内未触发相应操作执行此方法,即认为与客户端断开连接
     * @param ctx   全局上下文对象
     * @param evt   事件
     * @throws Exception    发生异常时抛出
     */@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx,Object evt)throwsException{//判断当前事件是否为IdleStateEventif(evt instanceofIdleStateEvent){//将evt强转为IdleStateEventIdleStateEvent event =(IdleStateEvent) evt;//判断到底发生的事件是什么String eventType =null;//由于IdleStateEvent底层判断事件是根据枚举类型来的,所以直接判断即可switch(event.state()){case READER_IDLE:
                    eventType ="读空闲";break;case WRITER_IDLE:
                    eventType ="写空闲";break;case ALL_IDLE:
                    eventType ="读写空闲";break;}System.out.println(ctx.channel().remoteAddress()+"发生超时事件,事件类型为:"+ eventType);System.out.println("服务器做相应处理");}}@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause)throwsException{System.out.println("发生异常!");}}

GroupChatClient 客户端

packagecom.wanshi.netty.groupchat;importio.netty.bootstrap.Bootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importjava.util.Scanner;publicclassGroupChatClient{//定义属性privatefinalString host;publicfinalint port;publicGroupChatClient(String host,int port){this.host = host;this.port = port;}publicvoid run (){EventLoopGroup eventExecutors =newNioEventLoopGroup();Bootstrap bootstrap =newBootstrap();try{
            bootstrap.group(eventExecutors).channel(NioSocketChannel.class).handler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{//得到pipelineChannelPipeline pipeline = socketChannel.pipeline();//加入相关的handler
                            pipeline.addLast("decoder",newStringDecoder());
                            pipeline.addLast("encoder",newStringEncoder());//加入自定义handler
                            pipeline.addLast(newGroupChatClientHandler());}});ChannelFuture channelFuture = bootstrap.connect(host, port).sync();//得到channelChannel channel = channelFuture.channel();System.out.println("-----"+ channel.localAddress()+"----");//客户端需要输入信息,创建一个扫描器Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String msg = scanner.nextLine();//通过channel发送到服务器端
                channel.writeAndFlush(msg+"\r\n");}
            channelFuture.channel().closeFuture().sync();}catch(Exception e){}finally{
            eventExecutors.shutdownGracefully();}}publicstaticvoidmain(String[] args){GroupChatClient groupChatClient =newGroupChatClient("127.0.0.1",7000);
        groupChatClient.run();}}

GroupChatClientHandler 客户端自定义处理器Handler

packagecom.wanshi.netty.groupchat;importio.netty.channel.Channel;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;publicclassGroupChatServerHandlerextendsSimpleChannelInboundHandler<String>{//所有的channel存入map集合中,目的是为了私聊好获取用户privatestaticMap<String,Channel> allChannels =newHashMap<String,Channel>();//格式化所有日期时间privateSimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");//转化日期privateString currentDate = sdf.format(newDate());/**
     * handlerAdded 表示连接建立,一旦连接建立,第一个被执行
     * 将当前channel加入到map集合
     */@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{//获取当前channelChannel channel = ctx.channel();//推送客户加入聊天的信息推送给其它在线的客户端//该方法会将channelGroup中所有的channel遍历并发送消息
        allChannels.forEach((k, ch)->{
            ch.writeAndFlush(currentDate+" \n [客户端]"+ channel.remoteAddress()+"加入聊天\n");});//获取端口号String key = channel.remoteAddress().toString().split(":")[1];
        allChannels.put(key, channel);}/**
     * 表示断开连接了,将xx客户离开信息推送给当前在线的客户
     * @param ctx
     * @throws Exception
     */@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{//获取当前channelChannel channel = ctx.channel();//推送客户加入聊天的信息推送给其它在线的客户端//该方法会将map中所有的channel遍历并发送消息
        allChannels.forEach((k, ch)->{
            ch.writeAndFlush(currentDate+" \n [客户端]"+ channel.remoteAddress()+"离线\n");});System.out.println("当前在线人数:"+ allChannels.size());}/**
     * 读取数据并将数据转发给在线的客户端
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */@OverrideprotectedvoidchannelRead0(ChannelHandlerContext channelHandlerContext,String s)throwsException{//获取到当前channelChannel channel = channelHandlerContext.channel();//私聊用户发送消息if(s.contains("#")){String id = s.split("#")[1];String body = s.split("#")[2];Channel userChannel = allChannels.get(id);String key = channel.remoteAddress().toString().split(":")[1];
            userChannel.writeAndFlush(currentDate+"\n "+key+"【私聊】 [用户] "+id+" 说 : "+body);return;}//循环遍历hashmap集合进行转发消息
        allChannels.forEach((k, ch)->{if(channel != ch){
                ch.writeAndFlush(currentDate +" \n [客户端]"+ channel.remoteAddress()+":"+ s +"\n");}else{// 发送消息给自己,回显自己发送的消息
                channel.writeAndFlush(currentDate +" \n [我]:"+ s +"\n");}});}/**
     * 表示channel处于活动状态
     * @param ctx
     * @throws Exception
     */@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{System.out.println(currentDate +" -- "+ ctx.channel().remoteAddress()+"上线~");}/**
     * 失去连接时会触发此方法
     * @param ctx 全局上下文对象
     * @throws Exception
     */@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{Channel channel = ctx.channel();String key = channel.remoteAddress().toString().split(":")[1];
        allChannels.remove(key);System.out.println(currentDate +" -- "+ ctx.channel().remoteAddress()+"离线");}@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause)throwsException{//关闭
        ctx.close();}}

往期精彩热文回顾

🚀 **Netty入门 – 什么是Netty? **
🚀 如何免费使用阿里云服务器?【一篇文章教会你,真香】
🚀 如何使用Git SVN工具 – TortoiseGit(小乌龟)将本地项目上传至GitEE?【超详细教程】
🚀 前后端分离系列 – SpringBoot + Spring Security + Vue 实现用户认证 SpringSecurity如此简单

🚀 Postman测试工具调试接口详细教程【向后端发送Json数据并接收返回的Json结果】

🚀 Java面向对象 — 吃货联盟订餐系统(完整版)

⛲小结

以上就是【Bug 终结者】对Netty非阻塞网络编程简单的理解,小编认为唯有代码实践,才可提升自己的技术,手不要懒,多敲,本案例完美的体现了Netty非阻塞式网络编程的模式,方便,快捷,代码略微有点多,但滤清思路,一步步来,总会慢慢理解的,加油,希望本文对你有帮助~

如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!

标签: 网络 java 服务器

本文转载自: https://blog.csdn.net/weixin_45526437/article/details/123197281
版权归原作者 Bug 终结者 所有, 如有侵权,请联系我们删除。

“Netty进阶 -- 非阻塞网络编程 实现群聊+私聊+心跳检测系统”的评论:

还没有评论