calcite

Image by 949158 from Pixabay

Calcite の Adapter を書いているのですが、たまにうまく動かせなくて困ることがあるので、頭からコードを読んでみることにしました。

Calcite

GitHub の README.md を見ると、”Apache Calcite is a dynamic data management framework. For more details, see the home page.” と書かれており、詳細は home page を見ろ、となっています。

私が触れてきた範囲で書くと、Calcite は任意のデータソースに対して SQL で操作するインターフェースを提供する、データベースのツールキットです。例えば CSV ファイルに対して SQL で問い合わせるような機能(CSV Adapter)を提供しています。Calcite はサポートするデータソースごとに Adapter を実装するようになっていて、GeodeCassandra 等の Adapter があります。また、ミドルウェアに組み込まれているケースもあります。詳細は Community のページを参照してください。

Test Code

処理の流れを見ていくにあたり、CSV Adapter を実装した example プロジェクトを使い、以下のようなテストコードを動かしました。model.json では Adapter の SchemaFactory のクラス等を設定しています。

1  @Test
2  public void testExample() throws ClassNotFoundException, SQLException {
3    Class.forName("org.apache.calcite.jdbc.Driver");
4    try (Connection conn = DriverManager.getConnection("jdbc:calcite:model=target/test-classes/example-model.json", "foo", "bar")) {
5      try (Statement stmt = conn.createStatement()) {
6        stmt.executeQuery("select empno, gender, name from EMPS where name = 'John'");
7      }
8    }
9  }
 1{
 2  "version": "1.0",
 3  "defaultSchema": "SALES",
 4  "schemas": [
 5    {
 6      "name": "SALES",
 7      "type": "custom",
 8      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
 9      "operand": {
10        "directory": "sales",
11        "flavor": "translatable"
12      }
13    }
14  ]
15}

JDBC

前記のテストコードでは Avatica という Calcite のサブプロジェクトの JDBC Driver が load されます。ただ、Calcite にも JDBC のクラスが付いており、最終的には CalciteConnectionImplCalcitePreparedStatement といったクラスが呼び出されるようになっています。

Planner

Calcite は指定された SQL に対して実行計画を作成します。この実行計画の作成には以下の2種類のクラスがあります。

JavaDoc を見ても、RBO / CBO といった表現は無いのですが、VolcanoPlanner は Cost を見ているので CBO と考えて良さそうです。なお、上記コードでも VolcanoPlanner が生成され、実行計画を作成しています。

