calcite

Image by 949158 from Pixabay

Calcite Adapter

Calcite には Adapter という要素があります。Adapter は、SQL で問い合せるデータの種別ごと用意します。例えば、前回の説明で使用した CSV Adapter は、CSV 形式のデータを SQL で問い合わせる機能を提供します。今回は Avro 形式のデータを SQL で問い合わせる Avro Adapter を実装していきながら、Calcite のクラスを眺めていきます。

但し Adapter を一から作るのは大変なので、大部分の処理は Calcite の core パッケージにある Enumerable Adapter を使います。Enumerable Adapterは、row-wise な検索を行う為に必要な機能を持つ Adapter です。CSV AdapterEnumerable Adapter の機能を使っています。

SchemaFactory

Adapter を作るにあたり、最初に作成するクラスは SchemaFactory です。これは Calcite の Schema クラスのインスタンスを作成して返すシンプルな Factory クラスです。作成した SchemaFactory のクラス名を、JSON 形式の設定ファイルの factory に指定します。今回はこの JSON ファイルを model.json とします。 model.json の内容は以下のとおりです。

{
  "version": "1.0",
  "defaultSchema": "SAMPLES",
  "schemas": [
    {
      "name": "SAMPLES",
      "type": "custom",
      "factory": "net.wrap_trap.calcite_avro_sample.AvroSchemaFactory",
      "operand": {
        "directory": "samples"
      }
    }
  ]
}

このファイルは、JDBC の getConnection を呼び出す際に以下のように指定します。

Connection conn = DriverManager.getConnection("jdbc:calcite:model=target/test-classes/model.json");

SchemaFactory クラスのコードは以下のとおりです。

 1package net.wrap_trap.calcite_avro_sample;
 2
 3import org.apache.calcite.model.ModelHandler;
 4import org.apache.calcite.schema.Schema;
 5import org.apache.calcite.schema.SchemaFactory;
 6import org.apache.calcite.schema.SchemaPlus;
 7
 8import java.io.File;
 9import java.util.Map;
10
11public class AvroSchemaFactory implements  SchemaFactory {
12  public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
13    String directory = (String) operand.get("directory");
14    File base = (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
15    File directoryFile = new File(directory);
16    if (base != null && !directoryFile.isAbsolute()) {
17      directoryFile = new File(base, directory);
18    }
19    return new AvroSchema(directoryFile);
20  }
21}

SchemaFactory クラスでは、この json ファイルの operand の値を読むことができます。今回は operand に Avro データファイルを置くディレクトリ名を指定しています。

Schema

Schema は Calcite の Table を管理するクラスです。この AvroSchema は、前記の model.jsonoperand で指定したディレクトリパスの下にある [TableName].avro ファイルを読み、Avro の スキーマ (Calcite の Schema とは異なります) と Avro のレコード(GenericData.Record) を取り出して AvroTable に渡しています。

 1package net.wrap_trap.calcite_avro_sample;
 2
 3import org.apache.avro.Schema;
 4import org.apache.avro.file.DataFileReader;
 5import org.apache.avro.generic.GenericData;
 6import org.apache.avro.generic.GenericDatumReader;
 7import org.apache.calcite.schema.Table;
 8import org.apache.calcite.schema.impl.AbstractSchema;
 9
10import java.io.File;
11import java.io.IOException;
12import java.util.*;
13
14
15public class AvroSchema extends AbstractSchema {
16  private Map<String, Table> tableMap;
17  private File directory;
18
19  public AvroSchema(File directory) {
20    this.directory = directory;
21  }
22
23  @Override
24  public Map<String, Table> getTableMap() {
25    if (tableMap == null) {
26      tableMap = new HashMap<>();
27      File[] avroFiles = directory.listFiles((dir, name) -> name.endsWith(".avro"));
28      Arrays.stream(avroFiles).forEach(file -> {
29        GenericDatumReader<GenericData.Record> datum = new GenericDatumReader<>();
30
31        try (DataFileReader<GenericData.Record> reader = new DataFileReader<>(file, datum)) {
32          Schema schema = reader.getSchema();
33          List<GenericData.Record> records = new ArrayList<>();
34          while (reader.hasNext()) {
35            GenericData.Record record = new GenericData.Record(schema);
36            reader.next(record);
37            records.add(record);
38          }
39          tableMap.put(
40            trim(file.getName(), ".avro").toUpperCase(),
41            new AvroTable(schema, records));
42        } catch (IOException e) {
43          throw new IllegalStateException(e);
44        }
45      });
46    }
47    return tableMap;
48  }

Table

Table (AbstractTable)

Calcite の Table は、テーブルの種別(Table, View 等)やフィールドタイプを返すインターフェースです。AvroTable では Avro ファイルから取り出したデータの各フィールドの型を参照し、Calcite のテーブルの型情報に変換する getRowType を実装しています。なお、以下のコードの schema は、前述の Calcite の Schema クラスではなく、Avro のスキーマを表す Schema クラスです。

42  @Override
43  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
44    JavaTypeFactory typeFactory = (JavaTypeFactory) relDataTypeFactory;
45
46    List<Pair<String, RelDataType>> ret = schema.getFields().stream().map(field -> {
47      Schema.Type avroFieldType = field.schema().getType();
48      if (avroFieldType == Schema.Type.UNION) {
49        avroFieldType = getAvroNullableField(field);
50      }
51      RelDataType relDataType = AvroFieldType.of(avroFieldType).toType(typeFactory);
52      return new Pair<>(field.name().toUpperCase(), relDataType);
53    }).collect(Collectors.toList());
54    return relDataTypeFactory.createStructType(ret);
55  }

