# set up a project
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
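
If the configure_spark_with_delta_pip helper is unavailable, the same session can be built by pointing Spark at the Delta jars directly. A minimal sketch; the io.delta:delta-spark_2.12:3.0.0 coordinates are an assumption and must match the installed Delta release (delta-core_2.12 for Delta 2.x) and Scala build.

import pyspark

# assumed alternative: let Spark resolve the Delta jars itself instead of
# relying on configure_spark_with_delta_pip
spark = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)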

Basic Tutorial
# create a table (run once)
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
# running this a second time raises the following error:
# AnalysisException: [DELTA_PATH_EXISTS] Cannot write to already existent path file
# (a re-runnable variant is sketched after the read below)

# Read the table back from its path
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
| 2|
| 3|
| 4|
| 0|
| 1|
+---+
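
To make the create-table step above safe to re-run, one option is the standard Spark save mode "ignore", which turns the write into a no-op when the path already holds data; a minimal sketch:

# "ignore" skips the write if /tmp/delta-table already exists,
# so re-running this cell does not raise DELTA_PATH_EXISTS
data = spark.range(0, 5)
data.write.format("delta").mode("ignore").save("/tmp/delta-table")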
# update the table data
data = spark.range(5, 10)
# brute force update
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")# conidtional update
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to it
deltaTable.update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") })
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
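
The same conditional update and delete can also be expressed in SQL against the path-based table; a sketch, assuming the DeltaSparkSessionExtension configured above is active:

# SQL equivalents of the update/delete calls above, addressing the table by path
spark.sql("UPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0")
spark.sql("DELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0")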
# Upsert (merge) new data
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()

+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
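
Each write, update, delete, and merge above committed a new version of the table. The versions available for time travel can be listed from the commit history; a minimal sketch using the DeltaTable history API:

# history() returns the commit log as a DataFrame (version, timestamp, operation, ...)
deltaTable.history().select("version", "timestamp", "operation").show()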
# time travel
df = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
| 2|
| 3|
| 4|
| 0|
| 1|
+---+
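
A version can also be addressed by time instead of number via the timestampAsOf read option; a sketch, where the timestamp string is a placeholder that must fall within the table's history:

# read the table as of a point in time (placeholder timestamp)
df = spark.read.format("delta") \
    .option("timestampAsOf", "2023-12-28 19:00:00") \
    .load("/tmp/delta-table")
df.show()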
# write a live stream of data into the table
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf \
    .selectExpr("value as id") \
    .writeStream.format("delta") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("/tmp/delta-table")

23/12/28 19:08:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/28 19:08:27 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
(the same ERROR line is logged roughly every ten seconds while the stream runs)
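
While that query is writing, the same table can be consumed as a streaming source; a minimal sketch that echoes new rows to the console (the console sink and the /tmp/checkpoint-read path are illustrative choices, not part of the original run):

# read the Delta table as a stream and print arriving rows
echo = spark.readStream.format("delta").load("/tmp/delta-table") \
    .writeStream.format("console") \
    .option("checkpointLocation", "/tmp/checkpoint-read") \
    .start()
# ...
echo.stop()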
stream.stop()

Built-in data?
# load a sample JSON file into a DataFrame
df = spark.read.json("logs.json")
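
A DataFrame loaded this way can be saved in Delta format just like the range data above; a minimal sketch, assuming logs.json exists in the working directory and df is the result of the read above:

# write the JSON-backed DataFrame out as a new Delta table and read it back
df.write.format("delta").mode("overwrite").save("/tmp/delta-logs")
spark.read.format("delta").load("/tmp/delta-logs").show()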