Databricks から Delta Lake というモジュールがリリースされました。これは Databricks で提供している Delta という Transactional なストレージの一部を OSS として公開したようです。このモジュールは現在 Spark に対応しています。

この Delta のトランザクションの仕組みが気になったので、GitHub に公開されているコードを clone し、データを append / overwrite するテストコードを動かしてトランザクションがどのような仕組みで実現されているか調べてみました。

Setup

公開されたコードは GitHub にあります。Scala が動く環境であればテストコードを動かすことはできます。Windows の場合は winutils が必要です。

package は org.apache.spark.sql.delta になっています。

https://github.com/delta-io/delta

Transaction Protocol

Delta はバージョン番号を使ってトランザクションを管理しています。バージョン番号は、データを更新する度に1つ上がります。バージョン番号や更新内容は、データの保存先のフォルダの配下の _delta_log というフォルダに JSON ファイルとして保存するようになっています。

https://github.com/delta-io/delta/blob/master/README.md#transaction-protocol

File Format

データのファイルフォーマットは Parquet が使われています。Parquet ファイルは特に拡張されているわけではないので、parquet-cliparquet-tools で読み取ることができます。

Append

org.apache.spark.sql.delta.DeltaSuiteOSS に “ppend then read” というテストがあります。このテストケースは、データの追加を3つのトランザクションに分けて実行します。このテストを実行し、どのようなファイルが生成されるか確認してみました。

 1  test("append then read") {
 2    val tempDir = Utils.createTempDir()
 3    Seq(1).toDF().write.format("delta").save(tempDir.toString)
 4    Seq(2, 3).toDF().write.format("delta").mode("append").save(tempDir.toString)
 5
 6    def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
 7    checkAnswer(data, Row(1) :: Row(2) :: Row(3) :: Nil)
 8
 9    // append more
10    Seq(4, 5, 6).toDF().write.format("delta").mode("append").save(tempDir.toString)
11    checkAnswer(data.toDF(), Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil)
12  }

上記のテストコードを実行すると、以下のようなファイルが生成されます。

.
|-- _delta_log
|   |-- 00000000000000000000.json
|   |-- 00000000000000000001.json
|   `-- 00000000000000000002.json
|-- part-00000-6217e923-3003-4d3d-9347-78fc9cd4a465-c000.snappy.parquet
|-- part-00000-bcc5a893-4c8e-46b4-9e37-adbb0e447ccc-c000.snappy.parquet
|-- part-00000-cafe2cc5-8dd0-4d2d-a362-e9e1efd46628-c000.snappy.parquet
|-- part-00001-4717e754-ac60-46fd-8e46-bff299965988-c000.snappy.parquet
`-- part-00001-e61a04d4-e92a-4117-b40e-b6d100af1ad8-c000.snappy.parquet

厳密には上記の1ファイル毎に .crc ファイルも生成されますが、ここでは省略しています。

トランザクションは save の度に実行されます。前記のテストコードでは3回 save しているので _delta_logs に3つの JSON ファイルが生成されています。この json ファイルの番号が version になります。各ファイルを見ていきます。最初は Seq(1) を save したトランザクションです。

