Java中Reactor的反应器模式详解

 更新时间:2023年12月13日 09:41:21   作者:得过且过的勇者y  
这篇文章主要介绍了Java中Reactor的反应器模式详解,Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理,反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色,需要的朋友可以参考下

前言

在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。

为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。

这种模式的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上极大地提高了服务器的吞吐量。但是对于大量的连接,需要消耗大量的现成资源,如果线程数太多,系统无法承受。而且线程的反复创建、销毁、线程的切换也需要代价。因此高并发应用场景下多线程OIO的缺陷是致命的,因此引入了Reactor反应器模式。

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

  • Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器
  • Handlers处理器的职责:非阻塞的执行业务处理逻辑

一、单线程Reactor反应器模式

Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色。

  • Reactor反应器:负责查询IO事件,当检测到一个IO时间,将其发送给对应的Handler处理器处理,这里的IO事件就是NIO选择器监控的通道IO事件。
  • Handler处理器:与IO事件绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。

基于NIO实现单线程版本的反应器模式需要用到SelectionKey选择键的几个重要的成员方法:

  1. void attach(Object o):将任何的Java对象作为附件添加到SelectionKey实例,主要是将Handler处理器实例作为附件添加到SelectionKey实例
  2. Object attachment():取出之前通过attach添加到SelectionKey选择键实例的附件,一般用于取出绑定的Handler处理器实例。

Reactor实现示例:

package cn.ken.jredis;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a href="https://github.com/Ken-Chy129" rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 14:29
 */
public class Reactor implements Runnable {
    final private Selector selector;
    final private ServerSocketChannel serverSocketChannel;
    public Reactor() {
        try {
            this.selector = Selector.open();
            this.serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8088));
            // 注册ServerSocket的accept事件
            SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 为事件绑定处理器
            sk.attach(new AcceptHandler());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectedKey : selectionKeys) {
                    dispatch(selectedKey);
                }
                selectionKeys.clear();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private void dispatch(SelectionKey selectedKey) {
        Runnable handler = (Runnable) selectedKey.attachment();
        // 此处返回的可能是AcceptHandler也可能是IOHandler
        handler.run();
    }
    class AcceptHandler implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    new IOHandler(selector, socketChannel); // 注册IO处理器,并将连接加入select列表
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    public static void main(String[] args) {
        new Reactor().run();
    }
}

Handler实现示例:

package cn.ken.jredis;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a href="https://github.com/Ken-Chy129" rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 14:53
 */
