Java中的NIO

前言

NIO(Non-blocking I/O)是在 Java 1.4 中引入了的框架,是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,提供了更接近操作系统底层的高性能数据操作方式。

Netty 实际上就基于 Java NIO 技术封装完善之后得到一个高性能框架,熟悉 NIO 的基本概念对于学习和更好地理解 Netty 还是很有必要的,之后会出一篇关于Netty的文章。

基本概念

  • 区分同步或异步。同步是一种可靠的有序运行机制,进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步;而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件、回调等机制来实现任务间次序关系。
  • 区分阻塞与非阻塞。进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续;而非阻塞则是不管 IO 操作是否结束,直接返回,相应操作在后台继续处理。

NIO 主要组成部分

  • Buffer,高效的数据容器,除了布尔类型,所有原始数据类型都有相应的 Buffer 实现。

  • Channel,是 NIO 中被用来支持批量式 IO 操作的一种抽象。

    可以把Buffer理解为集装箱,而Channel就是货车,用来运输Buffer的。

  • Selector,是 NIO 实现多路复用的基础,它提供了一种高效的机制,可以检测到注册在 Selector 上的多个 Channel 中,是否有 Channel 处于就绪状态,进而实现了单线程对多 Channel 的高效管理。

简单实现

实现一个最简单的服务器应用,要求能够同时服务多个客户端请求即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SimpleServer extends Thread {
private ServerSocket serverSocket;

public int getPort() {
return serverSocket.getLocalPort();
}

@SneakyThrows
@Override
public void run() {
try {
// 参数0 指随机绑定一个可用端口
serverSocket = new ServerSocket(0);
while (true) {
// 阻塞等待客户端连接
Socket socket = serverSocket.accept();
// 启动一个单独线程负责回复客户端请求
RequestHandler requestHandler = new RequestHandler(socket);
requestHandler.start();
}
} finally {
if (serverSocket != null) serverSocket.close();
}
}
}

@SneakyThrows使用了Lombok插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RequestHandler extends Thread {
private final Socket socket;

RequestHandler(Socket socket) {
this.socket = socket;
}

@SneakyThrows
@Override
public void run() {
// 简化实现,不做读取直接发送字符串
try (PrintWriter out = new PrintWriter(socket.getOutputStream())) {
out.println("Hello NIO!");
out.flush();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SimpleDemo {
public static void main(String[] args) throws IOException, InterruptedException {

SimpleServer server = new SimpleServer();
server.start();

// 确保先开启server之后再执行下面的代码
TimeUnit.SECONDS.sleep(1);
try (Socket client = new Socket(InetAddress.getLocalHost(), server.getPort())) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
bufferedReader.lines().forEach(System.out::println);
}
}
}

运行 SimpleDemo,一个简单的 Socket 服务器就被实现出来了。

上面的代码中每一个 Client 启动一个线程似乎都有些浪费。使用线程池的机制来优化下。

1
2
3
4
5
6
7
8
9
10
11
@SneakyThrows
@Override
public void run() {
serverSocket = new ServerSocket(0);
ExecutorService executor = Executors.newFixedThreadPool(12);
while (true) {
Socket socket = serverSocket.accept();
RequestHandler requestHandler = new RequestHandler(socket);
executor.execute(requestHandler);
}
}

通过一个固定大小的线程池,来负责管理工作线程,避免频繁创建、销毁线程的开销。如果连接数并不是非常多,只有最多几百个连接的普通应用,这种模式往往可以工作的很好。但是,如果连接数量急剧上升,这种实现方式就无法很好地工作了,因为线程上下文切换开销会在高并发时变得很明显。

完整实现

NIO 引入的多路复用机制,提供了另外一种思路。

  1. 将 Channel 注册到 Selector 中。

    Selector 类似调度员的角色,会去轮询处于就绪状态的Channel。

  2. 调用 Selector 的 select() 方法,这个方法会阻塞,当有 Channel 发生接入请求,就会被唤醒。

  3. 通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。

  4. 通过 SocketChannel 和 Buffer 进行数据操作。

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NIOServer extends Thread {

@SneakyThrows
@Override
public void run() {
try (Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {

serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
// 将该通道设置为非阻塞方式否则无法register
serverSocketChannel.configureBlocking(false);
// 注册到Selector,并说明关注点是连接(当有新连接时就会被唤醒)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待就绪的Channel,这是关键点之一
selector.select(5000);

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
SocketChannel clientChannel;
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
clientChannel = channel.accept();
// 将处理对应的客户端信息socketChannel设置为非阻塞
clientChannel.configureBlocking(false);
// 为该通道注册读就绪事件, 选择器会询问该通道的状态,当该通道就绪时,
clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} else if (key.isReadable()) {
clientChannel = (SocketChannel) key.channel();
// 参数设置3 为了简化实现 不考虑半包、粘包问题
ByteBuffer byteBuffer = ByteBuffer.allocate(3);
clientChannel.read(byteBuffer);
String msg = new String(byteBuffer.array());
System.out.println("server:" + msg);
key.interestOps(SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
clientChannel = (SocketChannel) key.channel();
clientChannel.write(Charset.defaultCharset().encode("Hello NIO"));
key.interestOps(SelectionKey.OP_READ);
}
iter.remove();
}
}
}
}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NIODemo {
public static void main(String[] args) throws IOException, InterruptedException {

NIOServer server = new NIOServer();
server.start();
// 线程sleep500毫秒确保server端先执行
TimeUnit.MILLISECONDS.sleep(500);

// 客户端需要发送的消息 server端收到bye就断开连接
List<String> msgList = Arrays.asList("WWW", "AMD", "YES");

try (Socket client = new Socket(InetAddress.getLocalHost(), 8888)) {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 8888);
SocketChannel socketChannel2 = SocketChannel.open(address);

// 创建一个线程模拟发送客户端消息
Thread sendMsgThread = new Thread(() -> {
msgList.forEach(msg -> {
try {
socketChannel2.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
});
});
sendMsgThread.start();

// 接收server端发来的消息
int bytesRead;
// 参数设置9 为了简化实现 不考虑半包、粘包问题
ByteBuffer buf = ByteBuffer.allocate(9);
while ((bytesRead = socketChannel2.read(buf)) != -1) {
System.out.println("client:" + new String(buf.array(), 0, bytesRead, StandardCharsets.UTF_8));
buf.clear();
}
}
}
}

客户端向服务端发送三条消息分别是 WWW AMD YES,服务端每收到一条消息会向客户端发送 Hello NIO,上面代码简化实现使用了固定长度的消息来避免半包和粘包问题。也简化了 Buffer 的用法。

总结

可以看到,在前面两个样例中,IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高。学习Java NIO 是为了更好的理解Netty,毕竟Netty的优点实在太多了。

Netty优点

  • API使用简单,学习成本低

  • 功能强大,内置了多种解码编码器,支持多种协议。(有效解决半包和粘包问题)

  • 性能高,对比其他主流的NIO框架,Netty的性能最优

代码地址 https://github.com/pepsiyoung/hexo-blog-demo/tree/main/console/src/main/java/nio

参考资料

《Java 核心技术面试精讲》