์ค์๊ฐ ๋ถ์ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ - 04. Delta Lake ์๊ฐ ๋ฐ ํ๊ฒฝ ์ค์
์ด๋ฒ ๊ธ์์๋ ์ค์๊ฐ ๋ถ์ ํ์ดํ๋ผ์ธ์ ํต์ฌ ๊ธฐ์ ์ค ํ๋์ธ Delta Lake์ ๋ํด ์๊ฐํ๊ณ , Docker Compose ๊ธฐ๋ฐ์ ํ๊ฒฝ ์ค์ ๋ฐ PySpark๋ฅผ ํ์ฉํ ์ฃผ์ ๊ธฐ๋ฅ์ ์ค์ตํ ๋ด์ฉ์ ์ ๋ฆฌํฉ๋๋ค. ๋ํ, Delta Lake์ ์ฃผ์ ํน์ง๊ณผ Iceberg์์ ์ฐจ์ด์ ์ ๋น๊ตํด๋ณด๊ฒ ์ต๋๋ค.
์ด์ docker compose ๊ธฐ๋ฐ delta-lake ์ค์น - https://jongwho.tistory.com/37
์ค์๊ฐ ๋ถ์ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ - 01. Kafka, Iceberg ์ค์น
RedPanda, Iceberg๋ฅผ Docker Compose๋ก ๊ตฌ์ฑํด์ ์ค์๊ฐ ํ์ดํ๋ผ์ธ ๊ธฐ์ด ๊ตฌ์ฑ ์ด๋ฒ ๊ธ์์๋ Redpanda์ Iceberg, Minio๋ฅผ ๊ตฌ์ฑํด์ ์ค์๊ฐ ๋ฐ์ดํฐ๋ ์ดํฌ ํ๊ฒฝ์ ๊ตฌ์ฑํฉ๋๋ค. ๐ ๋ชฉํRedpanda ์ค์นApache Iceberg ์ค
jongwho.tistory.com
โ Delta Lake๋?
Delta Lake๋ Apache Spark ๊ธฐ๋ฐ์ ACID ํธ๋์ญ์ ์ ์ง์ํ๋ ์ ์ฅ์ ๊ณ์ธต(Storage Layer) ์ผ๋ก, ๋ฐ์ดํฐ ๋ ์ดํฌ(Data Lake)์ ์ ๋ขฐ์ฑ๊ณผ ์ฑ๋ฅ์ ํฌ๊ฒ ํฅ์์์ผ ์ค๋๋ค. ๊ธฐ์กด์ HDFS๋ S3์ ๊ฐ์ ๊ฐ์ฒด ์ ์ฅ์ ์์ ๊ตฌ์ถ๋์ด, Append-Only ํน์ฑ์ ๊ฐ์ง ๋ฐ์ดํฐ ์ ์ฅ ๋ฐฉ์์์ ๋ฐ์ํ๋ ๋ฌธ์ ์ (์: ๋ฐ์ดํฐ ์ ํฉ์ฑ, ๋์์ฑ ์ด์ ๋ฑ)์ ๋ณด์ํฉ๋๋ค.
๐ง ์ค์น ๋ฐ ์คํ
๊ธฐ์กด ๊ธ์์ ์ค์นํ docker compose์ delta-lake๊ฐ ์ค์น๋์ด ์์ต๋๋ค. Iceberg๋ ๊ฐ์ด ์ค์น๋์ด ์์ด Delta-Lake ๋ ธํธ๋ถ ์คํ ์ ํฌํธ์ ํผ๋์ด ์ฌ ์ ์์ต๋๋ค.
docker-compose up -d
๊ตฌ์ฑ ์์:
- Spark (with Delta Lake)
- Jupyter Notebook
Delta Lake์ Quick Start ์์ ๋ ธํธ๋ถ์ ํตํด Delta ํ ์ด๋ธ ์์ฑ ๋ฐ ์ฟผ๋ฆฌ๋ฅผ ์ค์ตํ ์ ์์ต๋๋ค. - ์ ๊ทผ ๋ฐฉ๋ฒ: delta_quickstart ๋ด jupyter notebook ์ฃผ์์์ ํฌํธ๋ฅผ 8088๋ก ๋ณ๊ฒฝ
- ์: http://127.0.0.1:8088/lab?token=b9a9d31ea453fd8e224eeba50c42053bff12391ea1531aba
๐งช Delta Lake ๊ธฐ๋ณธ ๋ด์ฉ ์๊ฐ
Delta Lake์ ๊ธฐ๋ณธ ์ฌ์ฉ๋ฒ์ Jupyter Notebook์์ ๋ค์๊ณผ ๊ฐ์ด ์ค์ตํ์์ต๋๋ค.
1. Delta ํ ์ด๋ธ ์์ฑ
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeDemo") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
2. Delta ํ ์ด๋ธ ์กฐํ
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
3. Append ๋ฐ Update
# Append
new_data = spark.range(5, 10)
new_data.write.format("delta").mode("append").save("/tmp/delta-table")
# Update
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
deltaTable.update(
condition="id % 2 == 0",
set={"id": "id + 100"}
)
4. Time Travel
# ์ ์ฒด ํ์คํ ๋ฆฌ ํ
์ด๋ธ ํ์ธ
delta_table_history = (DeltaTable
.forPath(spark, "/tmp/delta-table")
.history()
)
(delta_table_history
.select("version", "timestamp", "operation", "operationParameters", "operationMetrics", "engineInfo")
.show()
)
# ์ด์ ๋ฒ์ ์ผ๋ก ํ
์ด๋ธ ๋ณ๊ฒฝ(๋ฒ์ :0) ๋ฐ ์กฐํ
df = (spark
.read
.format("delta")
.option("versionAsOf", 0) # we pass an option `versionAsOf` with the required version number we are interested in
.load("/tmp/delta-table")
.orderBy("id")
)
df.show()
๐ก Delta Lake ์ฃผ์ ๊ธฐ๋ฅ ์์ฝ
ACID ํธ๋์ญ์ | ๋ฉํฐ ์ฐ๋ ๋ ํ๊ฒฝ์์๋ ๋ฐ์ดํฐ ์ ํฉ์ฑ ์ ์ง |
Schema Enforcement & Evolution | ์คํค๋ง ๊ณ ์ ๋ฐ ๋ณ๊ฒฝ ์ง์ |
Time Travel | ํน์ ์์ ์ ๋ฐ์ดํฐ ์ค๋ ์ท ์กฐํ ๊ฐ๋ฅ |
Merge (UPSERT) | ๊ธฐ์กด ํ ์ด๋ธ์ ์กฐ๊ฑด ๊ธฐ๋ฐ ์ฝ์ ๋ฐ ๊ฐฑ์ ์ง์ |
๋ฐ์ดํฐ ์์ถ ๋ฐ ํด๋ฌ์คํฐ๋ง (Z-Ordering) | ์ฟผ๋ฆฌ ์ฑ๋ฅ ์ต์ ํ |
๐Delta Lake์ Iceberg์ ์ฃผ์ ์ฐจ์ด
Delta Lake | Apache Icecberg | |
์์ | Databricks ์ฃผ๋, ์คํ์์ค (Linux Foundation) | Apache Software Foundation, ๋ฒค๋ ์ค๋ฆฝ |
ํํฐ์ ๋ | ์ฌ์ฉ์๊ฐ ๋ช ์์ ์ผ๋ก ํํฐ์ ์ ์ ํ์ | ์จ๊ฒจ์ง ํํฐ์ ๋์ผ๋ก ์๋ ์ต์ ํ, ์ฌ์ฉ์ ๋ถ๋ด ๊ฐ์ |
๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ | JSON ๊ธฐ๋ฐ ํธ๋์ญ์ ๋ก๊ทธ, ๋๊ท๋ชจ ๋ฐ์ดํฐ์์ ๋ฉํ๋ฐ์ดํฐ ๋ถํ ๊ฐ๋ฅ | Parquet ๊ธฐ๋ฐ ๋ฉํ๋ฐ์ดํฐ, ๋๊ท๋ชจ ๋ฐ์ดํฐ์ ํํฐ์ ์ ๋ ํจ์จ์ |
์คํค๋ง ์งํ | ์คํค๋ง ๋ณ๊ฒฝ ์ง์, MergeSchema ์ต์ ์ผ๋ก ์ ์ฐ์ฑ ์ ๊ณต | ๊ณ ๊ธ ์คํค๋ง ์งํ(์ปฌ๋ผ ์์ ๋ณ๊ฒฝ, ํ์ ๋ณ๊ฒฝ ๋ฑ) ์ง์ |
์ฑ๋ฅ ์ต์ ํ | Z-Order ์ธ๋ฑ์ฑ, ๋ฐ์ดํฐ ์คํคํ, ์ปดํฉ์ | ์ค๋ ์ท ๊ธฐ๋ฐ ์ฟผ๋ฆฌ ์ต์ ํ, ๋ฉํ๋ฐ์ดํฐ ์ธ๋ฑ์ฑ |
์์ง ํตํฉ | Spark์ ์ต์ ํ, ๋ค๋ฅธ ์์ง๋ ์ง์ | Spark, Flink, Trino ๋ฑ ๋ค์ํ ์์ง์ ๊ท ๋ฑํ ์ง์ |
UPSERT ์ฑ๋ฅ | Merge ๋ช ๋ น์ผ๋ก ํจ์จ์ , Databricks ํ๊ฒฝ์์ ์ต์ ํ | Merge ์ฑ๋ฅ์ ๊ฐ์ ์ค, ์์ง์ ๋ฐ๋ผ ์ฑ๋ฅ ์ฐจ์ด ์กด์ฌ |
์ปค๋ฎค๋ํฐ/์ํ๊ณ | Databricks ์ค์ฌ, ํด๋ผ์ฐ๋ ํ๊ฒฝ์ ๊ฐ์ | Apache ์ปค๋ฎค๋ํฐ ์ค์ฌ, ๋ฒค๋ ์ค๋ฆฝ์ ์ด๊ณ ์ ์ฐํ ํตํฉ |
๐ ์ ๋ฆฌ:
- Delta Lake๋ Spark ์ค์ฌ์ ํ๊ฒฝ์ ์ต์ ํ๋์ด ์์ผ๋ฉฐ, ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋ฐ ๋ฐฐ์น ํตํฉ์ ๊ฐ์ ์ด ์์ต๋๋ค.
- Iceberg๋ ๋ค์ํ ์์ง๊ณผ์ ํตํฉ ๋ฐ ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ ์ธก๋ฉด์์ ๋ ๋ฐ์ ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํฉ๋๋ค.
โ ์ธ์ ๋ฌด์์ ์ ํํ ๊น?
- Delta Lake:
- Databricks ํ๊ฒฝ์์ ๋ฐ์ดํฐ ๋ ์ดํฌ๋ฅผ ์ด์ํ๊ฑฐ๋ Spark ์ค์ฌ ์ํฌํ๋ก์ฐ๋ฅผ ์ฌ์ฉํ ๋ ์ ํฉ.
- ๊ฐ๋จํ ์ค์ ๊ณผ ๋น ๋ฅธ UPSERT ์์ ์ด ํ์ํ ๊ฒฝ์ฐ.
- ํด๋ผ์ฐ๋ ๊ธฐ๋ฐ ๋ฐ์ดํฐ ๋ ์ดํฌ์์ ํตํฉ ๊ด๋ฆฌ์ ์ต์ ํ๋ ์ฑ๋ฅ์ ์ํ ๋.
- Apache Iceberg:
- ๋๊ท๋ชจ ๋ฐ์ดํฐ์ (์์ญ์ต ํ ์ด์)๊ณผ ๋ณต์กํ ํํฐ์ ๋ ์๊ตฌ์ฌํญ์ด ์๋ ๊ฒฝ์ฐ.
- ๋ค์ํ ์ฟผ๋ฆฌ ์์ง(Flink, Trino ๋ฑ)์ ํผํฉํด ์ฌ์ฉํ๊ฑฐ๋ ๋ฒค๋ ์ข ์์ฑ์ ํผํ๊ณ ์ถ์ ๋.
- ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ์ ์คํค๋ง ์งํ์ ์ ์ฐ์ฑ์ด ์ค์ํ ๊ฒฝ์ฐ.
๐ ๋ค์ ๊ธ: Structed Streaming & Delta Lake
์ด๋ฒ ๊ธ์์๋ Delta Lake์ ๊ธฐ๋ณธ ๊ฐ๋ ๊ณผ ํ๊ฒฝ ์ค์ , ๊ทธ๋ฆฌ๊ณ PySpark ๊ธฐ๋ฐ์ ์ฃผ์ ๊ธฐ๋ฅ์ ์ค์ตํด ๋ณด์์ต๋๋ค. ๋ค์ ๊ธ์์๋ Structured Streaming๊ณผ Delta Lake๋ฅผ ๊ฒฐํฉํ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ ๊ตฌ์ฑ์ ๋ค๋ฃจ๊ฒ ์ต๋๋ค.