Java实现非阻塞式服务器的示例代码
1.创建阻塞的服务器
当 ServerSocketChannel 与 SockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private ExecutorService executorService; //线程池 private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目 public EchoServer() throws IOException { //创建一个线程池 executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE); //创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口 serverSocketChannel.socket().setReuseAddress(true); //把服务器进程与一个本地端口绑定 serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服务器启动"); } public void service() { while (true) { SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); //处理客户连接 executorService.execute(new Handler(socketChannel)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws IOException { new EchoServer().service(); } //处理客户连按 class Handler implements Runnable { private SocketChannel socketChannel; public Handler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } public void run() { handle(socketChannel); } public void handle(SocketChannel socketChannel) { try { //获得与socketChannel关联的Socket对象 Socket socket = socketChannel.socket(); System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; while ((msg = br.readLine()) != null) { System.out.println(msg); pw.println(echo(msg)); if (msg.equals("bye")) { break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { if(socketChannel != null) { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream socketOut = socket.getOutputStream(); return new PrintWriter(socketOut,true); } private BufferedReader getReader(Socket socket) throws IOException { InputStream socketIn = socket.getInputStream(); return new BufferedReader(new InputStreamReader(socketIn)); } public String echo(String msg) { return "echo:" + msg; } }
2.创建非阻塞的服务器
在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:
- 接收客户的连接
- 接收客户发送的数据
- 向客户发回响应数据
EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件
// 创建一个Selector对象 selector = Selector.open(); //创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时 //可以顺利绑定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服务器进程与一个本地端口绑定 serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1层while循环 while(selector.select() > 0) { //获得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2层while循环 while (it.hasNext()) { SelectionKey key = null; //处理SelectionKey try { //取出一个SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey从Selector 的selected-key 集合中删除 it.remove(); 1f (key.isAcceptable()) { 处理接收连接就绪事件; } if (key.isReadable()) { 处理读就绪水件; } if (key.isWritable()) { 处理写就绪事件; } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使这个SelectionKey失效 key.cancel(); //关闭与这个SelectionKey关联的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } }
- 首先由
ServerSocketChannel
向Selector
注册接收连接就绪事件,如果Selector
监控到该事件发生,就会把相应的SelectionKey
对象加入selected-keys
集合 - 第一层 while 循环,不断询问
Selector
已经发生的事件,select()
方法返回当前相关事件已经发生的SelectionKey
的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector
的selectedKeys()
方法返回selected-keys
集合,它存放了相关事件已经发生的SelectionKey
对象 - 第二层 while 循环,从
selected-keys
集合中依次取出每个SelectionKey
对象并从集合中删除,,然后调用isAcceptable()
、isReadable()
和isWritable()
方法判断到底是哪种事件发生了,从而做出相应的处理
2.1处理接收连接就绪事件
if (key.isAcceptable()) { //获得与SelectionKey关联的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //获得与客户连接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel设置为非阻塞模式 socketChannel.configureBlocking(false); //创建一个用于存放用户发送来的数据的级冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注册读就绪事件和写就绪事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); }
2.2处理读就绪事件
public void receive(SelectionKey key) throws IOException { //获得与SelectionKey关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //创建一个ByteBuffer用于存放读到的数据 ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的极限设为容量 buffer.limit(buffer.capacity()); //把readBuff中的内容拷贝到buffer buffer.put(readBuff); }
2.3处理写就绪事件
public void send(SelectionKey key) throws IOException { //获得与SelectionKey关联的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK编码把buffer中的字节转换为字符串 String data = decode(buffer); //如果还没有读到一行数据就返回 if(data.indexOf("\r\n") == -1) return; //截取一行数据 String outputData = data.substring(0, data.indexOf("\n") + 1); //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //输出outputBuffer的所有字节 while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置设为temp的极限 buffer.position(temp.limit()): //删除buffer已经处理的数据 buffer.compact(); //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } }
完整代码如下:
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector; private Charset charset = Charset.forName("GBK"); public EchoServer() throws IOException { // 创建一个Selector对象 selector = Selector.open(); //创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时 //可以顺利绑定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服务器进程与一个本地端口绑定 serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1层while循环 while(selector.select() > 0) { //获得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2层while循环 while (it.hasNext()) { SelectionKey key = null; //处理SelectionKey try { //取出一个SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey从Selector 的selected-key 集合中删除 it.remove(); 1f (key.isAcceptable()) { //获得与SelectionKey关联的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //获得与客户连接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel设置为非阻塞模式 socketChannel.configureBlocking(false); //创建一个用于存放用户发送来的数据的级冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注册读就绪事件和写就绪事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使这个SelectionKey失效 key.cancel(); //关闭与这个SelectionKey关联的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { //获得与SelectionKey关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //创建一个ByteBuffer用于存放读到的数据 ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的极限设为容量 buffer.limit(buffer.capacity()); //把readBuff中的内容拷贝到buffer buffer.put(readBuff); } public void send(SelectionKey key) throws IOException { //获得与SelectionKey关联的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK编码把buffer中的字节转换为字符串 String data = decode(buffer); //如果还没有读到一行数据就返回 if(data.indexOf("\r\n") == -1) return; //截取一行数据 String outputData = data.substring(0, data.indexOf("\n") + 1); //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //输出outputBuffer的所有字节 while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置设为temp的极限 buffer.position(temp.limit()): //删除buffer已经处理的数据 buffer.compact(); //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } } //解码 public String decode(ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toStrinq(); } //编码 public ByteBuffer encode(String str) { return charset.encode(str); } public static void main(String args[])throws Exception { EchoServer server = new EchoServer(); server.service(); } }
3.阻塞模式与非阻塞模式混合使用
使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作
假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector = null; private Charset charset = Charset.forName("GBK"); public EchoServer() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void accept() { while(true) { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); synchronized(gate) { selector.wakeup(); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } } catch(IOException e) { e.printStackTrace(); } } } private Object gate=new Object(); public void service() throws IOException { while(true) { synchronized(gate){} int n = selector.select(); if(n == 0) continue; Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()) { SelectionKey key = null; try { it.remove(); if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { key.cancel(); key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { ... } public void send(SelectionKey key) throws IOException { ... } public String decode(ByteBuffer buffer) { ... } public ByteBuffer encode(String str) { ... } public static void main(String args[])throws Exception { final EchoServer server = new EchoServer(); Thread accept = new Thread() { public void run() { server.accept(); } }; accept.start(); server.service(); } }
注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁
导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然
到此这篇关于Java实现非阻塞式服务器的示例代码的文章就介绍到这了,更多相关Java服务器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
SparkStreaming-Kafka通过指定偏移量获取数据实现
这篇文章主要为大家介绍了SparkStreaming-Kafka通过指定偏移量获取数据,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-06-06SpringCloud Eureka Provider及Consumer的实现
这篇文章主要介绍了SpringCloud Eureka 提供者及调用者的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2019-10-10Spring Boot中使用JDBC Templet的方法教程
这篇文章主要给大家介绍了关于在Spring Boot中使用JDBC Templet的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。2018-03-03基于@MapperScan和@ComponentScan的使用区别
这篇文章主要介绍了@MapperScan和@ComponentScan的使用区别,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-09-09
最新评论