0


深入理解网络 I/O:单 Selector 多线程|单线程模型

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:网络 I/O
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

前言

在之前的文章中,从阻塞 I/O:BIO、非阻塞 I/O:NIO、多路复用 select/poll、多路复用 epoll

重要的 I/O 模型也是现在市场上大部分中间件运用的模型也就是基于 I/O 多路复用:epoll,比如:Redis、RocketMQ、Nginx 等,这些地方都运用了 epoll,只不过在 RocketMQ 的实现采用了 Netty,而 Netty 也基于 epoll 这套多路复用模型进行实现的,所以在后续的这些文章会围绕 Netty 的变种,看它是如何一步步从单 Selector 非线性模型 —> 单 Selector 线性模型 —> 单 Selector Group 混杂模式 —> 多 Selector Group 主从模式一步步演练过来的,本篇博文主要围绕单 Selector 非线性模型 —> 单 Selector 线性模型进行具体的展开.

非线性 VS 线性

非线性指的就是多个线程并行执行完这一段业务,结果并不是按顺序执行的(你以为的执行结果)

线性指的就是由一个线程执行完这一段业务,结果是按顺序执行完毕的

单 Selector 非线性模型

图解分析

假设说,现在给 客户端1 分配到的是 socket fd 2,在客户端读数据时为它分配一个读事件,当它到达读的逻辑时,再给它分配一个对应的写事件,那么如果不对读事件或写事件做 cancel 的话,那么读、写事件会一直存在,也就是它会在被 epfd 所分配的链表结构中一直存放着,其实这些读写事件走完它的流程时,它相当于已完成本次的读写任务了,它没有本质上存在的意义了,如果一直存在,它就会一直被调起,重复的调用!!!

在之前的 epoll 分析时,并没有看到 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, events) 的函数调用,说明在这里就是要分析使用它的地方,它在 Java 代码中相当于就是 java.nio.channels.SelectionKey#cancel 的实现

SelectionKey#cancel:请求取消此事件的通道与 selector 的注册,调用该方法返回时,该事件将无效,并且将添加到 selector 取消事件集合中,在下一个选择 select 操作期间,**

该事件将从所有的 selector 事件集合中删除

**,也就是不会再被调用

在这里插入图片描述

如上图:

  1. 主线程 Main Thread 负责接收客户端连接,由单个 selector 管理所有客户端 fds 连接,并对所连接的客户端 fd 注册读 read 事件 > 也就是调用 epoll_ctl
  2. 当 select 方法被调用时,会监听到链表中有读状态的 fd 事件,然后在 Java 程序中会调用 readHandler 方法去新开辟一个线程资源去处理,由于此时新开辟的线程和主线程并不是线程执行的,若此时不加 SelectionKey#cancel,即使已经抛出了线程,在线程执行前后这个时差上,该客户端的 fd 读事件会被重复触发.
  3. 当 readHandler 方法执行完,会向 selector 注册一个客户端 fd 写事件,也就是调用 epoll_ctl,然后下一次循环走到 select 方法被调用时,会监听到客户端写状态的 fd,然后再调用 writeHandler 方法新开辟一个线程资源去处理,由于此时新开辟的写线程也不是和主线程线性执行的,若此时不加 SelectionKey#cancel,select 方法再次被调用时,就会一直调用 writeHandler 方法去执行也就是会一直开辟新的线程去执行写的操作.
  4. 造成客户端 fd R/W 事件重复调用的原因:在主线程中不是通过线性的方式去执行读、写操作的,所以读写事件会被重复调用,解决方案:调用 SelectionKey#cancel 方法,在内核级别相当于 epoll_ctl(epfd, EPOLL_CTL_DEL, socketfd)

源码

以上图解分析的结果会通过以下源码的方式来演练,并会去观察 strace 生成的内核源码,在多线程非线性模型下,加 cancel 与 不加 cancel 方法之间的区别

