act-act about projects rss

Java NIO Non-Blocking I/O

configureBlocking(false)

前回のコードで、acceptした後にOP_READでselectorにregisterする直前に、以下のような記述がありました。

SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
channel.register(key.selector(), SelectionKey.OP_READ);

この「channel.configureBlocking(false)」を実行すると、I/Oがノンブロッキングになります。

しかし前回のecho serverを実際に動かしてみると、クライアントからのリクエストをreadする箇所がノンブロッキングで行われている感触がありませんでした。これは、Selector#select()がクライアントからconnectもしくはsendされるまでの間、このスレッドの処理をブロックしていることに起因します(下図の赤い箇所)。

"Selector#select() blocks a thread."

Non-Blocking I/O

もしreadがノンブロッキングであれば、read待ち中(この図の赤い部分)は何かしら他の処理ができるはずです。しかし、前回のコードではそれが行えませんでした。

これは、以下の2点に起因すると考えています。

  1. channelがread可能になる契機を、Selectorからの通知でフックしている
  2. Selectorはchannelの状態に変化があるまでスレッドをブロックする(Selector#select)

1.は、Selector#select以外の何らかの方法でchannelがread可能となる契機をフックできれば良いのだと思いますが、そのような方法を見つけることができませんでした。以下のページによると、Linuxではioctlシステムコールを使って実現できるようです。

Geekなページ: ノンブロッキングソケット

2.は、Selector#select()の代わりにSelector#select(long timeout)かSelector#selectNow()を使うことで、ブロッキングを解除することができます。

ということで、今回は2.の部分を変えてノンブロッキングI/Oを実感してみたいと思います。

Do something while waiting a read

前回のコードに対し、Selector#select()の代わりにSelector#select(long timeout)を使うことで、上図の赤い部分に処理を実施するように修正してみました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
    package net.wrap_trap.example.nio;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class EchoServer {

        private ServerSocketChannel serverChannel;
        private Selector selector;

        public EchoServer() throws IOException {
            serverChannel = ServerSocketChannel.open();
            serverChannel.socket().setReuseAddress(true);
            serverChannel.socket().bind(new InetSocketAddress(8000));

            serverChannel.configureBlocking(false);
            selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        }

        public void start() throws IOException, InterruptedException {
            System.out.println("start");
            try {
                while (true) {
                    int ret = selector.select(10);
                    System.out.println("select loop");
                    if (ret > 0) {
                        handleKeys();
                    } else {
                        System.out.println("do something...");
                        Thread.sleep(10);
                    }
                }
            } finally {
                for (SelectionKey key : selector.keys()) {
                    try {
                        key.channel().close();
                    } catch (IOException ignore) {
                        ignore.printStackTrace();
                    }
                }
            }
        }

        protected void handleKeys() throws IOException, ClosedChannelException {
            Set<SelectionKey> keys = selector.selectedKeys();
            for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
                SelectionKey key = it.next();
                it.remove();

                if (key.isAcceptable()) {
                    System.out.println("accept");
                    SocketChannel channel = serverChannel.accept();
                    channel.configureBlocking(false);
                    channel.register(key.selector(), SelectionKey.OP_READ);
                } else {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (key.isReadable()) {
                        System.out.println("read");
                        ByteBuffer buffer = ByteBuffer.allocate(4096);
                        int len = channel.read(buffer);
                        if (len > 0) {
                            buffer.flip();
                            channel.write(buffer);
                        } else if (len == -1) {
                            key.cancel();
                            channel.close();
                        }
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException, InterruptedException {
            EchoServer echoServer = new EchoServer();
            echoServer.start();
        }
    }

Selector#select(long timeout)のタイムアウト値は10ミリ秒としました。この値は、Nettyの実装がselect(10)となっていた為、今回参考にしました。10ミリ秒しかブロッキングしないのでCPU使用率が気になるところでしたが、使用率が大きく上昇するようなことはありませんでした。

上記コードでは、「do something…」が表示されている間でも、クライアントから文字列を送ればその文字列が返ってきます。accept待ちやread待ちの状態でも、何か処理を実行できることが実感できます。

実際には、例えばキューにタスクを詰めておいて、I/Oの処理が無い場合にはキューからタスクを取り出して実行する、といった用途なのだと思います。ただしシングルスレッドでそのような仕組みにした場合、キューから取り出したタスクの処理に時間がかかれば、その分echo serverのレスポンスも遅延すると思われます。

Conclusion

nioのノンブロッキングI/Oを実感してみました。上記コードのreadとwriteの間に、何かもう一つノンブロッキングI/Oによる処理を入れると、さらに利便性を感じられるのではないかと考えています。また、時間を見つけてnio2の非同期I/Oも試してみたいと思います。