act-act about projects rss

Java NIO2 Asynchronous

Asynchronous

前回ノンブロッキングI/Oを使ってみましたが、今回はNIO2の非同期I/Oを試してみます。

NIO2の非同期I/Oは、大きく2つの方法が提供されています。

  • CompletionHandler
  • Future

今回は非同期I/OのCompletionHandlerを使ってecho serverを作ってみました。

CompletionHandler<V, A>

CompletionHandlerは、I/O操作時にコールバックとして登録しておくクラスのインターフェースです。I/O操作が成功した場合はこのインターフェースのcompletedが、失敗した場合はfailedが呼び出されます。

CompletionHandlerは2つのパラメータを取ります。1つ目が戻り値の型、2つ目がI/O操作時にattachするオブジェクトのクラスとなります。

今回の例では、accept/read/writeの際にこのインターフェースをimplementしたクラスをコールバックとして登録しています。read/writeについては入出力を行うAsynchronousSocketChannelのオブジェクトをattachしています。acceptの戻り値はAynchronousSocketChannel、read/writeの戻り値はIntegerとなる為、各パラメータは以下のようになります。

  • Acceptor implements CompletionHandler<AsynchronousSocketChannel, Void>
  • Reader implements CompletionHandler<Integer, AsynchronousSocketChannel>
  • Writer implements CompletionHandler<Integer, AsynchronousSocketChannel>

Implementation

CompletionHandlerを使用してecho serverを書いてみました。accept/read/writeの各処理のコールバックはそれぞれクラスを分けており、その分コードも長くなってますが、もちろん無名クラスで書くことも可能です。このあたりはJavaScriptでコールバックを登録するお馴染みの書き方に似ていることもあって、そんなに違和感がありません。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
    package net.wrap_trap.example.nio;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.StandardSocketOptions;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;

    public class AsyncEchoServer {

        private AsynchronousServerSocketChannel serverChannel;

        public void start() throws IOException {
            System.out.println(String.format("start: name: %s", Thread.currentThread().getName()));
            serverChannel = AsynchronousServerSocketChannel.open();
            serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            serverChannel.bind(new InetSocketAddress(8000));
            serverChannel.accept(serverChannel, new Acceptor());
        }

        class Acceptor implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

            private final ByteBuffer buffer = ByteBuffer.allocate(1024);

            public Acceptor(){
                System.out.println("an acceptor has created.");
            }

            public void completed(final AsynchronousSocketChannel channel, AsynchronousServerSocketChannel serverChannel) {
                System.out.println(String.format("write: name: %s", Thread.currentThread().getName()));
                channel.read(buffer, channel, new Reader(buffer));
                serverChannel.accept(serverChannel, new Acceptor());
            }

            public void failed(Throwable exception, AsynchronousServerSocketChannel serverChannel) {
                throw new RuntimeException(exception);
            }
        }

        class Reader implements CompletionHandler<Integer, AsynchronousSocketChannel> {

            private ByteBuffer buffer;

            public Reader(ByteBuffer buffer){
                this.buffer = buffer;
            }

            public void completed(Integer result, AsynchronousSocketChannel channel){
                System.out.println(String.format("read: name: %s", Thread.currentThread().getName()));
                if(result != null && result < 0){
                    try{
                      channel.close();
                      return;
                    }catch(IOException ignore){}
                }
                buffer.flip();
                channel.write(buffer, channel, new Writer(buffer));
            }
            public void failed(Throwable exception, AsynchronousSocketChannel channel){
                throw new RuntimeException(exception);
            }                    
        }

        class Writer implements CompletionHandler<Integer, AsynchronousSocketChannel> {

            private ByteBuffer buffer;

            public Writer(ByteBuffer buffer){
                this.buffer = buffer;
            }

            public void completed(Integer result, AsynchronousSocketChannel channel) {
                System.out.println(String.format("write: name: %s", Thread.currentThread().getName()));
                buffer.clear();
                channel.read(buffer, channel, new Reader(buffer));
            }

            public void failed(Throwable exception, AsynchronousSocketChannel channel) {
                throw new RuntimeException(exception);
            }
        }

        public static void main(String[] args) throws IOException, InterruptedException{
            new AsyncEchoServer().start();
            while(true){
                Thread.sleep(1000L);
            }
        }
    }

Acceptor#completedの最後にAsynchronousSocketChannel#acceptを呼び出しているのは、2つめ以降の接続を受け付ける為です。Writer#completedの最後にAynchronousSocketChannel#readを呼び出しているのは、クライアントから1つの接続につき2回以上のsendを受け付ける為です。

Thread

このecho serverを実行すると、標準出力に以下のように出力されます。

1
2
3
4
5
6
7
8
9
    start: name: main
    an acceptor has created.
    accept: name: Thread-3
    an acceptor has created.
    read: name: Thread-2
    write: name: Thread-1
    read: name: Thread-1
    write: name: Thread-2
    read: name: Thread-2

acceot/read/writeのコールバックは、mainスレッド以外のスレッドで実行されています。つまり非同期処理はスレッドを使って実現されていることになります。

上記結果を見る限り、1リクエストあたり最大3スレッド使用しており、スレッド自体が消費するリソースとコンテキストスイッチの回数が気になるところです。また、accept/read/write間で値を受け渡す際にthread local領域は使えないことになります。

Conclusion

今回はNIO2の非同期I/OのCompletionHandlerを使ってecho serverを書いて見ました。ノンブロッキングI/Oの時と比較すると、大分扱いやすいコードになっています。

CompletionHandlerを使用した場合、非同期I/Oの実現にはスレッドが使用されていることから、同時接続数がかなり多い場合にサーバのリソースを使ってしまうのではないかという懸念があります。