TranslatableTable

TranslatableTableTable を Relational Expression に変換する方法(toRel)を定義するインターフェースです。Relational Expression とは、Scan や Filter、Projection といった 実行計画に表れる要素です。ここでは AvroTableAvroTableScan に変更しています。

72  @Override
73  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
74    int fieldCount = relOptTable.getRowType().getFieldCount();
75    Integer[] fields = AvroEnumerator.identityList(fieldCount);
76    return new AvroTableScan(context.getCluster(), relOptTable, fields);
77  }

その他(project)

project メソッドは直接どこからも呼び出されてません。これは後述する TableScan で生成するコードから呼び出されるメソッドです。Avro ファイルから読んだデータ(GenericData.Record)を Enumerator に詰めて返します。 fields には SELECT 句で指定されたフィールドのインデックスが格納されています。

16  public Enumerable<Object> project(DataContext root, Integer[] fields) {
17    return new AbstractEnumerable<Object>() {
18      public Enumerator<Object> enumerator() {
19        return new AvroEnumerator(records, fields);
20      }
21    };
22  }

Enumerator の形式に詰めて返したインスタンスは、Enumerable Adapter を経由して JDBC の ResultSet から呼び出されるようになります。

10public class AvroEnumerator implements Enumerator<Object> {
11
12  private List<GenericData.Record> records;
13  private Integer[] fields;
14  private int pos;
15
16  public AvroEnumerator(List<GenericData.Record> records, Integer[] fields) {
17    this.records = records;
18    this.fields = fields;
19    this.pos = -1;
20  }
21
22  @Override
23  public Object current() {
24    GenericData.Record record = records.get(this.pos);
25    return Arrays.stream(this.fields)
26             .map(record::get).collect(Collectors.toList()).toArray();
27  }
28
29  @Override
30  public boolean moveNext() {
31    return (this.records.size() > (++this.pos));
32  }
33
34  @Override
35  public void reset() {
36    this.pos = -1;
37  }
38
39  @Override
40  public void close() {}

TableScan

TableScan はテーブルのデータ (Row) を Scan する Relational Expression です。AvroTableScan の役割は、 AvroTable が保持している Avro のデータを Enumerator に詰めて後続の処理に渡すコードを生成することです。 いつくかポイントになる部分を説明していきます。

今回は Enumerable Adapter をベースに使うので、AvroTableScanEnumerableRel を implements し、Enumerable Adapter から呼び出させるようにします。こうすることで、検索対象のデータを読み込む部分は AvroAdapter で行い、Filter や Aggregation といった処理は Enumerable Adapter の機能を使うことになります。

19public class AvroTableScan extends TableScan implements EnumerableRel {
20  private Integer[] fields;

次に deriveRowType を実装します。deriveRowType は、SELECT 句で指定されたフィールドの型を返す為のメソッドです。 getTable().getRowType()AvroTable の全フィールドの情報を取得し、その中から SELECT 句で指定されたフィールドのインデックス fields に該当する型情報だけを返します。

33  @Override
34  public RelDataType deriveRowType() {
35    List<RelDataTypeField> fieldList = getTable().getRowType().getFieldList();
36    RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
37    Arrays.stream(fields).forEach(field -> builder.add(fieldList.get(field)));
38    return builder.build();
39  }

register では、Relational Expression の変換ルールを登録します。 AvroProjectTableScanRule.INSTANCE は文字通り AvroProjectTableScanRule のインスタンスです。このインスタンスがどのように Relation Expression を変換するかは後述します。

40  @Override
41  public void register(RelOptPlanner planner) {
42    planner.addRule(AvroProjectTableScanRule.INSTANCE);
43  }

implementEnumerableRel インターフェースで定義しているメソッドで、Enumerable Adapter から呼び出されます。ここでは、前述したように Avro のデータを後続の処理に渡すコードを生成しています。Java コードの生成には linq4j というライブラリを使っています。

45  @Override
46  public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
47    PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
48    return implementor.result(physType, Blocks.toBlock(
49      Expressions.call(
50        this.table.getExpression(AvroTable.class),
51        "project",
52        implementor.getRootExpression(),
53        Expressions.constant(this.fields)
54      )
55    ));
56  }
57}

