Databricks から Delta Lake というモジュールがリリースされました。これは Databricks で提供している Delta という Transactional なストレージの一部を OSS として公開したようです。このモジュールは現在 Spark に対応しています。
この Delta のトランザクションの仕組みが気になったので、GitHub に公開されているコードを clone し、データを append / overwrite するテストコードを動かしてトランザクションがどのような仕組みで実現されているか調べてみました。
Setup
公開されたコードは GitHub にあります。Scala が動く環境であればテストコードを動かすことはできます。Windows の場合は winutils
が必要です。
package は org.apache.spark.sql.delta
になっています。
https://github.com/delta-io/delta
Transaction Protocol
Delta はバージョン番号を使ってトランザクションを管理しています。バージョン番号は、データを更新する度に1つ上がります。バージョン番号や更新内容は、データの保存先のフォルダの配下の _delta_log
というフォルダに JSON ファイルとして保存するようになっています。
https://github.com/delta-io/delta/blob/master/README.md#transaction-protocol
File Format
データのファイルフォーマットは Parquet が使われています。Parquet ファイルは特に拡張されているわけではないので、parquet-cli や parquet-tools で読み取ることができます。
Append
org.apache.spark.sql.delta.DeltaSuiteOSS に “ppend then read” というテストがあります。このテストケースは、データの追加を3つのトランザクションに分けて実行します。このテストを実行し、どのようなファイルが生成されるか確認してみました。
1 test("append then read") {
2 val tempDir = Utils.createTempDir()
3 Seq(1).toDF().write.format("delta").save(tempDir.toString)
4 Seq(2, 3).toDF().write.format("delta").mode("append").save(tempDir.toString)
5
6 def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
7 checkAnswer(data, Row(1) :: Row(2) :: Row(3) :: Nil)
8
9 // append more
10 Seq(4, 5, 6).toDF().write.format("delta").mode("append").save(tempDir.toString)
11 checkAnswer(data.toDF(), Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil)
12 }
上記のテストコードを実行すると、以下のようなファイルが生成されます。
.
|-- _delta_log
| |-- 00000000000000000000.json
| |-- 00000000000000000001.json
| `-- 00000000000000000002.json
|-- part-00000-6217e923-3003-4d3d-9347-78fc9cd4a465-c000.snappy.parquet
|-- part-00000-bcc5a893-4c8e-46b4-9e37-adbb0e447ccc-c000.snappy.parquet
|-- part-00000-cafe2cc5-8dd0-4d2d-a362-e9e1efd46628-c000.snappy.parquet
|-- part-00001-4717e754-ac60-46fd-8e46-bff299965988-c000.snappy.parquet
`-- part-00001-e61a04d4-e92a-4117-b40e-b6d100af1ad8-c000.snappy.parquet
厳密には上記の1ファイル毎に .crc ファイルも生成されますが、ここでは省略しています。
トランザクションは save
の度に実行されます。前記のテストコードでは3回 save
しているので _delta_logs
に3つの JSON ファイルが生成されています。この json ファイルの番号が version になります。各ファイルを見ていきます。最初は Seq(1)
を save したトランザクションです。
- 00000000000000000000.json
{"commitInfo":{"timestamp":1556452442349,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"43568f12-07f7-4c59-a870-54f3ef1c3c70","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556452390011}}
{"add":{"path":"part-00000-6217e923-3003-4d3d-9347-78fc9cd4a465-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452442321,"dataChange":true}}
commitInfo
はコミット時のタイムスタンプや操作が記録されています。protocol
には read/write それぞれの min version が、metaData
はスキーマ定義が記録されています。 protocol
と metaData
は version:0 の JSON ファイルにしか出力されません。add
はこのトランザクションで追加された Parquet ファイル です。この Parquet ファイルは以下のような値が格納されています。
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00000-6217e923-3003-4d3d-9347-78fc9cd4a465-c000.snappy.parquet
{"value": 1}
次に Seq(2,3)
を保存したトランザクションの JSON ファイルを確認します。
- 00000000000000000001.json
{"commitInfo":{"timestamp":1556452453918,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-cafe2cc5-8dd0-4d2d-a362-e9e1efd46628-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452453912,"dataChange":true}}
{"add":{"path":"part-00001-e61a04d4-e92a-4117-b40e-b6d100af1ad8-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452453899,"dataChange":true}}
commitInfo
の mode
が Append
になっています。また、add
が2つ記録されています。これらのファイルを見てみます。
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00000-cafe2cc5-8dd0-4d2d-a362-e9e1efd46628-c000.snappy.parquet
{"value": 2}
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00001-e61a04d4-e92a-4117-b40e-b6d100af1ad8-c000.snappy.parquet
{"value": 3}
そして Seq(4,5,6)
を保存したトランザクションの JSON ファイルと Parquet ファイルは以下のとおりです。
- 00000000000000000002.json
{"commitInfo":{"timestamp":1556452472683,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1}}
{"add":{"path":"part-00000-bcc5a893-4c8e-46b4-9e37-adbb0e447ccc-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556452472671,"dataChange":true}}
{"add":{"path":"part-00001-4717e754-ac60-46fd-8e46-bff299965988-c000.snappy.parquet","partitionValues":{},"size":400,"modificationTime":1556452472679,"dataChange":true}}
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00000-bcc5a893-4c8e-46b4-9e37-adbb0e447ccc-c000.snappy.parquet
{"value": 4}
$ java -cp target/classes;target/dependency/* org.apache.parquet.cli.Main cat D:\tmp\delta\append\part-00001-4717e754-ac60-46fd-8e46-bff299965988-c000.snappy.parquet
{"value": 5}
{"value": 6}
このように _dalta_log
配下の JSON ファイルで version 毎にどの Parquet ファイルが追加されたかを記録しています。
Overwrite
GitHub にアップされているコードには overwrite
のテストケースを見つけられなかったので、以下のコードを追加してみました。期待値も overwrite
の期待値に変更しています。
1 test("overwrite then read") {
2 val tempDir = Utils.createTempDir()
3 Seq(1).toDF().write.format("delta").save(tempDir.toString)
4 Seq(2, 3).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
5
6 def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
7 checkAnswer(data, Row(2) :: Row(3) :: Nil)
8
9 Seq(4, 5, 6).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
10 checkAnswer(data.toDF(), Row(4) :: Row(5) :: Row(6) :: Nil)
11 }
それぞれ JSON ファイルを見ていきます。Parquet ファイルの内容は append
の時と同様なので割愛します。
.
|-- _delta_log
| |-- 00000000000000000000.json
| |-- 00000000000000000001.json
| `-- 00000000000000000002.json
|-- part-00000-0e21921d-2ecb-41d7-80b3-6b7e982b13aa-c000.snappy.parquet
|-- part-00000-b953f8cb-ac9f-441f-b544-c40a0e329802-c000.snappy.parquet
|-- part-00000-eef7b120-c3ba-426a-afa3-56e3d3f03f7f-c000.snappy.parquet
|-- part-00001-0fa56342-4b55-4241-8c82-a76c2d1bcbd3-c000.snappy.parquet
`-- part-00001-fa0320b6-c11f-4d00-8c9d-aa0c2f1a2066-c000.snappy.parquet
- 00000000000000000000.json
{"commitInfo":{"timestamp":1556454039726,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"6f97245f-8e71-4042-aa37-b65136d22696","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556454038600}}
{"add":{"path":"part-00000-b953f8cb-ac9f-441f-b544-c40a0e329802-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454039685,"dataChange":true}}
- 00000000000000000001.json
{"commitInfo":{"timestamp":1556454047961,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-0e21921d-2ecb-41d7-80b3-6b7e982b13aa-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454046506,"dataChange":true}}
{"add":{"path":"part-00001-fa0320b6-c11f-4d00-8c9d-aa0c2f1a2066-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454046509,"dataChange":true}}
{"remove":{"path":"part-00000-b953f8cb-ac9f-441f-b544-c40a0e329802-c000.snappy.parquet","deletionTimestamp":1556454047960,"dataChange":true}}
remove
が追加されています。これは version:0 で追加した Parquet ファイルの削除を表しています。
- 00000000000000000002.json
{"commitInfo":{"timestamp":1556454057726,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":1}}
{"add":{"path":"part-00000-eef7b120-c3ba-426a-afa3-56e3d3f03f7f-c000.snappy.parquet","partitionValues":{},"size":396,"modificationTime":1556454056539,"dataChange":true}}
{"add":{"path":"part-00001-0fa56342-4b55-4241-8c82-a76c2d1bcbd3-c000.snappy.parquet","partitionValues":{},"size":400,"modificationTime":1556454056548,"dataChange":true}}
{"remove":{"path":"part-00000-0e21921d-2ecb-41d7-80b3-6b7e982b13aa-c000.snappy.parquet","deletionTimestamp":1556454057726,"dataChange":true}}
{"remove":{"path":"part-00001-fa0320b6-c11f-4d00-8c9d-aa0c2f1a2066-c000.snappy.parquet","deletionTimestamp":1556454057726,"dataChange":true}}
これも、version:1 で追加された Parquet ファイルの削除を表しています。
append
の時と比較すると overwrite
の場合は以前の version の Parquet ファイルを削除する記録が残るようになっています。しかし、remove
で指定されている Parquet ファイルは実際には削除されておらず、データフォルダの中に残っています。この点が通常の overwrite
の挙動と異なっています。
Checkpoint
ここまでの結果から、最新のデータを read するのに _delta_log
配下のすべての JSON ファイルを読む必要がありそうです。そうなると変更を重ねたデータを read する時に JSON ファイルの解析に時間がかかるのでは、という懸念があります。
Delta では checkpoint
を追加することによりこの問題を回避できるようです。この checkpoint
は、任意のバージョンの状態を Parquet ファイルに出力します。先程の overwrite
のテストコードをベースに、 checkpoint
を追加するコードを書いて動かしてみました。
1 test("overwrite and checkpoint then read") {
2 val tempDir = Utils.createTempDir()
3 val writersLog = DeltaLog.forTable(spark, new Path(tempDir.toURI))
4 Seq(1).toDF().write.format("delta").save(tempDir.toString)
5 Seq(2, 3).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
6
7 def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
8 checkAnswer(data, Row(2) :: Row(3) :: Nil)
9
10 Seq(4, 5, 6).toDF().write.format("delta").mode("overwrite").save(tempDir.toString)
11 // Add checkpoint
12 writersLog.checkpoint()
13 checkAnswer(data.toDF(), Row(4) :: Row(5) :: Row(6) :: Nil)
14 }
12行目で checkpoint
を記録しています。このコードを実行すると、以下のようなファイルが生成されます。
.
|-- _delta_log
| |-- 00000000000000000000.json
| |-- 00000000000000000001.json
| |-- 00000000000000000002.checkpoint.parquet
| |-- 00000000000000000002.json
| `-- _last_checkpoint
|-- part-00000-0bc9cae4-22ae-4285-810d-50b1837fb343-c000.snappy.parquet
|-- part-00000-3f35a68e-cf56-4761-8228-996731a810d6-c000.snappy.parquet
|-- part-00000-6156403c-f8aa-4aa0-a01b-ba1b2763b267-c000.snappy.parquet
|-- part-00001-83503bcd-7cd3-4f21-9d94-ca70112cfa65-c000.snappy.parquet
`-- part-00001-9c744388-e8fb-4164-a8e3-a88022e1a25c-c000.snappy.parquet
_delta_log
の配下に _last_checkpoint
と 00000000000000000002.checkpoint.parquet
が追加されていることが分かります。_last_checkpoint
は以下のようになっています。
- _last_checkpoint
{"version":2,"size":7}
version
には checkpoint
を実行した時のバージョンが記録されています。size
は恐らく 00000000000000000002.checkpoint.parquet
のフィールドサイズだと思います。
00000000000000000002.checkpoint.parquet
は Parquet で deprecated になっている INT96
が使われており、parquet-cli では内容を確認することができません。parquet-tools で dump してみました。出力結果が大きいので以下に貼り付けました。
https://gist.github.com/masayuki038/210370087784eb2203376c7453ccf524
この中の remove.path
を見ると version:1 と version:0 のファイルが記録されていることから、この Parquet ファイルを参照することによって過去の JSON ファイルを参照することなく任意のバージョンの状態を取得することができるようになっているようです。
BINARY remove.path
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 7 ***
value 1: R:0 D:0 V:<null>
value 2: R:0 D:0 V:<null>
value 3: R:0 D:2 V:part-00001-83503bcd-7cd3-4f21-9d94-ca70112cfa65-c00 [more]...
value 4: R:0 D:2 V:part-00000-3f35a68e-cf56-4761-8228-996731a810d6-c00 [more]...
value 5: R:0 D:0 V:<null>
value 6: R:0 D:0 V:<null>
value 7: R:0 D:2 V:part-00000-6156403c-f8aa-4aa0-a01b-ba1b2763b267-c00 [more]...
Conclusion
Delta のトランザクションの仕組みを見てみました。このような仕組みは Apache Incubator Project の Hudi や Iceberg も提供しているので、実現方式の比較をしてみたいところです。