Java中的异步非阻塞AIO模型详解

 更新时间:2023年09月19日 10:20:34   作者:超大充电宝  
这篇文章主要介绍了Java中的异步非阻塞AIO模型详解,AIO需要操作系统的支持,在linux内核2.6版本中加入了对真正异步IO的支持,java从jdk1.7开始支持AIO,本文提供了部分实现代码,需要的朋友可以参考下

1、AIO:异步非阻塞简介

AIO需要操作系统的支持,在linux内核2.6版本中加入了对真正异步IO的支持,java从jdk1.7开始支持AIO

核心类有AsynchronousSocketChannel 、AsynchronousServerSocketChannel、AsynchronousChannelGroup

AsynchronousChannelGroup是异步Channel的分组管理器,它可以实现资源共享。

创建AsynchronousChannelGroup时,需要传入一个ExecutorService,也就是绑定一个线程池,该线程池负责两个任务:处理IO事件和触发CompletionHandler回调接口。

2、AsynchronousServerSocketChannel:AIO中网络通信服务端Socket

accept() 方法: AsynchronousServerSocketChannel创建成功后,类似于ServerSocket,也是调用 accept() 方法来接受来自客户端的连接, 由于异步IO实际的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接受操作系统IO完成的通知。 所以异步的ServerChannel调用 accept() 方法后,当前线程不会阻塞, 程序也不知道accept()方法什么时候能够接收到客户端请求并且操作系统完成网络IO, 为解决这个问题,AIO为accept()方法提供两个版本:

Future<AsynchronousSocketChannel> accept() :
开始接收客户端请求,如果当前线程需要进行网络IO(即获得AsynchronousSocketChannel),则应该调用该方法返回的Future对象的get()方法,但是get()方法会阻塞该线程,所以这种方式是阻塞式的异步IO。

<&nbsp;A&nbsp;> void accept (Aattachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler):
开始接受来自客户端请求,连接成功或失败都会触发CompletionHandler对象的相应方法。
其中AsynchronousSocketChannel就代表该CompletionHandler处理器在处理连接成功时的result是AsynchronousSocketChannel的实例。
而CompletionHandler接口中定义了两个方法,
completed(V result , A attachment):当IO完成时触发该方法,该方法的第一个参数代表IO操作返回的对象,
第二个参数代表发起IO操作时传入的附加参数。
faild(Throwable exc, A attachment):当IO失败时触发该方法,第一个参数代表IO操作失败引发的异常或错误。

3、AIO编程

服务端

