Quasar - Fiber, Channel, Actor
Quasarは軽量スレッド、GoライクなChannl、ErlangライクなActorや、非同期プログラミングツールを提供するJavaのライブラリです。
今回は、Fiber、Channel、Actorを試してみました。
Bytecode Instrumentation
このライブラリを使うには、Instrumentationを使ってバイトコードの書き換えを行う必要があります。この書き換えは、JVMの起動オプションにQuasarが提供するJava Agentを指定して実行時に書き換えを行うか、もしくはAntタスクを使って事前に書き換えを実行してく必要があります。今回はJava Agentを使って試してみました。どちらの使い方も以下のページに記載されています。
Fiber
Fiberは軽量スレッドです。インターフェースははThread/Runnableに近いので、特に難しくありません。
FiberSample.java:
1package net.wrap_trap.quasar_sample;
2
3import java.util.concurrent.ExecutionException;
4
5import co.paralleluniverse.fibers.Fiber;
6import co.paralleluniverse.fibers.SuspendExecution;
7
8@SuppressWarnings("serial")
9public class FiberSample {
10
11 public static void main(String[] args) throws ExecutionException, InterruptedException {
12 Fiber<Void> a = new Fiber<Void>() {
13 @Override
14 protected Void run() throws InterruptedException, SuspendExecution {
15 for(int i = 0; i < 10; i ++) {
16 System.out.print("a");
17 sleep(1000L);
18 }
19 return null;
20 }
21 }.start();
22
23 Fiber<Void> b = new Fiber<Void>() {
24 @Override
25 protected Void run() throws InterruptedException, SuspendExecution {
26 for(int i = 0; i < 10; i ++) {
27 System.out.print("b");
28 sleep(1000L);
29 }
30 return null;
31 }
32 }.start();
33
34 a.join();
35 b.join();
36 }
37}
実行結果:
abababababababababab
Fiberの実装が気になるところですが、今回は概要を。ザッとドキュメントを読んだ限りだと、以下のような記述があります。
- 継続(continuation)を使っている
- Instrumentationを使って継続(continuation)を実現している
- Instrumentationでバイトコードの書き換えが行われるポイントは以下の2点
- throws SuspendExecution
- @Suspendable
- FiberのスケジューリングはForkJoinPoolを使っている
Fiberがどのくらい軽量かについては、以下のページにベンチの結果が載っています。
Channel
Fiber間やスレッド間でメッセージングを行う為の仕組みとしてChannelが提供されています。Fiber間でChannelを使ってメッセージングを行うコードを書いてみました。
ChannelSample.java:
1package net.wrap_trap.quasar_sample;
2
3import java.util.Calendar;
4import java.util.concurrent.ExecutionException;
5
6import co.paralleluniverse.fibers.Fiber;
7import co.paralleluniverse.fibers.SuspendExecution;
8import co.paralleluniverse.strands.channels.Channel;
9import co.paralleluniverse.strands.channels.Channels;
10import co.paralleluniverse.strands.channels.Channels.OverflowPolicy;
11
12@SuppressWarnings("serial")
13public class ChannelSample {
14
15 public static void main(String[] args) throws ExecutionException, InterruptedException {
16 Channel<String> channel = Channels.newChannel(0, OverflowPolicy.BLOCK, false, false);
17 Fiber<Void> receiver = new Fiber<Void>() {
18 @Override
19 protected Void run() throws SuspendExecution, InterruptedException {
20 while(true) {
21 String message = channel.receive();
22 System.out.println("received: " + message);
23 }
24 }
25 }.start();
26
27 Fiber<Void> sender = new Fiber<Void>() {
28 @Override
29 protected Void run() throws SuspendExecution, InterruptedException {
30 int i = 0;
31 while(true) {
32 String message = String.format("foo%d", i++);
33 channel.send(message);
34 System.out.println("sent: " + message);
35 sleep(5000L);
36 }
37 }
38 }.start();
39
40 receiver.join();
41 sender.join();
42 }
43}
sent: foo0
received: foo0
sent: foo1
received: foo1
sent: foo2
received: foo2
sent: foo3
received: foo3
sent: foo4
received: foo4
sent: foo5
received: foo5
...
Channel#receiveを呼び出すと、メッセージを受信するまでブロックします。golangを使ったことが無いのでこれが”Go-like channels”なのか良く分かりませんが、Fiber間のメッセージ送受信を簡単に書けるようになっていると思います。
また、ChannelのExampleを見ると、スレッド-Fiber間でもChannelを介してメッセージのやり取りができるようです。
Actor
JavaでActorといえばAkkaですが、Quasarを使ってActorを書くことができます。特にServerActorはErlangのgen_serverのようなインターフェースを提供しています。
ServerActorを使ってサンプルコードを書いてみました。call
が同期呼び出し、cast
が非同期呼び出し、sendMessage
がメッセージ送信です。サーバはメッセージを受信したらシャットダウンするようにしています。このあたりは”Erlang-like actors”を感じます。
ActorSample.java:
1package net.wrap_trap.quasar_sample;
2
3import java.util.concurrent.ExecutionException;
4import java.util.concurrent.TimeoutException;
5
6import co.paralleluniverse.actors.ActorRef;
7import co.paralleluniverse.actors.BasicActor;
8import co.paralleluniverse.actors.behaviors.RequestMessage;
9import co.paralleluniverse.actors.behaviors.RequestReplyHelper;
10import co.paralleluniverse.actors.behaviors.Server;
11import co.paralleluniverse.actors.behaviors.ServerActor;
12import co.paralleluniverse.actors.behaviors.ValueResponseMessage;
13import co.paralleluniverse.fibers.SuspendExecution;
14import co.paralleluniverse.strands.Strand;
15
16@SuppressWarnings("serial")
17public class ActorSample {
18
19 public static void main(String[] args) throws InterruptedException, SuspendExecution, ExecutionException, TimeoutException {
20 Server<Request, Integer, Request> s = startServer();
21
22 call(s);
23 cast(s);
24 sendMessage(s);
25 }
26
27 private static Server<Request, Integer, Request> startServer() {
28 return new ServerActor<Request, Integer, Request>() {
29 @Override
30 public void init() throws SuspendExecution {
31 System.out.println("server: init");
32 }
33
34 @Override
35 public Integer handleCall(ActorRef<?> from, Object id, Request m) {
36 System.out.println("server: handleCall");
37 return m.a + m.b;
38 }
39
40 @SuppressWarnings("unchecked")
41 public void handleCast(ActorRef<?> from, Object id, Request m) throws SuspendExecution {
42 System.out.println("server: handleCast");
43 RequestReplyHelper.reply(m, m.a + m.b);
44 }
45
46 @Override
47 protected void handleInfo(Object r) throws SuspendExecution {
48 System.out.println("server: handleInfo");
49 Request m = (Request)r;
50 System.out.println(String.format("a=%d, b=%d", m.a, m.b));
51 ServerActor.currentServerActor().shutdown();
52 }
53
54 @Override
55 protected void terminate(Throwable cause) throws SuspendExecution {
56 System.out.println("server: terminate");
57 }
58 }.spawnThread();
59 }
60
61 private static void call(Server<Request, Integer, Request> s) {
62 new BasicActor<Void, Void>() {
63 protected Void doRun() throws SuspendExecution, InterruptedException {
64 System.out.println("client: s.call(new Request(3, 4))");
65 Integer ret = s.call(new Request(3, 4));
66 System.out.println("client: s.call(new Request(3, 4)): " + ret);
67 return null;
68 }
69 }.spawn();
70 }
71
72 private static void cast(Server<Request, Integer, Request> s) {
73 new BasicActor<ValueResponseMessage<Integer>, Void>() {
74 protected Void doRun() throws SuspendExecution, InterruptedException {
75 System.out.println("client: s.cast(new Request(3, 4))");
76 s.cast(new Request(ref, 3, 4));
77 ValueResponseMessage<Integer> ret = receive();
78 System.out.println("client: s.cast(new Request(3, 4)): " + ret.getValue());
79 return null;
80 }
81 }.spawn();
82 }
83
84 private static void sendMessage(Server<Request, Integer, Request> s) {
85 new BasicActor<Void, Void>() {
86 protected Void doRun() throws SuspendExecution, InterruptedException {
87 Strand.sleep(3000L); // wait for sending shutdown request
88 System.out.println("client: s.send(new Request(3, 4))");
89 s.send(new Request(3, 4));
90 return null;
91 }
92 }.spawn();
93 }
94
95 @SuppressWarnings("rawtypes")
96 static class Request extends RequestMessage {
97 final int a;
98 final int b;
99
100 public Request(int a, int b) {
101 this.a = a;
102 this.b = b;
103 }
104
105 @SuppressWarnings("unchecked")
106 public Request(ActorRef<?> ref, int a, int b) {
107 super(ref);
108 this.a = a;
109 this.b = b;
110 }
111 }
112}
server: init
client: s.call(new Request(3, 4))
server: handleCall
client: s.call(new Request(3, 4)): 7
client: s.cast(new Request(3, 4))
server: handleCast
client: s.cast(new Request(3, 4)): 7
client: s.send(new Request(3, 4))
server: handleInfo
a=3, b=4
server: terminate
Erlangのように、LinkやMonitor、Supervisor、Hot Code Swapping等の機能があるようです。
Conclusion
ベンチの結果を見る限りではスレッドより並列処理の性能が良さそうなので、Fiberは並列数が多い時に使えるかもしれません。また、簡単にActorを使えるのも嬉しいところです。
ただ、Instrumentationが必要なのは面倒です。また、バイトコードの書き換えがデバッグを難しくする可能性はあります。情報もまだ少ないので、期待どおりに動かないとツラい…