packageorg.vnjohn.select;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.SelectionKey;importjava.nio.channels.Selector;importjava.nio.channels.ServerSocketChannel;importjava.nio.channels.SocketChannel;importjava.util.Iterator;importjava.util.Set;/**
 * @author vnjohn
 * @since 2023/12/7
 */publicclassSelectMultiplexingSocketMultiThread{privateSelector selector =null;int port =8090;publicvoidinitServer(){try{ServerSocketChannel server =ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(newInetSocketAddress(port));//  select、poll、*epoll 都是使用同样的方式打开
            selector =Selector.open();
            server.register(selector,SelectionKey.OP_ACCEPT);}catch(IOException e){
            e.printStackTrace();}}publicvoidstart(){initServer();System.out.println("Socket Server start...");try{while(true){while(selector.select(50)>0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while(iter.hasNext()){SelectionKey key = iter.next();
                        iter.remove();if(key.isAcceptable()){acceptHandler(key);}elseif(key.isReadable()){// 先在多路复用器里把 key->cancel 了System.out.println("in.....");readHandler(key);}elseif(key.isWritable()){// 1、你准备好要写什么了,这是第一步// 2、第二步你才关心send-queue是否有空间// so,读 read 一开始就要注册,但是 write 依赖 1、2 关系,什么时候用什么时候注册// 如果一开始就注册了write的事件,进入死循环,一直调起!!!
                            key.cancel();writeHandler(key);}}}}}catch(IOException e){
            e.printStackTrace();}}privatevoidwriteHandler(SelectionKey key){newThread(()->{System.out.println("write handler...");SocketChannel client =(SocketChannel) key.channel();ByteBuffer buffer =(ByteBuffer) key.attachment();
            buffer.flip();while(buffer.hasRemaining()){try{
                    client.write(buffer);}catch(IOException e){
                    e.printStackTrace();}}try{Thread.sleep(2000);}catch(InterruptedException e){
                e.printStackTrace();}
            buffer.clear();}).start();}publicvoidacceptHandler(SelectionKey key){try{ServerSocketChannel ssc =(ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();
            client.configureBlocking(false);ByteBuffer buffer =ByteBuffer.allocate(8192);
            client.register(selector,SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("new SocketClient:"+ client.getRemoteAddress());System.out.println("-------------------------------------------");}catch(IOException e){
            e.printStackTrace();}}publicvoidreadHandler(SelectionKey key){// 即便已经抛出了线程去读取,但是在时差里,这个 key->read 事件会被重复触发newThread(()->{System.out.println("read handler.....");SocketChannel client =(SocketChannel) key.channel();ByteBuffer buffer =(ByteBuffer) key.attachment();
            buffer.clear();int read;try{while(true){
                    read = client.read(buffer);System.out.println(Thread.currentThread().getName()+" "+ read);if(read >0){
                        key.interestOps(SelectionKey.OP_READ);
                        client.register(key.selector(),SelectionKey.OP_WRITE, buffer);}elseif(read ==0){break;}else{
                        client.close();break;}}}catch(IOException e){
                e.printStackTrace();}}).start();}publicstaticvoidmain(String[] args){SelectMultiplexingSocketMultiThread service =newSelectMultiplexingSocketMultiThread();
        service.start();}}

strace 追踪

先将源码中 key.cannel 代码进行注释,再观察命令窗口是否会重复调用 R/W 事件操作.

1、先将代码首行 package 移除
2、通过 javac 将源文件 .java 生成 .class
3、通过命令启动服务端:strace -ff -o epoll java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider SelectMultiplexingSocketMultiThread
4、通过 nc localhost 8090 模拟客户端连接

若将代码中 key.cannel 移除,在客户端命令窗口输入内容以后,服务端会读取这份内容并会注册一个写事件:EPOLL_OUT,此时的效果就是在后台会一直触发 writeHandler 方法的调用

在这里插入图片描述

在这里插入图片描述

若将代码中 key.cannel 恢复,在程序中每执行完一次读、写事件以后就会将事件注销掉,也就是它会从链表中移除这两个对应的事件,确保下一次 select 不会被再次触发调用.

在这里插入图片描述

在一定的时间差内,read 事件会被重复触发,当执行到了 writeHandler 以后,该事件已经被 cannel 掉了,此时已经不会再重复被调起了.

小结

非线性模型:由单个线程负责 accept 接收客户端连接,然后抛出不同的线程分别去处理读、写

考虑资源利用,为了充分利用好 CPU 核数
若有一个 socket fd 执行特别耗时,在一个单(线性)流程里会阻塞其他的 socket fd 处理

