Set up Spark and Delta Lake

Categories

basic, Delta, DuckDB, Code-Snippet
Author

F.L

Published

January 1, 2024

Basic Tutorial



# set up a Spark session configured for Delta Lake
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
# create a table (run once)
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
# running this cell again raises the following error:
# AnalysisException: [DELTA_PATH_EXISTS] Cannot write to already existent path file
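# a re-runnable variant (sketch): mode("ignore") skips the write if the path
# already exists, while mode("overwrite") replaces the existing table
data.write.format("delta").mode("ignore").save("/tmp/delta-table")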
# Read file from path
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+
# update table data
data = spark.range(5, 10)
# brute-force update: overwrite the whole table
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
# conditional update
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()
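# an equivalent shorthand (sketch): updateAll/insertAll copy every column
# from the matching source row instead of listing columns explicitly
deltaTable.alias("oldData") \
  .merge(newData.alias("newData"), "oldData.id = newData.id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()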

deltaTable.toDF().show()
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
# time travel
df = spark.read.format("delta") \
  .option("versionAsOf", 0) \
  .load("/tmp/delta-table")

df.show()
+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+
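The version numbers usable for time travel come from the Delta transaction log; a minimal sketch that lists them via the DeltaTable history API, reusing the deltaTable handle created above:

# show which versions, timestamps, and operations are recorded for the table
deltaTable.history().select("version", "timestamp", "operation").show()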
# write a live stream of data into the Delta table
streamingDf = spark.readStream.format("rate").load()

stream = streamingDf \
  .selectExpr("value as id") \
  .writeStream.format("delta") \
  .option("checkpointLocation", "/tmp/checkpoint") \
  .start("/tmp/delta-table")
23/12/28 19:08:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/28 19:08:27 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:08:37 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:08:47 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:08:57 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
23/12/28 19:09:07 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
stream.stop()
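
The same table can also serve as a streaming source; a minimal sketch, following the Delta Lake quickstart pattern, that echoes rows appended to the table to the console:

# read the Delta table as a stream and print new rows to the console
stream2 = spark.readStream.format("delta").load("/tmp/delta-table") \
  .writeStream.format("console") \
  .start()
stream2.stop()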

Built-in data?

spark.read.json("logs.json")
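
A minimal sketch of writing such a JSON dataset out as a Delta table, assuming a logs.json file exists in the working directory (the file name comes from the snippet above; its schema is not assumed):

# read the JSON file and persist it as a Delta table (sketch)
logs = spark.read.json("logs.json")
logs.write.format("delta").mode("overwrite").save("/tmp/delta-logs")
spark.read.format("delta").load("/tmp/delta-logs").show()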