422  /** Creates a query planner and initializes it with a default set of
423   * rules. */
424  protected RelOptPlanner createPlanner(
425      final CalcitePrepare.Context prepareContext,
426      org.apache.calcite.plan.Context externalContext,
427      RelOptCostFactory costFactory) {
428    if (externalContext == null) {
429      externalContext = Contexts.of(prepareContext.config());
430    }
431    final VolcanoPlanner planner =
432        new VolcanoPlanner(costFactory, externalContext);

SQL Parser

SQL は SqlParser が parse して SqlNode に変換しています。

596    if (query.sql != null) {
597      final CalciteConnectionConfig config = context.config();
598      final SqlParser.ConfigBuilder parserConfig = createParserConfig()
599          .setQuotedCasing(config.quotedCasing())
600          .setUnquotedCasing(config.unquotedCasing())
601          .setQuoting(config.quoting())
602          .setConformance(config.conformance())
603          .setCaseSensitive(config.caseSensitive());
604      final SqlParserImplFactory parserFactory =
605          config.parserFactory(SqlParserImplFactory.class, null);
606      if (parserFactory != null) {
607        parserConfig.setParserFactory(parserFactory);
608      }
609      SqlParser parser = createParser(query.sql,  parserConfig);
610      SqlNode sqlNode;
611      try {
612        sqlNode = parser.parseStmt();
613        statementType = getStatementType(sqlNode.getKind());
614      } catch (SqlParseException e) {
615        throw new RuntimeException(
616            "parse failed: " + e.getMessage(), e);
617      }

select 文の場合は SqlNode を継承した SqlSelect のインスタンスが生成されます。SqlSelect は内部で fromwhere といった SqlNode を持っています。

Logical Plan

次に Logical Plan を作成します。Logical Plan は SQL から生成されます。Logical Plan の時点では、planner による最適化は行われておらず、SQL に基づいた実行計画が作成されるだけです。

239  public PreparedResult prepareSql(
240      SqlNode sqlQuery,
241      SqlNode sqlNodeOriginal,
242      Class runtimeContextClass,
243      SqlValidator validator,
244      boolean needsValidation) {
245    init(runtimeContextClass);
246
247    final SqlToRelConverter.ConfigBuilder builder =
248        SqlToRelConverter.configBuilder()
249            .withTrimUnusedFields(true)
250            .withExpand(THREAD_EXPAND.get())
251            .withExplain(sqlQuery.getKind() == SqlKind.EXPLAIN);
252    final SqlToRelConverter sqlToRelConverter =
253        getSqlToRelConverter(validator, catalogReader, builder.build());
254
255    SqlExplain sqlExplain = null;
256    if (sqlQuery.getKind() == SqlKind.EXPLAIN) {
257      // dig out the underlying SQL statement
258      sqlExplain = (SqlExplain) sqlQuery;
259      sqlQuery = sqlExplain.getExplicandum();
260      sqlToRelConverter.setDynamicParamCountInExplain(
261          sqlExplain.getDynamicParamCount());
262    }
263
264    RelRoot root =
265        sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true);
266    Hook.CONVERTED.run(root.rel);

sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true); の部分が Logical Plan の生成になります。前記のテストコードを実行すると、ここで以下のような Logical Plan が生成されます。上記の RelRoot が 一番上の LogicalProject になります。

1LogicalProject(subset=[rel#28:Subset#3.ENUMERABLE.[]], EMPNO=[$0], GENDER=[$2], NAME=[$1]): rowcount = 15.0, cumulative cost = {15.0 rows, 45.0 cpu, 0.0 io}, id = 23
2  LogicalFilter(subset=[rel#22:Subset#2.NONE.[]], condition=[=($1, 'John')]): rowcount = 15.0, cumulative cost = {15.0 rows, 100.0 cpu, 0.0 io}, id = 21
3    LogicalProject(subset=[rel#20:Subset#1.NONE.[]], EMPNO=[$0], NAME=[$1], GENDER=[$3]): rowcount = 100.0, cumulative cost = {100.0 rows, 300.0 cpu, 0.0 io}, id = 19
4      CsvTableScan(subset=[rel#18:Subset#0.ENUMERABLE.[]], table=[[SALES, EMPS]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 0

Physical Plan

生成された Logical Plan を元に Physical Plan を作成します。

319    root = optimize(root, getMaterializations(), getLattices());

optimize が呼び出されると 最初の方で生成した VolcanoPlanner によって 以下のような Physical Plan が生成されます。

12019-06-03 20:49:33,973 [main] DEBUG - Cheapest plan:
2EnumerableProject(EMPNO=[$0], GENDER=[$2], NAME=[$1]): rowcount = 15.0, cumulative cost = {71.66666666666667 rows, 187.08333333333334 cpu, 0.0 io}, id = 59
3  EnumerableFilter(condition=[=($1, 'John')]): rowcount = 15.0, cumulative cost = {56.66666666666667 rows, 142.08333333333334 cpu, 0.0 io}, id = 58
4    CsvTableScan(table=[[SALES, EMPS]], fields=[[0, 1, 3]]): rowcount = 100.0, cumulative cost = {41.66666666666667 rows, 42.083333333333336 cpu, 0.0 io}, id = 57

Enumerable* は Calcite のスタンダードな Adapter に属するクラスです。この example プロジェクトでは CSV Adapter として CsvTableScan や CsvTable といったクラスを提供して CSV ファイルからデータを取り出すようにし、それ以降は Enumerable Adapter を使ってデータを処理しています。

Implement

Physical Plan が決まると、その Physical Plan を構成するノードがクエリの実行に必要な処理を行います。

332    return implement(root);

このメソッド以降の処理内容は Adapter によって異なります。Enumerable Adapter の場合は、Physical Plan を基に Java のクラスを生成します。どの Enumerable クラスがどのようなコードを出力するかは、EnumerableProject や EnumerableFilter の implement メソッドで実装されている…と言いたいところなのですが、実際には optimize の中で決まった Cheapest Plan に対して更に HepPlanner で変換が行われ、EnumerableProject と EnumerableFilter は EnumerableCalc に置き換わっているので、EnumerableCalc#implement が呼び出されて以下のようなクラスが生成されます。

/*   1 */ org.apache.calcite.DataContext root;
/*   2 */ 
/*   3 */ public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root0) {
/*   4 */   root = root0;
/*   5 */   final org.apache.calcite.linq4j.Enumerable _inputEnumerable = ((org.apache.calcite.adapter.csv.CsvTranslatableTable) root.getRootSchema().getSubSchema("SALES").getTable("EMPS")).project(root, new int[] {
/*   6 */     0,
/*   7 */     1,
/*   8 */     3});
/*   9 */   return new org.apache.calcite.linq4j.AbstractEnumerable(){
/*  10 */       public org.apache.calcite.linq4j.Enumerator enumerator() {
/*  11 */         return new org.apache.calcite.linq4j.Enumerator(){
/*  12 */             public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();
/*  13 */             public void reset() {
/*  14 */               inputEnumerator.reset();
/*  15 */             }
/*  16 */ 
/*  17 */             public boolean moveNext() {
/*  18 */               while (inputEnumerator.moveNext()) {
/*  19 */                 final Object[] current = (Object[]) inputEnumerator.current();
/*  20 */                 final String inp1_ = current[1] == null ? (String) null : current[1].toString();
/*  21 */                 if (inp1_ != null && org.apache.calcite.runtime.SqlFunctions.eq(inp1_, "John")) {
/*  22 */                   return true;
/*  23 */                 }
/*  24 */               }
/*  25 */               return false;
/*  26 */             }
/*  27 */ 
/*  28 */             public void close() {
/*  29 */               inputEnumerator.close();
/*  30 */             }
/*  31 */ 
/*  32 */             public Object current() {
/*  33 */               final Object[] current = (Object[]) inputEnumerator.current();
/*  34 */               return new Object[] {
/*  35 */                   current[0],
/*  36 */                   current[2],
/*  37 */                   current[1]};
/*  38 */             }
/*  39 */ 
/*  40 */           };
/*  41 */       }
/*  42 */ 
/*  43 */     };
/*  44 */ }
/*  45 */ 
/*  46 */ 
/*  47 */ public Class getElementType() {
/*  48 */   return java.lang.Object[].class;
/*  49 */ }
/*  50 */ 
/*  51 */ 

Statement

Implement でクラス(の文字列)を生成した後、そのクラスのインスタンスを生成します。

133  static Bindable getBindable(ClassDeclaration expr, String s, int fieldCount)
134      throws CompileException, IOException {
135    ICompilerFactory compilerFactory;
136    try {
137      compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
138    } catch (Exception e) {
139      throw new IllegalStateException(
140          "Unable to instantiate java compiler", e);
141    }
142    IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
143    cbe.setClassName(expr.name);
144    cbe.setExtendedClass(Utilities.class);
145    cbe.setImplementedInterfaces(
146        fieldCount == 1
147            ? new Class[] {Bindable.class, Typed.class}
148            : new Class[] {ArrayBindable.class});
149    cbe.setParentClassLoader(EnumerableInterpretable.class.getClassLoader());
150    if (CalcitePrepareImpl.DEBUG) {
151      // Add line numbers to the generated janino class
152      cbe.setDebuggingInformation(true, true, true);
153    }
154    return (Bindable) cbe.createInstance(new StringReader(s));
155  }

生成したインスタンス(bindable)を CalciteSignature に詰め、それを Avatica の ResultSet に渡します。

551    signature = calciteConnection.parseQuery(query, context, maxRowCount);
552    statement.setSignature(signature);
553    final int updateCount;
554    switch (signature.statementType) {
555    case CREATE:
556    case DROP:
557    case ALTER:
558    case OTHER_DDL:
559      updateCount = 0; // DDL produces no result set
560      break;

561    default:
562      updateCount = -1; // SELECT and DML produces result set
563      break;
564    }
565    callback.assign(signature, null, updateCount);
566  }
652    public void assign(Meta.Signature signature, Meta.Frame firstFrame,
653        long updateCount) throws SQLException {
654      statement.setSignature(signature);
655
656      if (updateCount != -1) {
657        statement.updateCount = updateCount;
658      } else {
659        final TimeZone timeZone = getTimeZone();
660        statement.openResultSet = factory.newResultSet(statement, new QueryState(sql),
661            signature, timeZone, firstFrame);
662      }
663    }

あとは ResultSet#next が呼び出される度に生成されたインスタンスの moveNextcurrent が呼び出され、CSV のデータが取り出されるようになっています。

Conclusion

今回は Calcite の example プロジェクト プロジェクトを使って、CSV ファイルへのクエリ発行から以下の流れを見てきました。

次は主要なクラスのコードを掘り下げて見ていきたいと思います。