考虑如何处理当有 N 个 fd 同时有 R/W 处理的时候,可以分为以下几步处理:

  1. 将 N 个 FD 分组,每一组对应一个 selector,将每一个 selector 分别放到不同的线程上,selector 与线程的关系是 1:1
  2. 若是多个线程:它们分别在不同的 CPU 上执行,此时会存在多个 selector 并行,此时线程内部是线性执行的方式,最终是多个 FD 在并行的处理 accept、R/W 事件

不是说一个 selector 中的 FD 并行在多个线程里面处理,而是每一个 selector 都会保证一个 FD 在执行,且是线性处理的

以上的考虑都是基于分而治之思想,假设:程序里有 100W 个连接,有四个线程(selector)此时可以拿出其中一个 selector 就单单关注 accept 事件,然后把 accept 接收过后的客户端 FD R/W 事件分配给其他 selector 去进行处理.

单 Selector 线性模型

单个 selector 充当为一个线程 thread,来接收处理客户端的 accept 以及接收客户端读写 R/W 事件

SelectorThread

packageorg.vnjohn.selector.singleton;importjava.io.IOException;importjava.nio.ByteBuffer;importjava.nio.channels.*;importjava.util.Iterator;importjava.util.Set;importjava.util.concurrent.LinkedBlockingQueue;/**
 * @author vnjohn
 * @since 2023/12/15
 */publicclassSelectorThreadimplementsRunnable{Selector selector =null;LinkedBlockingQueue<Channel> lbq =newLinkedBlockingQueue<>();SelectorThreadGroup selectorThreadGroup =null;publicSelectorThread(SelectorThreadGroup selectorThreadGroup){try{this.selectorThreadGroup = selectorThreadGroup;
            selector =Selector.open();}catch(IOException e){
            e.printStackTrace();}}@Overridepublicvoidrun(){// loopwhile(true){try{// 1.select:如果一直没有 fd,该方法会阻塞,一直没有返回,通过调用 wakeup() 唤醒System.out.println(Thread.currentThread().getName()+"   :   before select ......"+ selector.keys().size());int num = selector.select();System.out.println(Thread.currentThread().getName()+"   :   after select ......"+ selector.keys().size());// 2.处理 selectKeysif(num >0){Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){// 每一个 fd 是线性处理的过程SelectionKey key = iterator.next();
                        iterator.remove();if(key.isAcceptable()){// 接受客户端的过程acceptHandler(key);}elseif(key.isReadable()){readHandler(key);}elseif(key.isWritable()){}}}// 3.处理 queue runTask,队列是堆里的对象,线程的栈是独立的,堆是共享的,只有方法的逻辑,本地变量是线程隔离的if(!lbq.isEmpty()){Channel channel = lbq.take();// accept 使用的是 ServerSocketChannelif(channel instanceofServerSocketChannel){ServerSocketChannel server =(ServerSocketChannel) channel;
                        server.register(selector,SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName()+" register server");// read / write 使用的是 SocketChannel}elseif(channel instanceofSocketChannel){SocketChannel client =(SocketChannel) channel;ByteBuffer buffer =ByteBuffer.allocateDirect(4096);
                        client.register(selector,SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName()+" register client:"+ client.getRemoteAddress());}}}catch(IOException|InterruptedException e){
                e.printStackTrace();}}}privatevoidreadHandler(SelectionKey key){System.out.println(Thread.currentThread().getName()+"  readHandler.......");ByteBuffer buffer =(ByteBuffer) key.attachment();SocketChannel client =(SocketChannel) key.channel();
        buffer.clear();while(true){try{int num = client.read(buffer);if(num >0){// 将读到的内容翻转,然后直接写出
                    buffer.flip();while(buffer.hasRemaining()){
                        client.write(buffer);}
                    buffer.clear();}elseif(num ==0){break;}else{// 有可能客户端断开了-异常情况System.out.println("client:"+ client.getRemoteAddress()+"      closed....");
                    key.cancel();
                    client.close();break;}}catch(IOException e){
                e.printStackTrace();}}}privatevoidacceptHandler(SelectionKey key){System.out.println(Thread.currentThread().getName()+"  acceptHandler.......");ServerSocketChannel server =(ServerSocketChannel) key.channel();try{SocketChannel client = server.accept();
            client.configureBlocking(false);// choose a selector and register !!
            selectorThreadGroup.nextSelector(client);}catch(IOException e){
            e.printStackTrace();}}}

