Quasar - Fiber, Channel, Actor

Quasarは軽量スレッド、GoライクなChannl、ErlangライクなActorや、非同期プログラミングツールを提供するJavaのライブラリです。

今回は、Fiber、Channel、Actorを試してみました。

Bytecode Instrumentation

このライブラリを使うには、Instrumentationを使ってバイトコードの書き換えを行う必要があります。この書き換えは、JVMの起動オプションにQuasarが提供するJava Agentを指定して実行時に書き換えを行うか、もしくはAntタスクを使って事前に書き換えを実行してく必要があります。今回はJava Agentを使って試してみました。どちらの使い方も以下のページに記載されています。

Fiber

Fiberは軽量スレッドです。インターフェースははThread/Runnableに近いので、特に難しくありません。

FiberSample.java:

 1package net.wrap_trap.quasar_sample;
 2
 3import java.util.concurrent.ExecutionException;
 4
 5import co.paralleluniverse.fibers.Fiber;
 6import co.paralleluniverse.fibers.SuspendExecution;
 7
 8@SuppressWarnings("serial")
 9public class FiberSample {
10
11  public static void main(String[] args) throws ExecutionException, InterruptedException {
12    Fiber<Void> a = new Fiber<Void>() {
13      @Override
14      protected Void run() throws InterruptedException, SuspendExecution {
15        for(int i = 0; i < 10; i ++) {
16          System.out.print("a");
17          sleep(1000L);
18        }
19        return null;
20      }
21    }.start();
22
23    Fiber<Void> b = new Fiber<Void>() {
24      @Override
25      protected Void run() throws InterruptedException, SuspendExecution {
26        for(int i = 0; i < 10; i ++) {
27          System.out.print("b");
28          sleep(1000L);
29        }
30        return null;
31      }
32    }.start();
33
34    a.join();
35    b.join();
36  }
37}

実行結果:

abababababababababab

Fiberの実装が気になるところですが、今回は概要を。ザッとドキュメントを読んだ限りだと、以下のような記述があります。

Fiberがどのくらい軽量かについては、以下のページにベンチの結果が載っています。

Channel

Fiber間やスレッド間でメッセージングを行う為の仕組みとしてChannelが提供されています。Fiber間でChannelを使ってメッセージングを行うコードを書いてみました。

ChannelSample.java:

 1package net.wrap_trap.quasar_sample;
 2
 3import java.util.Calendar;
 4import java.util.concurrent.ExecutionException;
 5
 6import co.paralleluniverse.fibers.Fiber;
 7import co.paralleluniverse.fibers.SuspendExecution;
 8import co.paralleluniverse.strands.channels.Channel;
 9import co.paralleluniverse.strands.channels.Channels;
10import co.paralleluniverse.strands.channels.Channels.OverflowPolicy;
11
12@SuppressWarnings("serial")
13public class ChannelSample {
14
15  public static void main(String[] args) throws ExecutionException, InterruptedException {
16    Channel<String> channel = Channels.newChannel(0, OverflowPolicy.BLOCK, false, false);
17    Fiber<Void> receiver = new Fiber<Void>() {
18      @Override
19      protected Void run() throws SuspendExecution, InterruptedException {
20        while(true) {
21          String message = channel.receive();
22          System.out.println("received: " + message);
23        }
24      }
25    }.start();
26
27    Fiber<Void> sender = new Fiber<Void>() {
28      @Override
29      protected Void run() throws SuspendExecution, InterruptedException {
30        int i = 0;
31        while(true) {
32          String message = String.format("foo%d", i++);
33          channel.send(message);
34          System.out.println("sent: " + message);
35          sleep(5000L);
36        }
37      }
38    }.start();
39
40    receiver.join();
41    sender.join();
42  }
43}

sent: foo0
received: foo0
sent: foo1
received: foo1
sent: foo2
received: foo2
sent: foo3
received: foo3
sent: foo4
received: foo4
sent: foo5
received: foo5
...

Channel#receiveを呼び出すと、メッセージを受信するまでブロックします。golangを使ったことが無いのでこれが”Go-like channels”なのか良く分かりませんが、Fiber間のメッセージ送受信を簡単に書けるようになっていると思います。

また、ChannelのExampleを見ると、スレッド-Fiber間でもChannelを介してメッセージのやり取りができるようです。

Actor

JavaでActorといえばAkkaですが、Quasarを使ってActorを書くことができます。特にServerActorはErlangのgen_serverのようなインターフェースを提供しています。

ServerActorを使ってサンプルコードを書いてみました。callが同期呼び出し、castが非同期呼び出し、sendMessageがメッセージ送信です。サーバはメッセージを受信したらシャットダウンするようにしています。このあたりは”Erlang-like actors”を感じます。

