将 Delta Lake 集群与 Spark 结合使用 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 Delta Lake 集群与 Spark 结合使用

从 Amazon EMR 版本 6.9.0 开始,您可以将 Delta Lake 与 Spark 集群结合使用,无需引导操作。对于 Amazon EMR 6.8.0 及更早版本,您可以使用引导操作来预安装需要的依赖项。

以下示例使用 Amazon CLI 在 Amazon EMR Spark 集群上使用 Delta Lake。

要在 Amazon EMR 上将 Delta Lake 与 Amazon Command Line Interface 结合使用,请首先创建集群。有关使用 Amazon Command Line Interface 指定 Delta Lake 分类的信息,请参阅 Supply a configuration using the Amazon Command Line Interface when you create a clusterSupply a configuration with the Java SDK when you create a cluster

  1. 创建文件 configurations.json 并输入以下内容:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
  2. 使用以下配置创建集群,将示例 Amazon S3 bucket pathsubnet ID 替换为您自己的值。

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    或者,您可以创建一个 Amazon EMR 集群和 Spark 应用程序,并在 Spark 作业中使用以下文件作为 JAR 依赖项:

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
    注意

    如果您使用 Amazon EMR 7.0.0 或更高版本,请使用 /usr/share/aws/delta/lib/delta-spark.jar 而不是 /usr/share/aws/delta/lib/delta-core.jar

    有关更多信息,请参阅提交应用程序

    要将 jar 依赖项包含在 Spark 任务中,您可以将以下配置属性添加到 Spark 应用程序中:

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    有关 Spark 任务依赖项的更多信息,请参阅 Dependency Management(依赖项管理)。

    如果您使用 Amazon EMR 7.0.0 或更高版本,请改为添加 /usr/share/aws/delta/lib/delta-spark.jar 配置。

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

初始化 Delta Lake 的 Spark 会话

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交,或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Delta Lake。

spark-shell
  1. 使用 SSH 连接到主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点

  2. 输入以下命令以启动 Spark shell。要使用 PySpark Shell,请使用 pyspark 替换 spark-shell

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您运行 Amazon EMR 6.15.0 或更高版本,则还必须使用以下配置,将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。

    spark-shell \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
spark-submit
  1. 使用 SSH 连接到主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点

  2. 输入以下命令以启动 Delta Lake 的 Spark 会话。

    spark-submit —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您运行 Amazon EMR 6.15.0 或更高版本,则还必须使用以下配置,将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。

    spark-submit \ ` --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
EMR Studio notebooks

要使用 Amazon EMR Studio Notebooks 初始化 Spark 会话,请使用 Amazon EMR Notebook 中的 %%configure 魔术命令配置 Spark 会话,如下例所示。有关更多信息,请参阅 Amazon EMR 管理指南中的使用 EMR Notebooks 魔法命令

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" } }

如果您运行 Amazon EMR 6.15.0 或更高版本,则还必须使用以下配置,将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.lf.managed": "true" } }

写入 Delta Lake 表

以下示例演示如何创建 DataFrame 并将其作为 Delta Lake 数据集写入。此示例演示如何使用 Spark Shell 处理数据集,同时使用 SSH 作为默认 hadoop 用户连接到主节点。

注意

要将代码示例粘贴到 Spark Shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL + D

PySpark

Spark 包含基于 Python 的 Shell pyspark,您可以用它来设计以 Python 编写的 Spark 程序的原型。就像使用 spark-shell 一样,在主节点上调用 pyspark

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location CREATE TABLE delta_table(id string, creation_date string, last_update_time string) USING delta LOCATION 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'; -- insert data into the table INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

从 Delta Lake 表中读取

PySpark
ddf = spark.table("delta_table") ddf.show()
Scala
val ddf = spark.table("delta_table") ddf.show()
SQL
SELECT * FROM delta_table;