Introduction
たまたま仕事でnioを使っているコードを読む機会があり、それをキッカケにnioのI/O多重化を試してみました。
Single Process, Single Thread, I/O Multiplexing
「シングルプロセス、シングルスレッドで多重化」について、以下のページにて説明されていました。
インフラ技術の実験室 - Linuxネットワークプログラミング(シングルプロセス、シングルスレッドで多重化)
このページを読んで、I/0多重化によってaccept/recvを複数待ち受けることにより、シングルスレッドでも複数のリクエストに応答できると理解しました。
Java NIO I/O Multiplexing
次に、nioでI/O多重化を実現する方法を調べてみました。
「New I/Oで高速な入出力」第6回 ノンブロッキングI/Oを使ってみる:ITpro
nioにおけるI/O多重化はSelectableChannel/Selector/SelectionKeyを使って実現できることが分かり、これらを用いてecho serverを書いてみました。
1package net.wrap_trap.example.nio;
2
3import java.io.IOException;
4import java.net.InetSocketAddress;
5import java.nio.ByteBuffer;
6import java.nio.channels.SelectionKey;
7import java.nio.channels.Selector;
8import java.nio.channels.ServerSocketChannel;
9import java.nio.channels.SocketChannel;
10import java.util.Iterator;
11import java.util.Set;
12
13public class EchoServer {
14
15 private ServerSocketChannel serverChannel;
16 private Selector selector;
17
18 public EchoServer() throws IOException {
19 serverChannel = ServerSocketChannel.open();
20 serverChannel.socket().setReuseAddress(true);
21 serverChannel.socket().bind(new InetSocketAddress(8000));
22
23 serverChannel.configureBlocking(false);
24 selector = Selector.open();
25 serverChannel.register(selector, SelectionKey.OP_ACCEPT);
26 }
27
28 public void start() throws IOException {
29 System.out.println("start");
30 try {
31 while (selector.select() > 0) {
32 System.out.println("select loop");
33 Set<SelectionKey> keys = selector.selectedKeys();
34 for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
35 SelectionKey key = it.next();
36 it.remove();
37
38 if (key.isAcceptable()) {
39 System.out.println("accept");
40 SocketChannel channel = serverChannel.accept();
41 channel.configureBlocking(false);
42 channel.register(key.selector(), SelectionKey.OP_READ);
43 } else {
44 SocketChannel channel = (SocketChannel) key.channel();
45 if (key.isReadable()) {
46 System.out.println("read");
47 ByteBuffer buffer = ByteBuffer.allocate(4096);
48 channel.read(buffer);
49 buffer.flip();
50 channel.write(buffer);
51 }
52 }
53 }
54 }
55 } finally {
56 for (SelectionKey key : selector.keys()) {
57 try {
58 key.channel().close();
59 } catch (IOException ignore) {
60 ignore.printStackTrace();
61 }
62 }
63 }
64 }
65
66 public static void main(String[] args) throws IOException {
67 EchoServer echoServer = new EchoServer();
68 echoServer.start();
69 }
70}
クライアントは以下のコードです。
1# -*- coding: utf-8 -*-
2
3require 'socket'
4
5s = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
6sockaddr = Socket.sockaddr_in(8000, '127.0.0.1')
7s.connect(sockaddr)
8s.write "Hello,"
9puts "Received #{s.recv(1024)}"
10s.write "world."
11puts "Received #{s.recv(1024)}"
12
13s.close
Run
client.rbを実行すると以下のように返ってきます。
C:\Users\masayuki\workspace\nio-example>c:\Ruby\Ruby-1.9.3\bin\ruby client.rb
Received Hello,
Received world.
クライアントの方は期待通りの結果でしたが、サーバ側の標準出力にはずっと文字が出続けてしまいます。
select loop
read
select loop
read
select loop
read
select loop
read
select loop
read
select loop
read
select loop
read
どうもbusy loopになっているようです。 Windows nativeの実装の問題かと考え、Linuxで同じコードを実行したものの、やはりbusy loopになってしまいます。
Why do the busy loop occur?
readに関する期待していた振る舞いは以下でした。
- SocketChannel#acceptしたchannelに対し、OP_READを指定してSelectorに登録
- Selector#select()を呼び出してクライアントからのsend(コード上ではs.write)を待つ(blocking)
- クライアントでsendするとSelector#select()から処理が戻る
- selectorからreadableなSelectionKeyを取り出して、sendされた内容を取得
2.の処理で、何故OP_READで登録したchannelがSelector#selectでブロッキングされないのかが理解できませんでした。サーバの出力内容を見る限り、OP_ACCEPTで登録した方はブロッキングされているように見えます。
When do the busy loop start?
サーバから見ると、accept後にOP_READで登録した後、すぐにbusy loopになっているように見えます。それを確認する為、クライアント側のコードでconnect後に数秒待つようにし、その間のサーバ側の状態を確認することにしました。
s.connect(sockaddr)
sleep 20 # when do the busy loop start?
s.write "Hello,"
すると、クライアント側でsleepしてる間、サーバ側の出力は以下となっていました。
start
select loop
accept
クライアントのsleepが終わった後、サーバ側でread loopが始まったことから、クライアントから最初のsendが来るまでの間はブロックできていることになります。つまり、クライアントが側の最初のsendに対してサーバ側でreadするところから、OP_READの状態がずっと続いてしまってます。
クライアントへのレスポンスを返した後、SocketChannel#close()で閉じ、またクライアントも1回のやり取りでsocketを閉じてしまえば問題はないのですが、今回の例にあるように、1本の接続で複数回やり取りを行うことができなくなってしまいます。
この例では1回のsendの終わりを明示的に示すものが無いので、サーバ側でsendの終わりを知ることができないのですが、仮にsendの終端を定義したとしても、「次のクライアントからのsendまで待つ」ということができない限りこのbusy loopの事象は解決できません。
もちろんread時にバッファに読み込んだサイズが0以下であればちょっとsleepする、というやり方でもbusy loopは回避できますが、そもそもSelectorにOP_READを監視させている意味が無くなってしまいます。
Cancel and Re-register OP_READ flag
最初のsendまではブロックされていることから、Selectorに登録しているOP_READの監視を一旦リセットできれば良いのではないかと考えました。そこで、サーバ側でクライアントに応答を返した後、SelectionKey#cancel()を一旦行い、Selectorに対するOP_READの監視を解除した後、再びSocketChannel#register(…, SelectionKey.OP_READ)を呼び出すようにしてみました。
1if (key.isReadable()) {
2 System.out.println("read");
3 ByteBuffer buffer = ByteBuffer.allocate(4096);
4 channel.read(buffer);
5 buffer.flip();
6 channel.write(buffer);
7 Selector selector = key.selector();
8 key.cancel();
9 channel.register(selector, SelectionKey.OP_READ);
10}
しかし、このコードを実行するとクライアントからの最初のsendの契機で以下の例外が発生します。
Exception in thread "main" java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(Unknown Source)
at sun.nio.ch.SelectionKeyImpl.interestOps(Unknown Source)
at java.nio.channels.spi.AbstractSelectableChannel.register(Unknown Source)
at java.nio.channels.SelectableChannel.register(Unknown Source)
at net.wrap_trap.example.nio.EchoServer.start(EchoServer.java:53)
at net.wrap_trap.example.nio.EchoServer.main(EchoServer.java:71)
なお、上記コードの最後のSocketChannel#registerの呼び出しをコメントアウトすると、OP_READの監視が解かれます。それ以降、クライアント側でsendしても、Selector#selectのブロッキングが解除されません。
Set OP_WRITE flag
行き詰ったのでググったところ、dWに以下のような記述がありました。
実はコードの中に、NIOセレクタを使った事がない人には分かりにくい、ちょっとしたバグがあるのです。セレクタへのコールは常に即時に返るのですが、読むべきデータが無く、また受け付けるべき接続も無いので、このコールは単純にselect()コールにループして戻り、これでCPU使用率が100%になってしまうのです。面白い事に、I/OサービスのスレッドをループするとCPUを完全使用してしまうように見えるのですが、他のスレッドは必要と思われるだけ、いくらでもプロセッサー・サイクルが得られるので、目立つようなパフォーマンス低下を引き起こす事はないのです。
これを見る限り、busy loopによるCPU使用率の高騰は問題ではないようです。しかし、この後以下のように続いています。
ここでの鍵は、ソケットをWRITEモードで登録するのは、実際に書くべきデータがある場合に限定すべきだということです。書き込みのプロセスは部分的に成功しているだけかも知れないので、読み込みよりもやや複雑です。部分的に成功している場合にはソケットをWRITEモードで登録しておき、そのソケット・チャネルが書き込みできるようになる度に、さらに多くのデータを書き込むようにします。このように、やや複雑な手続きになってしまうのはソケットを多重化する欠点の一つです。長所としては当然ながら、複数ソケットを処理するのに必要なスレッドが少なくてすみ、つまりはサーバーがよりスケーラブルになるのです。スレッドが少なくなればVMが管理するスレッドも少なくなり、コンテキスト・スイッチも減り、メモリ使用も減るのです。 話をコードに戻すと、ソケットをREADモードでのみ登録すべきなのです:channel.register(selector, SelectionKey.OP_READ)。予期した通り、この修正を加えるとCPU利用率が大幅に減りました。これでMegaJogos は、何千ものスレッドを使う代わりに1つのスレッドだけで全ゲーム・プレーヤーにサービスすることができるようになったのです。
ということで、書き込む際にSelectionKeyにOP_WRITEを指定し、書き込みが終わったらOP_READに戻すようにしてみました。
1 } else if (key.isReadable()) {
2 SocketChannel channel = (SocketChannel) key.channel();
3 System.out.println("read");
4 ByteBuffer buffer = ByteBuffer.allocate(4096);
5 if (channel.read(buffer) > 0) {
6 buffer.flip();
7 buffers.add(buffer);
8 key.interestOps(SelectionKey.OP_WRITE);
9 }
10 } else if (key.isWritable()) {
11 System.out.println("write");
12 SocketChannel channel = (SocketChannel) key.channel();
13 for (ByteBuffer buffer : buffers) {
14 channel.write(buffer);
15 }
16 key.interestOps(SelectionKey.OP_READ);
17 }
read/writeの受け渡しにbuffersというインスタンス変数を用いてますが、本来であれば当然NGです。が、取り合えずbusy loopが無くなってCPU使用率が下がるかどうか確認してみましたが、残念ながらbusy loopはこのコードでも起きています。
Conclusion
このまま試行錯誤しても解決しそうにないので、nioでI/O多重化を行っているOSSの実装を何か見てみます。