public class IOHandler implements Runnable {
    final private SocketChannel socketChannel;
    final private ByteBuffer buffer;
    public IOHandler(Selector selector, SocketChannel channel) {
        buffer = ByteBuffer.allocate(1024);
        socketChannel = channel;
        try {
            channel.configureBlocking(false);
            SelectionKey sk = channel.register(selector, 0); // 此处没有注册感兴趣的事件
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ); // 注册感兴趣的事件,下一次调用select时才生效
            selector.wakeup(); // 立即唤醒当前阻塞select操作,使得迅速进入下次select,从而让上面注册的读事件监听可以立即生效
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void run() {
        try {
            int length;
            while ((length = socketChannel.read(buffer)) > 0) {
                System.out.println(new String(buffer.array(), 0, length));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上(dispatch方法是直接调用run方法,没有创建新的线程),因此当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。

二、多线程Reactor反应器模式

既然Reactor反应器和Handler处理器在一个线程会造成非常严重的性能缺陷,那么可以使用多线程对基础的反应器模式进行改造。

  1. 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样业务处理线程与负责服务监听和IO时间查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
  2. 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器线程,同时引入多个选择器,每一个SubReactor子线程负责一个选择器。

MultiReactor:

package cn.ken.jredis;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a href="https://github.com/Ken-Chy129" rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 16:51
 */
public class MultiReactor {
    private final ServerSocketChannel server;
    private final Selector[] selectors = new Selector[2];
    private final SubReactor[] reactors = new SubReactor[2];
    private final AtomicInteger index = new AtomicInteger(0);
    public MultiReactor() {
        try {
            server = ServerSocketChannel.open();
            selectors[0] = Selector.open();
            selectors[1] = Selector.open();
            server.bind(new InetSocketAddress(8080));
            server.configureBlocking(false);
            SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);
            register.attach(new AcceptHandler());
            reactors[0] = new SubReactor(selectors[0]);
            reactors[1] = new SubReactor(selectors[1]);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    private void startService() {
        new Thread(reactors[0]).start();
        new Thread(reactors[1]).start();
    }
    class SubReactor implements Runnable {
        final private Selector selector;
        public SubReactor(Selector selector) {
            this.selector = selector;
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        dispatch(selectionKey);
                    }
                    selectionKeys.clear();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        Runnable attachment = (Runnable) selectionKey.attachment();
        if (attachment != null) {
            attachment.run();
        }
    }
    class AcceptHandler implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = server.accept();
                new MultiHandler(selectors[index.getAndIncrement()], socketChannel);
                if (index.get() == selectors.length) {
                    index.set(0);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

MultiHandler:

package cn.ken.jredis;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * <pre>
 *
 * </pre>
 *
 * @author <a href="https://github.com/Ken-Chy129" rel="external nofollow"  rel="external nofollow"  rel="external nofollow"  rel="external nofollow" >Ken-Chy129</a>
 * @since 2023/10/14 17:28
 */
public class MultiHandler implements Runnable {
    final private Selector selector;
    final private SocketChannel channel;
    final ByteBuffer buffer = ByteBuffer.allocate(1024);
    static ExecutorService pool = Executors.newFixedThreadPool(4);
    public MultiHandler(Selector selector, SocketChannel channel) {
        this.selector = selector;
        this.channel = channel;
        try {
            channel.configureBlocking(false);
            SelectionKey register = channel.register(selector, SelectionKey.OP_READ);
            register.attach(this);
            selector.wakeup();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void run() {
        pool.execute(() -> {
            synchronized (this) {
                int length;
                try {
                    while ((length = channel.read(buffer)) > 0) {
                        System.out.println(new String(buffer.array(), 0, length));
                        buffer.clear();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }   
        });
    }
}

到此这篇关于Java中Reactor的反应器模式详解的文章就介绍到这了,更多相关Reactor反应器模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringMVC使用自定义验证器进行数据验证的方法

    SpringMVC使用自定义验证器进行数据验证的方法

    SpringMVC 提供了强大的数据验证机制,可以方便地验证表单提交的数据,除了自带的验证器之外,SpringMVC 还支持自定义验证器,允许开发者根据业务需求自定义验证规则,本文将介绍如何在 SpringMVC 中使用自定义验证器
    2023-07-07
  • Java实现数组翻转的实现代码

    Java实现数组翻转的实现代码

    这篇文章主要介绍了Java实现数组翻转的实现代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • IDEA 报Plugin'maven-resources-plugin:'not found 的解决方案

    IDEA 报Plugin'maven-resources-plugin:'not found 

    如果在使用 IDEA 时遇到 "Plugin 'maven-resources-plugin:' not found" 错误,可能是由于 Maven 仓库中未找到所需的 Maven 插件,近小编给大家分享几种解决方法,感兴趣的朋友跟随小编一起看看吧
    2023-07-07
  • mysql+spring+mybatis实现数据库读写分离的代码配置

    mysql+spring+mybatis实现数据库读写分离的代码配置

    今天小编就为大家分享一篇关于mysql+spring+mybatis实现数据库读写分离的代码配置,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • java+selenium 网易云音乐刷累计听歌数的方法

    java+selenium 网易云音乐刷累计听歌数的方法

    这篇文章主要介绍了java+selenium 网易云音乐刷累计听歌数的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • SpringBoot3集成iText实现PDF导出功能

    SpringBoot3集成iText实现PDF导出功能

    不知道小伙伴们在项目中有没有遇到过导出 PDF 的需求,小编在之前的 tienchin 项目中有一个合同导出的功能,需要将文档导出为PDF,将文档导出为 PDF 有很多方案,不同方案的优缺点也各不相同,今天小编就和大家演示一个,感兴趣的小伙伴跟着小编一起来看看吧
    2024-10-10
  • Java继承extends与super关键字详解

    Java继承extends与super关键字详解

    本篇文章给大家详细讲述了Java继承extends与super关键字的相关知识点,需要的朋友们可以参考学习下。
    2018-02-02
  • 五分钟教你手写 SpringBoot 本地事务管理实现

    五分钟教你手写 SpringBoot 本地事务管理实现

    这篇文章主要介绍了五分钟教你手写 SpringBoot 本地事务管理实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-02-02
  • Java代码注释规范详解

    Java代码注释规范详解

    代码附有注释对程序开发者来说非常重要,随着技术的发展,在项目开发过程中,必须要求程序员写好代码注释,这样有利于代码后续的编写和使用。下面给大家分享java代码注释的规范,需要的朋友参考下
    2016-02-02
  • jtds1.1连接sqlserver2000测试示例

    jtds1.1连接sqlserver2000测试示例

    这篇文章主要介绍了jtds1.1连接sqlserver2000测试示例,需要的朋友可以参考下
    2014-02-02

最新评论