act-act about projects rss

Getting Started with Quasar

Quasar - Fiber, Channel, Actor

Quasarは軽量スレッド、GoライクなChannl、ErlangライクなActorや、非同期プログラミングツールを提供するJavaのライブラリです。

今回は、Fiber、Channel、Actorを試してみました。

Bytecode Instrumentation

このライブラリを使うには、Instrumentationを使ってバイトコードの書き換えを行う必要があります。この書き換えは、JVMの起動オプションにQuasarが提供するJava Agentを指定して実行時に書き換えを行うか、もしくはAntタスクを使って事前に書き換えを実行してく必要があります。今回はJava Agentを使って試してみました。どちらの使い方も以下のページに記載されています。

Fiber

Fiberは軽量スレッドです。インターフェースははThread/Runnableに近いので、特に難しくありません。

FiberSample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package net.wrap_trap.quasar_sample;

import java.util.concurrent.ExecutionException;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;

@SuppressWarnings("serial")
public class FiberSample {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Fiber<Void> a = new Fiber<Void>() {
      @Override
      protected Void run() throws InterruptedException, SuspendExecution {
        for(int i = 0; i < 10; i ++) {
          System.out.print("a");
          sleep(1000L);
        }
        return null;
      }
    }.start();

    Fiber<Void> b = new Fiber<Void>() {
      @Override
      protected Void run() throws InterruptedException, SuspendExecution {
        for(int i = 0; i < 10; i ++) {
          System.out.print("b");
          sleep(1000L);
        }
        return null;
      }
    }.start();

    a.join();
    b.join();
  }
}

実行結果

abababababababababab

Fiberの実装が気になるところですが、今回は概要を。ザッとドキュメントを読んだ限りだと、以下のような記述があります。

  • 継続(continuation)を使っている
  • Instrumentationを使って継続(continuation)を実現している
  • Instrumentationでバイトコードの書き換えが行われるポイントは以下の2点
    • throws SuspendExecution
    • @Suspendable
  • FiberのスケジューリングはForkJoinPoolを使っている

Fiberがどのくらい軽量かについては、以下のページにベンチの結果が載っています。

Channel

Fiber間やスレッド間でメッセージングを行う為の仕組みとしてChannelが提供されています。Fiber間でChannelを使ってメッセージングを行うコードを書いてみました。

ChannelSample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package net.wrap_trap.quasar_sample;

import java.util.Calendar;
import java.util.concurrent.ExecutionException;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.Channels.OverflowPolicy;

@SuppressWarnings("serial")
public class ChannelSample {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Channel<String> channel = Channels.newChannel(0, OverflowPolicy.BLOCK, false, false);
    Fiber<Void> receiver = new Fiber<Void>() {
      @Override
      protected Void run() throws SuspendExecution, InterruptedException {
        while(true) {
          String message = channel.receive();
          System.out.println("received: " + message);
        }
      }
    }.start();

    Fiber<Void> sender = new Fiber<Void>() {
      @Override
      protected Void run() throws SuspendExecution, InterruptedException {
        int i = 0;
        while(true) {
          String message = String.format("foo%d", i++);
          channel.send(message);
          System.out.println("sent: " + message);
          sleep(5000L);
        }
      }
    }.start();

    receiver.join();
    sender.join();
  }
}
sent: foo0
received: foo0
sent: foo1
received: foo1
sent: foo2
received: foo2
sent: foo3
received: foo3
sent: foo4
received: foo4
sent: foo5
received: foo5
...

Channel#receiveを呼び出すと、メッセージを受信するまでブロックします。golangを使ったことが無いのでこれが”Go-like channels”なのか良く分かりませんが、Fiber間のメッセージ送受信を簡単に書けるようになっていると思います。

また、ChannelのExampleを見ると、スレッド-Fiber間でもChannelを介してメッセージのやり取りができるようです。

Actor

JavaでActorといえばAkkaですが、Quasarを使ってActorを書くことができます。特にServerActorはErlangのgen_serverのようなインターフェースを提供しています。

ServerActorを使ってサンプルコードを書いてみました。callが同期呼び出し、castが非同期呼び出し、sendMessageがメッセージ送信です。サーバはメッセージを受信したらシャットダウンするようにしています。このあたりは”Erlang-like actors”を感じます。