public class AioServer {
    private static int DEFAULT_PORT = 12345;
    private static ServerHandler serverHandle;
    public volatile static long clientCount = 0;
    public static void start(){
        start(DEFAULT_PORT);
    }
    public static synchronized void start(int port){
        if(serverHandle!=null)
            return;
        serverHandle = new ServerHandler(port);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args) {
        AioServer.start();
    }
}
public class ServerHandler implements Runnable{
    private AsynchronousServerSocketChannel channel;
    public ServerHandler(int port) {
        try {
            //创建服务端通道
            channel = AsynchronousServerSocketChannel.open();
            //绑定端口
            channel.bind(new InetSocketAddress(port));
            System.out.println("服务端已启动,端口号:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        channel.accept(this, new AcceptHandler());
//        Future <AsynchronousSocketChannel> accept = channel.accept();
        //该步操作是异步操作 防止当前线程直接执行结束
        //方案1: while(true)+sleep
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        //方案2 CountDownLatch 作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞 此处,让现场在此阻塞,防止服务端执行完成后退出
//
//        CountDownLatch count = new CountDownLatch(1);
//        channel.accept(this, new AcceptHandler());
//        try {
//            count.await();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }
    // CompletionHandler<V,A>
    // V-IO操作的结果,这里是成功建立的连接,AsynchronousSocketChannel
   // A-IO操作附件,这里传入AsynchronousServerSocketChannel便于继续接收请求建立新连接
    class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, ServerHandler> {
        @Override
        public void completed(AsynchronousSocketChannel channel, ServerHandler serverHandler) {
            //创建新的Buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //异步读  第三个参数为接收消息回调的业务Handler
//            channel.read(buffer, buffer, new ReadHandler(channel));
            //继续接受其他客户端请求
            serverHandler.channel.accept(null, this);
        }
        @Override
        public void failed(Throwable exc, ServerHandler serverHandler) {
            exc.printStackTrace();
        }
    }
    class ReadHandler implements CompletionHandler<ByteBuffer, ByteBuffer> {
        //用户读取或者发送消息的channel
        private AsynchronousSocketChannel channel;
        public ReadHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }
        @Override
        public void completed(ByteBuffer result, ByteBuffer attachment) {
            result.flip();
            byte[] msg = new byte[result.remaining()];
            result.get(msg);
            try {
                String expression = new String(msg, "UTF-8");
                System.out.println("服务器收到消息: " + expression);
//                String result1 = "服务端收到消息\n";
                result.clear();
                //向客户端发送消息
                doWrite(expression);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //发送消息
        private void doWrite(String msg) {
            byte[] bytes = msg.getBytes();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put(bytes);
            buffer.flip();
            //异步写数据
            channel.write(buffer, buffer, new CompletionHandler <Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    //如果没有发送完,继续发送
                    if (attachment.hasRemaining()) {
                        channel.write(attachment, attachment, this);
                    } else {
                        //创建新的Buffer
                        ByteBuffer allocate = ByteBuffer.allocate(1024);
                        //异步读 第三个参数为接收消息回调的业务Handler
//                        channel.read(allocate, attachment, new ReadHandler(channel));
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

public class AioClient {
    private static String DEFAULT_HOST = "127.0.0.1";
    private static int DEFAULT_PORT = 12345;
    private static ClientHandler clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(clientHandle!=null)
            return;
        clientHandle = new ClientHandler(ip,port);
        new Thread(clientHandle,"Client").start();
    }
    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{
        if(msg.equals("exit")) return false;
        clientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception{
        AioClient.start();
        System.out.println("请输入请求消息:");
        Scanner scanner = new Scanner(System.in);
        while(AioClient.sendMsg(scanner.nextLine()));
    }
}
public class ClientHandler implements Runnable{
    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;
    private CountDownLatch latch;
    public ClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            //创建异步的客户端通道
            clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        //创建CountDownLatch等待
//        latch = new CountDownLatch(1);
        //发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
        clientChannel.connect(new InetSocketAddress(host, port), this, new AcceptHandler());
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        try {
//            latch.await();
//        } catch (InterruptedException e1) {
//            e1.printStackTrace();
//        }
//        try {
//            clientChannel.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
    }
    //向服务器发送消息
    public void sendMsg(String msg){
        byte[] req = msg.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        System.out.println(">>>>>>msg:"+msg);
        writeBuffer.put(req);
        writeBuffer.flip();
        //异步写
        clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel));
    }
    /**
     * 接收类
     */
    class AcceptHandler implements CompletionHandler<Void, ClientHandler> {
        public AcceptHandler() {}
        @Override
        public void completed(Void result, ClientHandler attachment) {
            System.out.println("连接服务器成功");
    }
        @Override
        public void failed(Throwable exc, ClientHandler attachment) {
            exc.printStackTrace();
            try {
                attachment.clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel channel;
        public WriteHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            //完成全部数据的写入
            if (attachment.hasRemaining()) {
                //数据没有写完,继续写
                System.out.println("WriteHandler.hasRemaining>>>>>");
                clientChannel.write(attachment, attachment, this);
            } else {
                //读取数据
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel));
            }
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel clientChannel;
        public ReadHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }
        @Override
        public void completed(Integer result,ByteBuffer buffer) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String body;
            try {
                body = new String(bytes,"UTF-8");
                System.out.println("客户端收到结果:"+ body);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void failed(Throwable exc,ByteBuffer attachment) {
            System.err.println("数据读取失败...");
            try {
                clientChannel.close();
            } catch (IOException e) {
            }
        }
    }
}

到此这篇关于Java中的异步非阻塞AIO模型详解的文章就介绍到这了,更多相关Java的AIO模型内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot实现WebSocket即时通讯的示例代码

    SpringBoot实现WebSocket即时通讯的示例代码

    本文主要介绍了SpringBoot实现WebSocket即时通讯的示例代码,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-04-04
  • 基于 SpringBoot 实现 MySQL 读写分离的问题

    基于 SpringBoot 实现 MySQL 读写分离的问题

    这篇文章主要介绍了基于 SpringBoot 实现 MySQL 读写分离的问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • JavaSE递归求解汉诺塔问题的思路与方法

    JavaSE递归求解汉诺塔问题的思路与方法

    递归是一种非常重要的算法思想,无论你是前端开发,还是后端开发,都需要掌握它,下面这篇文章主要给给大家介绍了关于JavaSE递归求解汉诺塔问题的思路与方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-08-08
  • 一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

    一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

    这篇文章主要介绍了如何将JAVA的RabbitMQz与SpringBoot整合,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2021-09-09
  • java学习之JVM运行时常量池理解

    java学习之JVM运行时常量池理解

    这篇文章主要介绍了java学习之JVM运行时常量池理解,对常量池的好处以及基本类型的包装类常量池等作了简要分析,有需要的朋友可以借鉴参考下
    2021-09-09
  • 使用Spring AOP实现用户操作日志功能

    使用Spring AOP实现用户操作日志功能

    这篇文章主要介绍了使用Spring AOP实现了用户操作日志功能,功能实现需要一张记录日志的log表,结合示例代码给大家讲解的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-05-05
  • Java 将List中的实体类按照某个字段进行分组并存放至Map中操作

    Java 将List中的实体类按照某个字段进行分组并存放至Map中操作

    这篇文章主要介绍了Java 将List中的实体类按照某个字段进行分组并存放至Map中操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10
  • java中for循环删除集合陷阱

    java中for循环删除集合陷阱

    java中在增强for循环为什么不能增删集合呢?一个循环迭代,跟集合的增删改没什么关系。修改集合不是for去做的。for只管循环迭代,你在循环里边修改集合,改变集合的长度,顺序对循环都有影响
    2015-05-05
  • 详解Java中的泛型

    详解Java中的泛型

    这篇文章主要介绍了Java中的泛型,当我们不确定数据类型时,我们可以暂时使用一个字母 T代替数据类型,例如写一个方法,但是我们不知道它是传递的是什么数据类型,我们就可以使用泛型,到时候只要指明T是什么数据类型,就可以使用了,需要的朋友可以参考下
    2023-05-05
  • Java中==与equals()及hashcode()三者之间的关系详解

    Java中==与equals()及hashcode()三者之间的关系详解

    最近也是在读Hollis的《深入理解Java核心技术》里面一节讲到了equals()和hashcode()的关系,对于这个高频面试点,咱们需要认真理清一下几者之间的关系
    2022-10-10

最新评论