Calcite Adapter
Calcite には Adapter という要素があります。Adapter は、SQL で問い合せるデータの種別ごと用意します。例えば、前回の説明で使用した CSV Adapter は、CSV 形式のデータを SQL で問い合わせる機能を提供します。今回は Avro 形式のデータを SQL で問い合わせる Avro Adapter を実装していきながら、Calcite のクラスを眺めていきます。
但し Adapter を一から作るのは大変なので、大部分の処理は Calcite の core パッケージにある Enumerable Adapter を使います。Enumerable Adapterは、row-wise な検索を行う為に必要な機能を持つ Adapter です。CSV Adapter も Enumerable Adapter の機能を使っています。
SchemaFactory
Adapter を作るにあたり、最初に作成するクラスは SchemaFactory
です。これは Calcite の Schema
クラスのインスタンスを作成して返すシンプルな Factory クラスです。作成した SchemaFactory
のクラス名を、JSON 形式の設定ファイルの factory
に指定します。今回はこの JSON ファイルを model.json
とします。 model.json
の内容は以下のとおりです。
{
"version": "1.0",
"defaultSchema": "SAMPLES",
"schemas": [
{
"name": "SAMPLES",
"type": "custom",
"factory": "net.wrap_trap.calcite_avro_sample.AvroSchemaFactory",
"operand": {
"directory": "samples"
}
}
]
}
このファイルは、JDBC の getConnection
を呼び出す際に以下のように指定します。
Connection conn = DriverManager.getConnection("jdbc:calcite:model=target/test-classes/model.json");
SchemaFactory
クラスのコードは以下のとおりです。
- AvroSchemaFactory.java
1package net.wrap_trap.calcite_avro_sample;
2
3import org.apache.calcite.model.ModelHandler;
4import org.apache.calcite.schema.Schema;
5import org.apache.calcite.schema.SchemaFactory;
6import org.apache.calcite.schema.SchemaPlus;
7
8import java.io.File;
9import java.util.Map;
10
11public class AvroSchemaFactory implements SchemaFactory {
12 public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
13 String directory = (String) operand.get("directory");
14 File base = (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
15 File directoryFile = new File(directory);
16 if (base != null && !directoryFile.isAbsolute()) {
17 directoryFile = new File(base, directory);
18 }
19 return new AvroSchema(directoryFile);
20 }
21}
SchemaFactory
クラスでは、この json ファイルの operand
の値を読むことができます。今回は operand
に Avro データファイルを置くディレクトリ名を指定しています。
Schema
Schema
は Calcite の Table
を管理するクラスです。この AvroSchema
は、前記の model.json
の operand
で指定したディレクトリパスの下にある [TableName].avro ファイルを読み、Avro の スキーマ (Calcite の Schema
とは異なります) と Avro のレコード(GenericData.Record) を取り出して AvroTable
に渡しています。
1package net.wrap_trap.calcite_avro_sample;
2
3import org.apache.avro.Schema;
4import org.apache.avro.file.DataFileReader;
5import org.apache.avro.generic.GenericData;
6import org.apache.avro.generic.GenericDatumReader;
7import org.apache.calcite.schema.Table;
8import org.apache.calcite.schema.impl.AbstractSchema;
9
10import java.io.File;
11import java.io.IOException;
12import java.util.*;
13
14
15public class AvroSchema extends AbstractSchema {
16 private Map<String, Table> tableMap;
17 private File directory;
18
19 public AvroSchema(File directory) {
20 this.directory = directory;
21 }
22
23 @Override
24 public Map<String, Table> getTableMap() {
25 if (tableMap == null) {
26 tableMap = new HashMap<>();
27 File[] avroFiles = directory.listFiles((dir, name) -> name.endsWith(".avro"));
28 Arrays.stream(avroFiles).forEach(file -> {
29 GenericDatumReader<GenericData.Record> datum = new GenericDatumReader<>();
30
31 try (DataFileReader<GenericData.Record> reader = new DataFileReader<>(file, datum)) {
32 Schema schema = reader.getSchema();
33 List<GenericData.Record> records = new ArrayList<>();
34 while (reader.hasNext()) {
35 GenericData.Record record = new GenericData.Record(schema);
36 reader.next(record);
37 records.add(record);
38 }
39 tableMap.put(
40 trim(file.getName(), ".avro").toUpperCase(),
41 new AvroTable(schema, records));
42 } catch (IOException e) {
43 throw new IllegalStateException(e);
44 }
45 });
46 }
47 return tableMap;
48 }
Table
Table (AbstractTable)
Calcite の Table は、テーブルの種別(Table, View 等)やフィールドタイプを返すインターフェースです。AvroTable
では Avro ファイルから取り出したデータの各フィールドの型を参照し、Calcite のテーブルの型情報に変換する getRowType
を実装しています。なお、以下のコードの schema
は、前述の Calcite の Schema クラスではなく、Avro のスキーマを表す Schema
クラスです。
- AvroTable.java
42 @Override
43 public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
44 JavaTypeFactory typeFactory = (JavaTypeFactory) relDataTypeFactory;
45
46 List<Pair<String, RelDataType>> ret = schema.getFields().stream().map(field -> {
47 Schema.Type avroFieldType = field.schema().getType();
48 if (avroFieldType == Schema.Type.UNION) {
49 avroFieldType = getAvroNullableField(field);
50 }
51 RelDataType relDataType = AvroFieldType.of(avroFieldType).toType(typeFactory);
52 return new Pair<>(field.name().toUpperCase(), relDataType);
53 }).collect(Collectors.toList());
54 return relDataTypeFactory.createStructType(ret);
55 }
TranslatableTable
TranslatableTable は Table
を Relational Expression に変換する方法(toRel)を定義するインターフェースです。Relational Expression とは、Scan や Filter、Projection といった 実行計画に表れる要素です。ここでは AvroTable
を AvroTableScan
に変更しています。
- AvroTable.java
72 @Override
73 public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
74 int fieldCount = relOptTable.getRowType().getFieldCount();
75 Integer[] fields = AvroEnumerator.identityList(fieldCount);
76 return new AvroTableScan(context.getCluster(), relOptTable, fields);
77 }
その他(project)
project
メソッドは直接どこからも呼び出されてません。これは後述する TableScan
で生成するコードから呼び出されるメソッドです。Avro ファイルから読んだデータ(GenericData.Record)を Enumerator に詰めて返します。 fields
には SELECT 句で指定されたフィールドのインデックスが格納されています。
- AvroTable.java
16 public Enumerable<Object> project(DataContext root, Integer[] fields) {
17 return new AbstractEnumerable<Object>() {
18 public Enumerator<Object> enumerator() {
19 return new AvroEnumerator(records, fields);
20 }
21 };
22 }
Enumerator の形式に詰めて返したインスタンスは、Enumerable Adapter を経由して JDBC の ResultSet から呼び出されるようになります。
- AvroEnumerable.java
10public class AvroEnumerator implements Enumerator<Object> {
11
12 private List<GenericData.Record> records;
13 private Integer[] fields;
14 private int pos;
15
16 public AvroEnumerator(List<GenericData.Record> records, Integer[] fields) {
17 this.records = records;
18 this.fields = fields;
19 this.pos = -1;
20 }
21
22 @Override
23 public Object current() {
24 GenericData.Record record = records.get(this.pos);
25 return Arrays.stream(this.fields)
26 .map(record::get).collect(Collectors.toList()).toArray();
27 }
28
29 @Override
30 public boolean moveNext() {
31 return (this.records.size() > (++this.pos));
32 }
33
34 @Override
35 public void reset() {
36 this.pos = -1;
37 }
38
39 @Override
40 public void close() {}
TableScan
TableScan はテーブルのデータ (Row) を Scan する Relational Expression です。AvroTableScan
の役割は、 AvroTable
が保持している Avro のデータを Enumerator に詰めて後続の処理に渡すコードを生成することです。
いつくかポイントになる部分を説明していきます。
今回は Enumerable Adapter
をベースに使うので、AvroTableScan
は EnumerableRel
を implements し、Enumerable Adapter から呼び出させるようにします。こうすることで、検索対象のデータを読み込む部分は AvroAdapter
で行い、Filter や Aggregation といった処理は Enumerable Adapter の機能を使うことになります。
- AvroTableScan.java
19public class AvroTableScan extends TableScan implements EnumerableRel {
20 private Integer[] fields;
次に deriveRowType
を実装します。deriveRowType
は、SELECT 句で指定されたフィールドの型を返す為のメソッドです。 getTable().getRowType()
で AvroTable
の全フィールドの情報を取得し、その中から SELECT 句で指定されたフィールドのインデックス fields
に該当する型情報だけを返します。
- AvroTableScan.java
33 @Override
34 public RelDataType deriveRowType() {
35 List<RelDataTypeField> fieldList = getTable().getRowType().getFieldList();
36 RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
37 Arrays.stream(fields).forEach(field -> builder.add(fieldList.get(field)));
38 return builder.build();
39 }
register
では、Relational Expression の変換ルールを登録します。 AvroProjectTableScanRule.INSTANCE
は文字通り AvroProjectTableScanRule
のインスタンスです。このインスタンスがどのように Relation Expression を変換するかは後述します。
- AvroTableScan.java
40 @Override
41 public void register(RelOptPlanner planner) {
42 planner.addRule(AvroProjectTableScanRule.INSTANCE);
43 }
implement
は EnumerableRel
インターフェースで定義しているメソッドで、Enumerable Adapter から呼び出されます。ここでは、前述したように Avro のデータを後続の処理に渡すコードを生成しています。Java コードの生成には linq4j というライブラリを使っています。
- AvroTableScan.java
45 @Override
46 public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
47 PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
48 return implementor.result(physType, Blocks.toBlock(
49 Expressions.call(
50 this.table.getExpression(AvroTable.class),
51 "project",
52 implementor.getRootExpression(),
53 Expressions.constant(this.fields)
54 )
55 ));
56 }
57}
どのようなコードが生成されるかについては、前回の Implement のセクションを参照してください。
Rule
Rule
は Relational Expression を変換するルールを定義しています。変換する(置き換える)実行計画のシーケンスを、コンストラクタで定義します。
- AvroProjectTableScanRule.java
12public class AvroProjectTableScanRule extends RelOptRule {
13
14 static final AvroProjectTableScanRule INSTANCE = new AvroProjectTableScanRule();
15
16 public AvroProjectTableScanRule() {
17 super(RelOptRule.operand(
18 LogicalProject.class,
19 RelOptRule.operand(AvroTableScan.class, RelOptRule.none())
20 ), "AvroProjectTableScanRule");
21 }
上記のコンストラクタでは、実行計画の一部が、
LogicalProject(EMP_ID=[$0], NAME=[$2])
AvroTableScan(table=[[SAMPLES, TEST]])
(END)
の場合に変換が行われることを意味します。変換が行われる場合は、以下の onMatch
メソッドが呼び出されます。SELECT 句のどのフィールドが指定されたかを LogicalProject
から取り出し LogicalProject
+ AvroTableScan
を1つの AvroTableScan
に変換します。
- AvroProjectTableScanRule.java
23 @Override
24 public void onMatch(RelOptRuleCall call) {
25 LogicalProject project = call.rel(0);
26 AvroTableScan scan = call.rel(1);
27 Integer[] fields = getProjectFields(project.getProjects());
28
29 call.transformTo(
30 new AvroTableScan(scan.getCluster(), scan.getTable(), fields)
31 );
32 }
このルールを登録した状態で select emp_id, name from test where emp_id=?
を実行すると、まずこの SQL から Logical Plan として
LogicalFilter(condition=[=($0, ?0)])
LogicalProject(EMP_ID=[$0], NAME=[$2])
AvroTableScan(table=[[SAMPLES, TEST]])
が生成され AvroProjectTableScanRule
で定義したルールや Enumerable Adapter のルールによって以下のような Physical Plan に変換され、実行されます。
EnumerableFilter(condition=[=($0, ?0)]): rowcount = 15.0, cumulative cost = {115.0 rows, 201.0 cpu, 0.0 io}, id = 41
AvroTableScan(table=[[SAMPLES, TEST]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 39
Test Code
テストコードでは、JDBC API を使って SQL を発行し、Avro ファイル内のデータを取得できるか確認しています。
- JdbcTest.java
15public class JdbcTest {
16
17 private final static String CONNECTION_URL = "jdbc:calcite:model=target/test-classes/model.json";
18
19 @BeforeClass
20 public static void setUpOnce() throws ClassNotFoundException {
21 Class.forName("org.apache.calcite.jdbc.Driver");
22 }
23
24 @Test
25 public void filterByEmpId() throws Exception {
26 try (Connection conn = DriverManager.getConnection(CONNECTION_URL)) {
27 try (PreparedStatement pstmt = conn.prepareStatement("select emp_id, name from test where emp_id=?")) {
28 pstmt.setLong(1, 1L);
29 try (ResultSet rs = pstmt.executeQuery()) {
30 while (rs.next()) {
31 assertThat(rs.getLong("emp_id"), is(1L));
32 assertThat(rs.getString("name"), is("test1"));
33 }
34 }
35 }
36 }
37 }
38
39 @Test
40 public void filterByName() throws Exception {
41 try (Connection conn = DriverManager.getConnection(CONNECTION_URL)) {
42 try (PreparedStatement pstmt = conn.prepareStatement("select emp_id, name from test where name=?")) {
43 pstmt.setString(1, "test2");
44 try (ResultSet rs = pstmt.executeQuery()) {
45 while (rs.next()) {
46 assertThat(rs.getLong("emp_id"), is(2L));
47 assertThat(rs.getString("name"), is("test2"));
48 }
49 }
50 }
51 }
52 }
53}
Conclusion
Avro Adapter を実装しながら、主要なクラスを見てきました。今回説明したコードは以下にアップしています。
今回のように、Calcite の Enumerable Adapter を使うことにより、任意のデータ種別に対して SQL で問い合わせすることが簡単にできるようになっています。