前言
NIO(Non-blocking I/O)是在 Java 1.4 中引入了的框架,是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,提供了更接近操作系统底层的高性能数据操作方式。
Netty 实际上就基于 Java NIO 技术封装完善之后得到一个高性能框架,熟悉 NIO 的基本概念对于学习和更好地理解 Netty 还是很有必要的,之后会出一篇关于Netty的文章。
基本概念
- 区分同步或异步。同步是一种可靠的有序运行机制,进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步;而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件、回调等机制来实现任务间次序关系。
- 区分阻塞与非阻塞。进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续;而非阻塞则是不管 IO 操作是否结束,直接返回,相应操作在后台继续处理。
NIO 主要组成部分
Buffer,高效的数据容器,除了布尔类型,所有原始数据类型都有相应的 Buffer 实现。
Channel,是 NIO 中被用来支持批量式 IO 操作的一种抽象。
可以把Buffer理解为集装箱,而Channel就是货车,用来运输Buffer的。
Selector,是 NIO 实现多路复用的基础,它提供了一种高效的机制,可以检测到注册在 Selector 上的多个 Channel 中,是否有 Channel 处于就绪状态,进而实现了单线程对多 Channel 的高效管理。
简单实现
实现一个最简单的服务器应用,要求能够同时服务多个客户端请求即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class SimpleServer extends Thread { private ServerSocket serverSocket;
public int getPort() { return serverSocket.getLocalPort(); }
@SneakyThrows @Override public void run() { try { serverSocket = new ServerSocket(0); while (true) { Socket socket = serverSocket.accept(); RequestHandler requestHandler = new RequestHandler(socket); requestHandler.start(); } } finally { if (serverSocket != null) serverSocket.close(); } } }
|
@SneakyThrows使用了Lombok插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class RequestHandler extends Thread { private final Socket socket;
RequestHandler(Socket socket) { this.socket = socket; }
@SneakyThrows @Override public void run() { try (PrintWriter out = new PrintWriter(socket.getOutputStream())) { out.println("Hello NIO!"); out.flush(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class SimpleDemo { public static void main(String[] args) throws IOException, InterruptedException {
SimpleServer server = new SimpleServer(); server.start();
TimeUnit.SECONDS.sleep(1); try (Socket client = new Socket(InetAddress.getLocalHost(), server.getPort())) { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream())); bufferedReader.lines().forEach(System.out::println); } } }
|
运行 SimpleDemo,一个简单的 Socket 服务器就被实现出来了。
上面的代码中每一个 Client 启动一个线程似乎都有些浪费。使用线程池的机制来优化下。
1 2 3 4 5 6 7 8 9 10 11
| @SneakyThrows @Override public void run() { serverSocket = new ServerSocket(0); ExecutorService executor = Executors.newFixedThreadPool(12); while (true) { Socket socket = serverSocket.accept(); RequestHandler requestHandler = new RequestHandler(socket); executor.execute(requestHandler); } }
|
通过一个固定大小的线程池,来负责管理工作线程,避免频繁创建、销毁线程的开销。如果连接数并不是非常多,只有最多几百个连接的普通应用,这种模式往往可以工作的很好。但是,如果连接数量急剧上升,这种实现方式就无法很好地工作了,因为线程上下文切换开销会在高并发时变得很明显。
完整实现
NIO 引入的多路复用机制,提供了另外一种思路。
将 Channel 注册到 Selector 中。
Selector 类似调度员的角色,会去轮询处于就绪状态的Channel。
调用 Selector 的 select() 方法,这个方法会阻塞,当有 Channel 发生接入请求,就会被唤醒。
通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。
通过 SocketChannel 和 Buffer 进行数据操作。
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class NIOServer extends Thread {
@SneakyThrows @Override public void run() { try (Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(5000);
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iter = selectedKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); SocketChannel clientChannel; if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); clientChannel = channel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (key.isReadable()) { clientChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(3); clientChannel.read(byteBuffer); String msg = new String(byteBuffer.array()); System.out.println("server:" + msg); key.interestOps(SelectionKey.OP_WRITE); } else if (key.isWritable()) { clientChannel = (SocketChannel) key.channel(); clientChannel.write(Charset.defaultCharset().encode("Hello NIO")); key.interestOps(SelectionKey.OP_READ); } iter.remove(); } } } } }
|
Client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class NIODemo { public static void main(String[] args) throws IOException, InterruptedException {
NIOServer server = new NIOServer(); server.start(); TimeUnit.MILLISECONDS.sleep(500);
List<String> msgList = Arrays.asList("WWW", "AMD", "YES");
try (Socket client = new Socket(InetAddress.getLocalHost(), 8888)) { InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 8888); SocketChannel socketChannel2 = SocketChannel.open(address);
Thread sendMsgThread = new Thread(() -> { msgList.forEach(msg -> { try { socketChannel2.write(ByteBuffer.wrap(msg.getBytes())); } catch (IOException e) { e.printStackTrace(); } }); }); sendMsgThread.start();
int bytesRead; ByteBuffer buf = ByteBuffer.allocate(9); while ((bytesRead = socketChannel2.read(buf)) != -1) { System.out.println("client:" + new String(buf.array(), 0, bytesRead, StandardCharsets.UTF_8)); buf.clear(); } } } }
|
客户端向服务端发送三条消息分别是 WWW AMD YES,服务端每收到一条消息会向客户端发送 Hello NIO,上面代码简化实现使用了固定长度的消息来避免半包和粘包问题。也简化了 Buffer 的用法。
总结
可以看到,在前面两个样例中,IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高。学习Java NIO 是为了更好的理解Netty,毕竟Netty的优点实在太多了。
Netty优点
代码地址 https://github.com/pepsiyoung/hexo-blog-demo/tree/main/console/src/main/java/nio
参考资料
《Java 核心技术面试精讲》