ActorSample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package net.wrap_trap.quasar_sample;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import co.paralleluniverse.actors.ActorRef;
import co.paralleluniverse.actors.BasicActor;
import co.paralleluniverse.actors.behaviors.RequestMessage;
import co.paralleluniverse.actors.behaviors.RequestReplyHelper;
import co.paralleluniverse.actors.behaviors.Server;
import co.paralleluniverse.actors.behaviors.ServerActor;
import co.paralleluniverse.actors.behaviors.ValueResponseMessage;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.Strand;

@SuppressWarnings("serial")
public class ActorSample {

  public static void main(String[] args) throws InterruptedException, SuspendExecution, ExecutionException, TimeoutException {
    Server<Request, Integer, Request> s = startServer();

    call(s);
    cast(s);
    sendMessage(s);
  }

  private static Server<Request, Integer, Request> startServer() {
    return new ServerActor<Request, Integer, Request>() {
      @Override
      public void init() throws SuspendExecution {
        System.out.println("server: init");
      }

      @Override
      public Integer handleCall(ActorRef<?> from, Object id, Request m) {
        System.out.println("server: handleCall");
        return m.a + m.b;
      }

      @SuppressWarnings("unchecked")
      public void handleCast(ActorRef<?> from, Object id, Request m) throws SuspendExecution {
        System.out.println("server: handleCast");
        RequestReplyHelper.reply(m, m.a + m.b);
      }

      @Override
      protected void handleInfo(Object r) throws SuspendExecution {
        System.out.println("server: handleInfo");
        Request m = (Request)r;
        System.out.println(String.format("a=%d, b=%d", m.a, m.b));
        ServerActor.currentServerActor().shutdown();
      }

      @Override
      protected void terminate(Throwable cause) throws SuspendExecution {
        System.out.println("server: terminate");
      }
    }.spawnThread();
  }

  private static void call(Server<Request, Integer, Request> s) {
    new BasicActor<Void, Void>() {
      protected Void doRun() throws SuspendExecution, InterruptedException {
        System.out.println("client: s.call(new Request(3, 4))");
        Integer ret = s.call(new Request(3, 4));
        System.out.println("client: s.call(new Request(3, 4)): " + ret);
        return null;
      }
    }.spawn();
  }

  private static void cast(Server<Request, Integer, Request> s) {
    new BasicActor<ValueResponseMessage<Integer>, Void>() {
      protected Void doRun() throws SuspendExecution, InterruptedException {
        System.out.println("client: s.cast(new Request(3, 4))");
        s.cast(new Request(ref, 3, 4));
        ValueResponseMessage<Integer> ret = receive();
        System.out.println("client: s.cast(new Request(3, 4)): " + ret.getValue());
        return null;
      }
    }.spawn();
  }

  private static void sendMessage(Server<Request, Integer, Request> s) {
    new BasicActor<Void, Void>() {
      protected Void doRun() throws SuspendExecution, InterruptedException {
        Strand.sleep(3000L); // wait for sending shutdown request

        System.out.println("client: s.send(new Request(3, 4))");
        s.send(new Request(3, 4));
        return null;
      }
    }.spawn();
  }

  @SuppressWarnings("rawtypes")
  static class Request extends RequestMessage {
    final int a;
    final int b;

    public Request(int a, int b) {
      this.a = a;
      this.b = b;
    }

    @SuppressWarnings("unchecked")
    public Request(ActorRef<?> ref, int a, int b) {
      super(ref);
      this.a = a;
      this.b = b;
    }
  }
}
server: init
client: s.call(new Request(3, 4))
server: handleCall
client: s.call(new Request(3, 4)): 7
client: s.cast(new Request(3, 4))
server: handleCast
client: s.cast(new Request(3, 4)): 7
client: s.send(new Request(3, 4))
server: handleInfo
a=3, b=4
server: terminate

Erlangのように、LinkやMonitor、Supervisor、Hot Code Swapping等の機能があるようです。

Conclusion

ベンチの結果を見る限りではスレッドより並列処理の性能が良さそうなので、Fiberは並列数が多い時に使えるかもしれません。また、簡単にActorを使えるのも嬉しいところです。

ただ、Instrumentationが必要なのは面倒です。また、バイトコードの書き換えがデバッグを難しくする可能性はあります。情報もまだ少ないので、期待どおりに動かないとツラい…