Calcite の Adapter を書いているのですが、たまにうまく動かせなくて困ることがあるので、頭からコードを読んでみることにしました。
Calcite
GitHub の README.md を見ると、”Apache Calcite is a dynamic data management framework. For more details, see the home page.” と書かれており、詳細は home page を見ろ、となっています。
私が触れてきた範囲で書くと、Calcite は任意のデータソースに対して SQL で操作するインターフェースを提供する、データベースのツールキットです。例えば CSV ファイルに対して SQL で問い合わせるような機能(CSV Adapter)を提供しています。Calcite はサポートするデータソースごとに Adapter を実装するようになっていて、Geode や Cassandra 等の Adapter があります。また、ミドルウェアに組み込まれているケースもあります。詳細は Community のページを参照してください。
Test Code
処理の流れを見ていくにあたり、CSV Adapter を実装した example プロジェクトを使い、以下のようなテストコードを動かしました。model.json
では Adapter の SchemaFactory のクラス等を設定しています。
- Test Code
1 @Test
2 public void testExample() throws ClassNotFoundException, SQLException {
3 Class.forName("org.apache.calcite.jdbc.Driver");
4 try (Connection conn = DriverManager.getConnection("jdbc:calcite:model=target/test-classes/example-model.json", "foo", "bar")) {
5 try (Statement stmt = conn.createStatement()) {
6 stmt.executeQuery("select empno, gender, name from EMPS where name = 'John'");
7 }
8 }
9 }
- example-model.json
1{
2 "version": "1.0",
3 "defaultSchema": "SALES",
4 "schemas": [
5 {
6 "name": "SALES",
7 "type": "custom",
8 "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
9 "operand": {
10 "directory": "sales",
11 "flavor": "translatable"
12 }
13 }
14 ]
15}
JDBC
前記のテストコードでは Avatica という Calcite のサブプロジェクトの JDBC Driver が load されます。ただ、Calcite にも JDBC のクラスが付いており、最終的には CalciteConnectionImpl や CalcitePreparedStatement といったクラスが呼び出されるようになっています。
Planner
Calcite は指定された SQL に対して実行計画を作成します。この実行計画の作成には以下の2種類のクラスがあります。
JavaDoc を見ても、RBO / CBO といった表現は無いのですが、VolcanoPlanner は Cost を見ているので CBO と考えて良さそうです。なお、上記コードでも VolcanoPlanner が生成され、実行計画を作成しています。
422 /** Creates a query planner and initializes it with a default set of
423 * rules. */
424 protected RelOptPlanner createPlanner(
425 final CalcitePrepare.Context prepareContext,
426 org.apache.calcite.plan.Context externalContext,
427 RelOptCostFactory costFactory) {
428 if (externalContext == null) {
429 externalContext = Contexts.of(prepareContext.config());
430 }
431 final VolcanoPlanner planner =
432 new VolcanoPlanner(costFactory, externalContext);
SQL Parser
SQL は SqlParser が parse して SqlNode に変換しています。
596 if (query.sql != null) {
597 final CalciteConnectionConfig config = context.config();
598 final SqlParser.ConfigBuilder parserConfig = createParserConfig()
599 .setQuotedCasing(config.quotedCasing())
600 .setUnquotedCasing(config.unquotedCasing())
601 .setQuoting(config.quoting())
602 .setConformance(config.conformance())
603 .setCaseSensitive(config.caseSensitive());
604 final SqlParserImplFactory parserFactory =
605 config.parserFactory(SqlParserImplFactory.class, null);
606 if (parserFactory != null) {
607 parserConfig.setParserFactory(parserFactory);
608 }
609 SqlParser parser = createParser(query.sql, parserConfig);
610 SqlNode sqlNode;
611 try {
612 sqlNode = parser.parseStmt();
613 statementType = getStatementType(sqlNode.getKind());
614 } catch (SqlParseException e) {
615 throw new RuntimeException(
616 "parse failed: " + e.getMessage(), e);
617 }
select 文の場合は SqlNode を継承した SqlSelect のインスタンスが生成されます。SqlSelect は内部で from
や where
といった SqlNode を持っています。
- core/src/main/java/org/apache/calcite/sql/SqlSelect.java
33public class SqlSelect extends SqlCall { 34 //~ Static fields/initializers --------------------------------------------- 35 36 // constants representing operand positions 37 public static final int FROM_OPERAND = 2; 38 public static final int WHERE_OPERAND = 3; 39 public static final int HAVING_OPERAND = 5; 40 41 SqlNodeList keywordList; 42 SqlNodeList selectList; 43 SqlNode from; 44 SqlNode where; 45 SqlNodeList groupBy; 46 SqlNode having; 47 SqlNodeList windowDecls; 48 SqlNodeList orderBy; 49 SqlNode offset; 50 SqlNode fetch; 51
Logical Plan
次に Logical Plan を作成します。Logical Plan は SQL から生成されます。Logical Plan の時点では、planner による最適化は行われておらず、SQL に基づいた実行計画が作成されるだけです。
239 public PreparedResult prepareSql(
240 SqlNode sqlQuery,
241 SqlNode sqlNodeOriginal,
242 Class runtimeContextClass,
243 SqlValidator validator,
244 boolean needsValidation) {
245 init(runtimeContextClass);
246
247 final SqlToRelConverter.ConfigBuilder builder =
248 SqlToRelConverter.configBuilder()
249 .withTrimUnusedFields(true)
250 .withExpand(THREAD_EXPAND.get())
251 .withExplain(sqlQuery.getKind() == SqlKind.EXPLAIN);
252 final SqlToRelConverter sqlToRelConverter =
253 getSqlToRelConverter(validator, catalogReader, builder.build());
254
255 SqlExplain sqlExplain = null;
256 if (sqlQuery.getKind() == SqlKind.EXPLAIN) {
257 // dig out the underlying SQL statement
258 sqlExplain = (SqlExplain) sqlQuery;
259 sqlQuery = sqlExplain.getExplicandum();
260 sqlToRelConverter.setDynamicParamCountInExplain(
261 sqlExplain.getDynamicParamCount());
262 }
263
264 RelRoot root =
265 sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true);
266 Hook.CONVERTED.run(root.rel);
sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true);
の部分が Logical Plan の生成になります。前記のテストコードを実行すると、ここで以下のような Logical Plan が生成されます。上記の RelRoot
が 一番上の LogicalProject
になります。
1LogicalProject(subset=[rel#28:Subset#3.ENUMERABLE.[]], EMPNO=[$0], GENDER=[$2], NAME=[$1]): rowcount = 15.0, cumulative cost = {15.0 rows, 45.0 cpu, 0.0 io}, id = 23
2 LogicalFilter(subset=[rel#22:Subset#2.NONE.[]], condition=[=($1, 'John')]): rowcount = 15.0, cumulative cost = {15.0 rows, 100.0 cpu, 0.0 io}, id = 21
3 LogicalProject(subset=[rel#20:Subset#1.NONE.[]], EMPNO=[$0], NAME=[$1], GENDER=[$3]): rowcount = 100.0, cumulative cost = {100.0 rows, 300.0 cpu, 0.0 io}, id = 19
4 CsvTableScan(subset=[rel#18:Subset#0.ENUMERABLE.[]], table=[[SALES, EMPS]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 0
Physical Plan
生成された Logical Plan を元に Physical Plan を作成します。
319 root = optimize(root, getMaterializations(), getLattices());
optimize
が呼び出されると 最初の方で生成した VolcanoPlanner
によって 以下のような Physical Plan が生成されます。
12019-06-03 20:49:33,973 [main] DEBUG - Cheapest plan:
2EnumerableProject(EMPNO=[$0], GENDER=[$2], NAME=[$1]): rowcount = 15.0, cumulative cost = {71.66666666666667 rows, 187.08333333333334 cpu, 0.0 io}, id = 59
3 EnumerableFilter(condition=[=($1, 'John')]): rowcount = 15.0, cumulative cost = {56.66666666666667 rows, 142.08333333333334 cpu, 0.0 io}, id = 58
4 CsvTableScan(table=[[SALES, EMPS]], fields=[[0, 1, 3]]): rowcount = 100.0, cumulative cost = {41.66666666666667 rows, 42.083333333333336 cpu, 0.0 io}, id = 57
Enumerable* は Calcite のスタンダードな Adapter に属するクラスです。この example プロジェクトでは CSV Adapter として CsvTableScan や CsvTable といったクラスを提供して CSV ファイルからデータを取り出すようにし、それ以降は Enumerable Adapter を使ってデータを処理しています。
Implement
Physical Plan が決まると、その Physical Plan を構成するノードがクエリの実行に必要な処理を行います。
332 return implement(root);
このメソッド以降の処理内容は Adapter によって異なります。Enumerable Adapter の場合は、Physical Plan を基に Java のクラスを生成します。どの Enumerable クラスがどのようなコードを出力するかは、EnumerableProject や EnumerableFilter の implement メソッドで実装されている…と言いたいところなのですが、実際には optimize
の中で決まった Cheapest Plan に対して更に HepPlanner で変換が行われ、EnumerableProject と EnumerableFilter は EnumerableCalc に置き換わっているので、EnumerableCalc#implement が呼び出されて以下のようなクラスが生成されます。
/* 1 */ org.apache.calcite.DataContext root;
/* 2 */
/* 3 */ public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root0) {
/* 4 */ root = root0;
/* 5 */ final org.apache.calcite.linq4j.Enumerable _inputEnumerable = ((org.apache.calcite.adapter.csv.CsvTranslatableTable) root.getRootSchema().getSubSchema("SALES").getTable("EMPS")).project(root, new int[] {
/* 6 */ 0,
/* 7 */ 1,
/* 8 */ 3});
/* 9 */ return new org.apache.calcite.linq4j.AbstractEnumerable(){
/* 10 */ public org.apache.calcite.linq4j.Enumerator enumerator() {
/* 11 */ return new org.apache.calcite.linq4j.Enumerator(){
/* 12 */ public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();
/* 13 */ public void reset() {
/* 14 */ inputEnumerator.reset();
/* 15 */ }
/* 16 */
/* 17 */ public boolean moveNext() {
/* 18 */ while (inputEnumerator.moveNext()) {
/* 19 */ final Object[] current = (Object[]) inputEnumerator.current();
/* 20 */ final String inp1_ = current[1] == null ? (String) null : current[1].toString();
/* 21 */ if (inp1_ != null && org.apache.calcite.runtime.SqlFunctions.eq(inp1_, "John")) {
/* 22 */ return true;
/* 23 */ }
/* 24 */ }
/* 25 */ return false;
/* 26 */ }
/* 27 */
/* 28 */ public void close() {
/* 29 */ inputEnumerator.close();
/* 30 */ }
/* 31 */
/* 32 */ public Object current() {
/* 33 */ final Object[] current = (Object[]) inputEnumerator.current();
/* 34 */ return new Object[] {
/* 35 */ current[0],
/* 36 */ current[2],
/* 37 */ current[1]};
/* 38 */ }
/* 39 */
/* 40 */ };
/* 41 */ }
/* 42 */
/* 43 */ };
/* 44 */ }
/* 45 */
/* 46 */
/* 47 */ public Class getElementType() {
/* 48 */ return java.lang.Object[].class;
/* 49 */ }
/* 50 */
/* 51 */
Statement
Implement でクラス(の文字列)を生成した後、そのクラスのインスタンスを生成します。
133 static Bindable getBindable(ClassDeclaration expr, String s, int fieldCount)
134 throws CompileException, IOException {
135 ICompilerFactory compilerFactory;
136 try {
137 compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
138 } catch (Exception e) {
139 throw new IllegalStateException(
140 "Unable to instantiate java compiler", e);
141 }
142 IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
143 cbe.setClassName(expr.name);
144 cbe.setExtendedClass(Utilities.class);
145 cbe.setImplementedInterfaces(
146 fieldCount == 1
147 ? new Class[] {Bindable.class, Typed.class}
148 : new Class[] {ArrayBindable.class});
149 cbe.setParentClassLoader(EnumerableInterpretable.class.getClassLoader());
150 if (CalcitePrepareImpl.DEBUG) {
151 // Add line numbers to the generated janino class
152 cbe.setDebuggingInformation(true, true, true);
153 }
154 return (Bindable) cbe.createInstance(new StringReader(s));
155 }
生成したインスタンス(bindable)を CalciteSignature
に詰め、それを Avatica の ResultSet に渡します。
551 signature = calciteConnection.parseQuery(query, context, maxRowCount);
552 statement.setSignature(signature);
553 final int updateCount;
554 switch (signature.statementType) {
555 case CREATE:
556 case DROP:
557 case ALTER:
558 case OTHER_DDL:
559 updateCount = 0; // DDL produces no result set
560 break;
561 default:
562 updateCount = -1; // SELECT and DML produces result set
563 break;
564 }
565 callback.assign(signature, null, updateCount);
566 }
652 public void assign(Meta.Signature signature, Meta.Frame firstFrame,
653 long updateCount) throws SQLException {
654 statement.setSignature(signature);
655
656 if (updateCount != -1) {
657 statement.updateCount = updateCount;
658 } else {
659 final TimeZone timeZone = getTimeZone();
660 statement.openResultSet = factory.newResultSet(statement, new QueryState(sql),
661 signature, timeZone, firstFrame);
662 }
663 }
あとは ResultSet#next が呼び出される度に生成されたインスタンスの moveNext
や current
が呼び出され、CSV のデータが取り出されるようになっています。
Conclusion
今回は Calcite の example プロジェクト プロジェクトを使って、CSV ファイルへのクエリ発行から以下の流れを見てきました。
- SQL の parse
- Logical Plan の生成
- Physical Plan の生成
- 実行クラスの生成
- ResultSet の生成
次は主要なクラスのコードを掘り下げて見ていきたいと思います。