どのようなコードが生成されるかについては、前回の Implement のセクションを参照してください。

Rule

Rule は Relational Expression を変換するルールを定義しています。変換する(置き換える)実行計画のシーケンスを、コンストラクタで定義します。

12public class AvroProjectTableScanRule extends RelOptRule {
13
14  static final AvroProjectTableScanRule INSTANCE = new AvroProjectTableScanRule();
15
16  public AvroProjectTableScanRule() {
17    super(RelOptRule.operand(
18      LogicalProject.class,
19      RelOptRule.operand(AvroTableScan.class, RelOptRule.none())
20    ), "AvroProjectTableScanRule");
21  }

上記のコンストラクタでは、実行計画の一部が、

LogicalProject(EMP_ID=[$0], NAME=[$2])
  AvroTableScan(table=[[SAMPLES, TEST]])
    (END)

の場合に変換が行われることを意味します。変換が行われる場合は、以下の onMatch メソッドが呼び出されます。SELECT 句のどのフィールドが指定されたかを LogicalProject から取り出し LogicalProject + AvroTableScan を1つの AvroTableScan に変換します。

23  @Override
24  public void onMatch(RelOptRuleCall call) {
25    LogicalProject project = call.rel(0);
26    AvroTableScan scan = call.rel(1);
27    Integer[] fields = getProjectFields(project.getProjects());
28
29    call.transformTo(
30      new AvroTableScan(scan.getCluster(), scan.getTable(), fields)
31    );
32  }

このルールを登録した状態で select emp_id, name from test where emp_id=? を実行すると、まずこの SQL から Logical Plan として

LogicalFilter(condition=[=($0, ?0)])
  LogicalProject(EMP_ID=[$0], NAME=[$2])
    AvroTableScan(table=[[SAMPLES, TEST]])

が生成され AvroProjectTableScanRule で定義したルールや Enumerable Adapter のルールによって以下のような Physical Plan に変換され、実行されます。

EnumerableFilter(condition=[=($0, ?0)]): rowcount = 15.0, cumulative cost = {115.0 rows, 201.0 cpu, 0.0 io}, id = 41
  AvroTableScan(table=[[SAMPLES, TEST]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 39

Test Code

テストコードでは、JDBC API を使って SQL を発行し、Avro ファイル内のデータを取得できるか確認しています。

15public class JdbcTest {
16
17  private final static String CONNECTION_URL = "jdbc:calcite:model=target/test-classes/model.json";
18
19  @BeforeClass
20  public static void setUpOnce() throws ClassNotFoundException {
21    Class.forName("org.apache.calcite.jdbc.Driver");
22  }
23
24  @Test
25  public void filterByEmpId() throws Exception {
26    try (Connection conn = DriverManager.getConnection(CONNECTION_URL)) {
27       try (PreparedStatement pstmt = conn.prepareStatement("select emp_id, name from test where emp_id=?")) {
28         pstmt.setLong(1, 1L);
29        try (ResultSet rs = pstmt.executeQuery()) {
30          while (rs.next()) {
31            assertThat(rs.getLong("emp_id"), is(1L));
32            assertThat(rs.getString("name"), is("test1"));
33          }
34        }
35      }
36    }
37  }
38
39  @Test
40  public void filterByName() throws Exception {
41    try (Connection conn = DriverManager.getConnection(CONNECTION_URL)) {
42      try (PreparedStatement pstmt = conn.prepareStatement("select emp_id, name from test where name=?")) {
43        pstmt.setString(1, "test2");
44        try (ResultSet rs = pstmt.executeQuery()) {
45          while (rs.next()) {
46            assertThat(rs.getLong("emp_id"), is(2L));
47            assertThat(rs.getString("name"), is("test2"));
48          }
49        }
50      }
51    }
52  }
53}

Conclusion

Avro Adapter を実装しながら、主要なクラスを見てきました。今回説明したコードは以下にアップしています。

今回のように、Calcite の Enumerable Adapter を使うことにより、任意のデータ種別に対して SQL で問い合わせすることが簡単にできるようになっています。