将 Delta Lake 集群与 Spark 结合使用 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Delta Lake 集群与 Spark 结合使用

从 Amazon EMR 版本 6.9.0 开始,您可以将 Delta Lake 与 Spark 集群结合使用,无需引导操作。对于 Amazon EMR 6.8.0 及更早版本,您可以使用引导操作来预安装需要的依赖项。

以下示例使用在 Amazon EMR Spark 集群上使用 Delta Lake。 Amazon CLI

要将 Amazon EMR 上的 Delta Lake 与配合使用 Amazon Command Line Interface,请先创建一个集群。有关如何使用指定 Delta Lake 分类的信息 Amazon Command Line Interface,请参阅在创建集群 Amazon Command Line Interface 时使用提供配置或在创建集群时使用 Java SDK 提供配置。

  1. 创建文件 configurations.json 并输入以下内容:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
  2. 使用以下配置创建集群,将示例 Amazon S3 bucket pathsubnet ID 替换为您自己的值。

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://DOC-EXAMPLE-BUCKET/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    或者,您可以创建一个 Amazon EMR 集群和 Spark 应用程序,并在 Spark 作业中使用以下文件作为 JAR 依赖项:

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
    注意

    如果您使用的是 Amazon EMR 6.9.0 或更高版本,请改用。/usr/share/aws/delta/lib/delta-spark.jar /usr/share/aws/delta/lib/delta-core.jar

    有关更多信息,请参阅提交应用程序

    要将 jar 依赖项包含在 Spark 任务中,您可以将以下配置属性添加到 Spark 应用程序中:

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    有关 Spark 任务依赖项的更多信息,请参阅 Dependency Management(依赖项管理)。

    如果您使用的是 Amazon EMR 6.9.0 或更高版本,请改为添加配置。/usr/share/aws/delta/lib/delta-spark.jar

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

初始化 Delta Lake 的 Spark 会话

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交,或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Delta Lake。

spark-shell
  1. 使用 SSH 连接到主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点

  2. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳,请spark-shell替换为pyspark

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
spark-submit
  1. 使用 SSH 连接到主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点

  2. 输入以下命令以启动 Delta Lake 的 Spark 会话。

    spark-submit —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
EMR Studio notebooks

要使用 Amazon EMR Studio Notebooks 初始化 Spark 会话,请使用 Amazon EMR Notebook 中的 %%configure 魔术命令配置 Spark 会话,如下例所示。有关更多信息,请参阅 Amazon EMR 管理指南中的使用 EMR Notebooks 魔法命令

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" } }

写入 Delta Lake 表

以下示例说明如何创建 DataFrame 并将其写为 Delta Lake 数据集。此示例演示如何使用 Spark Shell 处理数据集,同时使用 SSH 作为默认 hadoop 用户连接到主节点。

注意

要将代码示例粘贴到 Spark Shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL + D

PySpark

Spark 包含基于 Python 的 Shell pyspark,您可以用它来设计以 Python 编写的 Spark 程序的原型。就像使用 spark-shell 一样,在主节点上调用 pyspark

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'"""); data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location CREATE TABLE delta_table(id string, creation_date string, last_update_time string) USING delta LOCATION 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'; -- insert data into the table INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

从 Delta Lake 表中读取

PySpark
ddf = spark.table("delta_table") ddf.show()
Scala
val ddf = spark.table("delta_table") ddf.show()
SQL
SELECT * FROM delta_table;