{"commitInfo":{"timestamp":1556452442349,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"43568f12-07f7-4c59-a870-54f3ef1c3c70","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556452390011}}
{"add":{"path":"part-00000-6217e923-3003-4d3d-9347-78fc9cd4a465-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452442321,"dataChange":true}}

commitInfo はコミット時のタイムスタンプや操作が記録されています。protocol には read/write それぞれの min version が、metaData はスキーマ定義が記録されています。 protocolmetaData は version:0 の JSON ファイルにしか出力されません。add はこのトランザクションで追加された Parquet ファイル です。この Parquet ファイルは以下のような値が格納されています。

$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00000-6217e923-3003-4d3d-9347-78fc9cd4a465-c000.snappy.parquet
{"value": 1}

次に Seq(2,3) を保存したトランザクションの JSON ファイルを確認します。

{"commitInfo":{"timestamp":1556452453918,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-cafe2cc5-8dd0-4d2d-a362-e9e1efd46628-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452453912,"dataChange":true}}
{"add":{"path":"part-00001-e61a04d4-e92a-4117-b40e-b6d100af1ad8-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452453899,"dataChange":true}}

commitInfomodeAppend になっています。また、add が2つ記録されています。これらのファイルを見てみます。

$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00000-cafe2cc5-8dd0-4d2d-a362-e9e1efd46628-c000.snappy.parquet
{"value": 2}
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00001-e61a04d4-e92a-4117-b40e-b6d100af1ad8-c000.snappy.parquet
{"value": 3}

そして Seq(4,5,6) を保存したトランザクションの JSON ファイルと Parquet ファイルは以下のとおりです。

{"commitInfo":{"timestamp":1556452472683,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1}}
{"add":{"path":"part-00000-bcc5a893-4c8e-46b4-9e37-adbb0e447ccc-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452472671,"dataChange":true}}
{"add":{"path":"part-00001-4717e754-ac60-46fd-8e46-bff299965988-c000.snappy.parquet","partitionValues":{},"size":400,"modificationTime":1556452472679,"dataChange":true}}
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00000-bcc5a893-4c8e-46b4-9e37-adbb0e447ccc-c000.snappy.parquet
{"value": 4}
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00001-4717e754-ac60-46fd-8e46-bff299965988-c000.snappy.parquet
{"value": 5}
{"value": 6}

このように _dalta_log 配下の JSON ファイルで version 毎にどの Parquet ファイルが追加されたかを記録しています。

Overwrite

GitHub にアップされているコードには overwrite のテストケースを見つけられなかったので、以下のコードを追加してみました。期待値も overwrite の期待値に変更しています。

 1  test("overwrite then read") {
 2    val tempDir = Utils.createTempDir()
 3    Seq(1).toDF().write.format("delta").save(tempDir.toString)
 4    Seq(2, 3).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
 5
 6    def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
 7    checkAnswer(data, Row(2) :: Row(3) :: Nil)
 8
 9    Seq(4, 5, 6).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
10    checkAnswer(data.toDF(), Row(4) :: Row(5) :: Row(6) :: Nil)
11  }

それぞれ JSON ファイルを見ていきます。Parquet ファイルの内容は append の時と同様なので割愛します。

.
|-- _delta_log
|   |-- 00000000000000000000.json
|   |-- 00000000000000000001.json
|   `-- 00000000000000000002.json
|-- part-00000-0e21921d-2ecb-41d7-80b3-6b7e982b13aa-c000.snappy.parquet
|-- part-00000-b953f8cb-ac9f-441f-b544-c40a0e329802-c000.snappy.parquet
|-- part-00000-eef7b120-c3ba-426a-afa3-56e3d3f03f7f-c000.snappy.parquet
|-- part-00001-0fa56342-4b55-4241-8c82-a76c2d1bcbd3-c000.snappy.parquet
`-- part-00001-fa0320b6-c11f-4d00-8c9d-aa0c2f1a2066-c000.snappy.parquet
{"commitInfo":{"timestamp":1556454039726,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"6f97245f-8e71-4042-aa37-b65136d22696","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556454038600}}
{"add":{"path":"part-00000-b953f8cb-ac9f-441f-b544-c40a0e329802-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454039685,"dataChange":true}}
{"commitInfo":{"timestamp":1556454047961,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-0e21921d-2ecb-41d7-80b3-6b7e982b13aa-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454046506,"dataChange":true}}
{"add":{"path":"part-00001-fa0320b6-c11f-4d00-8c9d-aa0c2f1a2066-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454046509,"dataChange":true}}
{"remove":{"path":"part-00000-b953f8cb-ac9f-441f-b544-c40a0e329802-c000.snappy.parquet","deletionTimestamp":1556454047960,"dataChange":true}}

remove が追加されています。これは version:0 で追加した Parquet ファイルの削除を表しています。

{"commitInfo":{"timestamp":1556454057726,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":1}}
{"add":{"path":"part-00000-eef7b120-c3ba-426a-afa3-56e3d3f03f7f-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454056539,"dataChange":true}}
{"add":{"path":"part-00001-0fa56342-4b55-4241-8c82-a76c2d1bcbd3-c000.snappy.parquet","partitionValues":{},"size":400,"modificationTime":1556454056548,"dataChange":true}}
{"remove":{"path":"part-00000-0e21921d-2ecb-41d7-80b3-6b7e982b13aa-c000.snappy.parquet","deletionTimestamp":1556454057726,"dataChange":true}}
{"remove":{"path":"part-00001-fa0320b6-c11f-4d00-8c9d-aa0c2f1a2066-c000.snappy.parquet","deletionTimestamp":1556454057726,"dataChange":true}}

これも、version:1 で追加された Parquet ファイルの削除を表しています。

append の時と比較すると overwrite の場合は以前の version の Parquet ファイルを削除する記録が残るようになっています。しかし、remove で指定されている Parquet ファイルは実際には削除されておらず、データフォルダの中に残っています。この点が通常の overwrite の挙動と異なっています。

Checkpoint

ここまでの結果から、最新のデータを read するのに _delta_log 配下のすべての JSON ファイルを読む必要がありそうです。そうなると変更を重ねたデータを read する時に JSON ファイルの解析に時間がかかるのでは、という懸念があります。

Delta では checkpoint を追加することによりこの問題を回避できるようです。この checkpoint は、任意のバージョンの状態を Parquet ファイルに出力します。先程の overwrite のテストコードをベースに、 checkpoint を追加するコードを書いて動かしてみました。

 1  test("overwrite and checkpoint then read") {
 2    val tempDir = Utils.createTempDir()
 3    val writersLog = DeltaLog.forTable(spark, new Path(tempDir.toURI))
 4    Seq(1).toDF().write.format("delta").save(tempDir.toString)
 5    Seq(2, 3).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
 6
 7    def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
 8    checkAnswer(data, Row(2) :: Row(3) :: Nil)
 9    
10    Seq(4, 5, 6).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
11    // Add checkpoint
12    writersLog.checkpoint()
13    checkAnswer(data.toDF(), Row(4) :: Row(5) :: Row(6) :: Nil)
14  }

12行目で checkpoint を記録しています。このコードを実行すると、以下のようなファイルが生成されます。

.
|-- _delta_log
|   |-- 00000000000000000000.json
|   |-- 00000000000000000001.json
|   |-- 00000000000000000002.checkpoint.parquet
|   |-- 00000000000000000002.json
|   `-- _last_checkpoint
|-- part-00000-0bc9cae4-22ae-4285-810d-50b1837fb343-c000.snappy.parquet
|-- part-00000-3f35a68e-cf56-4761-8228-996731a810d6-c000.snappy.parquet
|-- part-00000-6156403c-f8aa-4aa0-a01b-ba1b2763b267-c000.snappy.parquet
|-- part-00001-83503bcd-7cd3-4f21-9d94-ca70112cfa65-c000.snappy.parquet
`-- part-00001-9c744388-e8fb-4164-a8e3-a88022e1a25c-c000.snappy.parquet

_delta_log の配下に _last_checkpoint00000000000000000002.checkpoint.parquet が追加されていることが分かります。_last_checkpoint は以下のようになっています。

{"version":2,"size":7}

version には checkpoint を実行した時のバージョンが記録されています。size は恐らく 00000000000000000002.checkpoint.parquet のフィールドサイズだと思います。

00000000000000000002.checkpoint.parquet は Parquet で deprecated になっている INT96 が使われており、parquet-cli では内容を確認することができません。parquet-tools で dump してみました。出力結果が大きいので以下に貼り付けました。

https://gist.github.com/masayuki038/210370087784eb2203376c7453ccf524

この中の remove.path を見ると version:1 と version:0 のファイルが記録されていることから、この Parquet ファイルを参照することによって過去の JSON ファイルを参照することなく任意のバージョンの状態を取得することができるようになっているようです。

BINARY remove.path
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 7 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:2 V:part-00001-83503bcd-7cd3-4f21-9d94-ca70112cfa65-c00 [more]...
value 4: R:0 D:2 V:part-00000-3f35a68e-cf56-4761-8228-996731a810d6-c00 [more]...
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:2 V:part-00000-6156403c-f8aa-4aa0-a01b-ba1b2763b267-c00 [more]...

Conclusion

Delta のトランザクションの仕組みを見てみました。このような仕組みは Apache Incubator Project の HudiIceberg も提供しているので、実現方式の比較をしてみたいところです。