ActorSample.java:

  1package net.wrap_trap.quasar_sample;
  2
  3import java.util.concurrent.ExecutionException;
  4import java.util.concurrent.TimeoutException;
  5
  6import co.paralleluniverse.actors.ActorRef;
  7import co.paralleluniverse.actors.BasicActor;
  8import co.paralleluniverse.actors.behaviors.RequestMessage;
  9import co.paralleluniverse.actors.behaviors.RequestReplyHelper;
 10import co.paralleluniverse.actors.behaviors.Server;
 11import co.paralleluniverse.actors.behaviors.ServerActor;
 12import co.paralleluniverse.actors.behaviors.ValueResponseMessage;
 13import co.paralleluniverse.fibers.SuspendExecution;
 14import co.paralleluniverse.strands.Strand;
 15
 16@SuppressWarnings("serial")
 17public class ActorSample {
 18
 19  public static void main(String[] args) throws InterruptedException, SuspendExecution, ExecutionException, TimeoutException {
 20    Server<Request, Integer, Request> s = startServer();
 21
 22    call(s);
 23    cast(s);
 24    sendMessage(s);
 25  }
 26
 27  private static Server<Request, Integer, Request> startServer() {
 28    return new ServerActor<Request, Integer, Request>() {
 29      @Override
 30      public void init() throws SuspendExecution {
 31        System.out.println("server: init");
 32      }
 33
 34      @Override
 35      public Integer handleCall(ActorRef<?> from, Object id, Request m) {
 36        System.out.println("server: handleCall");
 37        return m.a + m.b;
 38      }
 39
 40      @SuppressWarnings("unchecked")
 41      public void handleCast(ActorRef<?> from, Object id, Request m) throws SuspendExecution {
 42        System.out.println("server: handleCast");
 43        RequestReplyHelper.reply(m, m.a + m.b);
 44      }
 45
 46      @Override
 47      protected void handleInfo(Object r) throws SuspendExecution {
 48        System.out.println("server: handleInfo");
 49        Request m = (Request)r;
 50        System.out.println(String.format("a=%d, b=%d", m.a, m.b));
 51        ServerActor.currentServerActor().shutdown();
 52      }
 53
 54      @Override
 55      protected void terminate(Throwable cause) throws SuspendExecution {
 56        System.out.println("server: terminate");
 57      }
 58    }.spawnThread();
 59  }
 60
 61  private static void call(Server<Request, Integer, Request> s) {
 62    new BasicActor<Void, Void>() {
 63      protected Void doRun() throws SuspendExecution, InterruptedException {
 64        System.out.println("client: s.call(new Request(3, 4))");
 65        Integer ret = s.call(new Request(3, 4));
 66        System.out.println("client: s.call(new Request(3, 4)): " + ret);
 67        return null;
 68      }
 69    }.spawn();
 70  }
 71
 72  private static void cast(Server<Request, Integer, Request> s) {
 73    new BasicActor<ValueResponseMessage<Integer>, Void>() {
 74      protected Void doRun() throws SuspendExecution, InterruptedException {
 75        System.out.println("client: s.cast(new Request(3, 4))");
 76        s.cast(new Request(ref, 3, 4));
 77        ValueResponseMessage<Integer> ret = receive();
 78        System.out.println("client: s.cast(new Request(3, 4)): " + ret.getValue());
 79        return null;
 80      }
 81    }.spawn();
 82  }
 83
 84  private static void sendMessage(Server<Request, Integer, Request> s) {
 85    new BasicActor<Void, Void>() {
 86      protected Void doRun() throws SuspendExecution, InterruptedException {
 87        Strand.sleep(3000L); // wait for sending shutdown request
 88        System.out.println("client: s.send(new Request(3, 4))");
 89        s.send(new Request(3, 4));
 90        return null;
 91      }
 92    }.spawn();
 93  }
 94
 95  @SuppressWarnings("rawtypes")
 96  static class Request extends RequestMessage {
 97    final int a;
 98    final int b;
 99
100    public Request(int a, int b) {
101      this.a = a;
102      this.b = b;
103    }
104
105    @SuppressWarnings("unchecked")
106    public Request(ActorRef<?> ref, int a, int b) {
107      super(ref);
108      this.a = a;
109      this.b = b;
110    }
111  }
112}

server: init
client: s.call(new Request(3, 4))
server: handleCall
client: s.call(new Request(3, 4)): 7
client: s.cast(new Request(3, 4))
server: handleCast
client: s.cast(new Request(3, 4)): 7
client: s.send(new Request(3, 4))
server: handleInfo
a=3, b=4
server: terminate

Erlangのように、LinkやMonitor、Supervisor、Hot Code Swapping等の機能があるようです。

Conclusion

ベンチの結果を見る限りではスレッドより並列処理の性能が良さそうなので、Fiberは並列数が多い時に使えるかもしれません。また、簡単にActorを使えるのも嬉しいところです。

ただ、Instrumentationが必要なのは面倒です。また、バイトコードの書き換えがデバッグを難しくする可能性はあります。情報もまだ少ないので、期待どおりに動かないとツラい…