第一个循环是死循环,让当前的线程一直阻塞运行处理事件

第二个循环是调用 Selector#select 方法,一直等待拿到事件,在这个里面会判断到来的事件是属于 accept | read | write,再执行对应的操作,在这里会做一个事情:当拿到 accept 新连接的客户端,再将它的连接信息绑定到对应的 selector,也就是将它添加到链表队列中

第三个非循环,只是从队列中取出元素,无元素的情况进行下一次的 select,存在元素则判断这个元素是属于服务端的 channel 还是客户端的 channel,**若是服务端的 channel 则将它往 epfd 注册一个 accept 事件,若是客户端的 channel 则将它往 epfd 注册一个 read 事件,

read 事件是一直存在的!!!

** 而写事件是由服务端主动发起的,在这里就是模拟业务的过程在 readHandler 处理过程中直接将源数据写回到客户端

SelectorGroup

使用 Selector Group 来用于分配线程执行以及 selector 调度执行,目前在此都是采用的单线程!!

该 Group 用于承担以后 Boss、Worker 角色的核心分配类

packageorg.vnjohn.selector.singleton;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.channels.Channel;importjava.nio.channels.ServerSocketChannel;importjava.util.concurrent.atomic.AtomicInteger;/**
 * @author vnjohn
 * @since 2023/12/15
 */publicclassSelectorThreadGroup{SelectorThread[] selectorThreads;ServerSocketChannel server =null;AtomicInteger xid =newAtomicInteger(0);publicSelectorThreadGroup(int num){// num 是线程数
        selectorThreads =newSelectorThread[num];// 启动多个线程,一个线程对应一个 selectorfor(int i =0; i < selectorThreads.length; i++){
            selectorThreads[i]=newSelectorThread(this);newThread(selectorThreads[i],"SelectorThread-"+ i).start();}}publicvoidbind(int port){try{
            server =ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(newInetSocketAddress(port));// 选择一个 selector 来充当服务端的 accept 连接nextSelector(server);}catch(IOException e){
            e.printStackTrace();}}/**
     * 无论是 ServerSocketChannel 还是 SocketChannel 都复用这个方法
     */publicvoidnextSelector(Channel channel){//  在主线程中,取到堆里的 selectorThread 对象SelectorThread selectorThread =next();// 1.通过队列传递数据、消息
        selectorThread.lbq.add(channel);// 2.通过打断阻塞,让对应的线程在打断后去自己完成注册 selector,唤醒 select 阻塞的操作
        selectorThread.selector.wakeup();// 这个时候才有了队列,多线程模型下才能进行相互之间的通信}privateSelectorThreadnext(){// 单个 group 多线程时,会进行轮询处理,有可能也会导致资源倾斜int index = xid.incrementAndGet()% selectorThreads.length;return selectorThreads[index];}}

重点:**

channel 有可能是 server 的 ServerSocketChannel 也有可能是 client 的 SocketChannel

**,在这里做强制转换会出现错误,将 channel 分配的工作延迟给到队列进行 take,由阻塞链表队列来进行区分是属于服务端的 channel 还是客户端的 channel,再去执行对应的操作:accept、read、write

MainThread

以下主线程类不做任何业务 I/O 相关的工作,只是为了创建一个带有指定数量的 SelectorGroup

packageorg.vnjohn.selector.singleton;/**
 * @author vnjohn
 * @since 2023/12/15
 */publicclassMainThread{publicstaticvoidmain(String[] args){// 1、创建 IO Thread(一个或多个)SelectorThreadGroup selectorThreadGroup =newSelectorThreadGroup(1);// 混杂模式:只有一个线程负责 accept,每个线程都会被分配 client,进行 R/W// SelectorThreadGroup selectorThreadGroup = new SelectorThreadGroup(3);// 2、应该把监听(8090)的 server 注册到某一个 selector 上
        selectorThreadGroup.bind(8090);}}

测试单 selector 模型

1、启动主线程 main 方法,控制台输出内容,如下:

SelectorThread-0   :   before select ......0SelectorThread-0   :   after select ......0SelectorThread-0 register server
SelectorThread-0   :   before select ......1

2、nc localhost 8090 模拟客户端来连接服务端进行读、写操作,首先看到的是由当前 SelectorThread 进行 accept,每次都是从队列中取出元素,根据当前元素是属于服务端 channel 还是客户端 channel 进行区分,服务端 channel 则 accept,客户端 channel 则注册 read,以便于客户端从网卡到来的数据在服务端能够进行响应

新客户端到来,并且写入数据:123,控制台输出内容如下:

SelectorThread-0   :   after select ......1SelectorThread-0  acceptHandler.......SelectorThread-0 register client:/0:0:0:0:0:0:0:1:60036SelectorThread-0   :   before select ......2SelectorThread-0   :   after select ......2SelectorThread-0   :   before select ......2SelectorThread-0   :   after select ......2SelectorThread-0  readHandler.......SelectorThread-0   :   before select ......2

从返回的内容来看,当前的 SelectorThread 先是进行 accept 然后执行唤醒 Selector 的操作,此时 select 马上不会进行阻塞直接返回打印的日志内容:before select、after select,不管是不是有 accept、read、write 事件,它都会先遍历一次进行处理,在一定时间差内,你可以看到它打印了两次

若不让它打印两次,可以在 before select 打印以后进行 **Thread.sleep(50);**,但是这种方式是不可取的,它无法应用到高并发的场景下!!!

在正常情况下,客户端只是先进行连接,而不做 R/W 操作,它会一直阻塞在 Selector#select 这个操作下的,只有当客户端从网卡发送了数据,此时 Selector 马上就会通过中断的方式将有状态的事件存在到内核链表中,此时就能获取到 selectKey,而这个 selectKey 是作为读操作存在的,所以会调用 readHandler 进行读和写的操作!!!

小结

小结一下以上单 Selector 线性模型执行的过程:

  • Selector#select:若一直无 FD 事件存在,该方法会一直阻塞,一直不会返回结果,只能通过 Selector#wakeup 方法将 Selector 唤醒
  • accept:通过当前线程所在的 SelectorGroup 将其分配到某个线程的 SelectorThread 下
  • read、write:在 readHandler 方法执行完读操作以后,模拟业务代码,将客户端写入的数据再由服务端写回到客户端!!

每一个线程都是一个单独的 Selector,若在多线程的情况下,以上的程序可能在并发场景下会被分配到多个 Selector 上

注意的要求是:每个客户端只能绑定到一个 Selector 上,不存在多线程顺序交互的问题,简而言之,就是说每个客户端连接进来它都要以线性的方式进行执行

再言之,单线程模型不能充分的利用到多核多 CPU 资源,同时在 100W 客户端进来时,这种模型跑起来会非常的慢,对于一个高并发系统设计而言,是一定不能够被接受的

所以这种模型仍然会存在问题,在下篇会介绍如何去解决这种应用在多线程场景下,客户端不进行乱串执行以及资源有效利用的问题!!

总结

该篇博文主要介绍多路复用模型 Epoll 下单 Selector 多线程与单线程之间的区别,先是说明了在单 Selector 非线性模型下-多线程会造成读、写事件重复触发的问题, 通过图解和 strace 追踪日志的方式说明了它的缺点,解决事件重复触发问题通过 SelectionKey#cannel 来进行解决,莫须有这种方式不可取会造成假死线程|资源停滞不释放问题,后者介绍了单个 Selector 单 Group 解决这种假死资源的存在问题,结合 Selector#wakeup + 链接阻塞队列的方式来完成,在单 Selector 线性模型下是可取的,但是为了应用多核多 CPU 资源,在多线程场景下这种模型会造成一个客户端在多个 Selector 中乱串执行的问题,希望您能够喜欢,感谢三连支持!

参考文献:

  1. 《UNIX网络编程 卷1:套接字联网API(第3版)》— [美] W. Richard Stevens Bill Fenner Andrew M. Rudoff

学习帮助文档:

  • man pages:yum install man
  • pthread man pages:yum -y install man-pages

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 网络 I/O 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

标签: 网络 Selector I/O

本文转载自: https://blog.csdn.net/vnjohn/article/details/134984999
版权归原作者 vnjohn 所有, 如有侵权,请联系我们删除。

“深入理解网络 I/O:单 Selector 多线程|单线程模型”的评论:

还没有评论