将 Delta Lake 集群与 Spark 结合使用 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Delta Lake 集群与 Spark 结合使用

从 Amazon 6.9.0 EMR 版本开始,您无需执行引导操作即可将 Delta Lake 与 Spark 集群配合使用。对于 Amazon 6.8.0 及更低EMR版本,您可以使用引导操作来预安装必要的依赖项。

以下示例使用在 Amazon EMR Spark 集群上使用 Delta Lake。 Amazon CLI

要将 Amazon 上的 Delta Lake EMR 与配合使用 Amazon Command Line Interface,请先创建一个集群。有关如何使用指定 Delta Lake 分类的信息 Amazon Command Line Interface,请参阅在创建集群 Amazon Command Line Interface 时使用提供配置或在创建集群SDK时使用 Java 提供配置。

  1. 创建文件 configurations.json 并输入以下内容:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
  2. 使用以下配置创建集群,将示例 Amazon S3 bucket pathsubnet ID 替换为您自己的值。

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    或者,您可以创建 Amazon EMR 集群和 Spark 应用程序,并将以下文件作为 Spark 任务中的JAR依赖项:

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
    注意

    如果您使用的是亚马逊 6.9.0 或更高EMR版本,请/usr/share/aws/delta/lib/delta-spark.jar改用。/usr/share/aws/delta/lib/delta-core.jar

    有关更多信息,请参阅提交应用程序

    要将 jar 依赖项包含在 Spark 任务中,您可以将以下配置属性添加到 Spark 应用程序中:

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    有关 Spark 任务依赖项的更多信息,请参阅 Dependency Management(依赖项管理)。

    如果您使用的是 Amazon 6.9.0 或更高EMR版本,请改为添加/usr/share/aws/delta/lib/delta-spark.jar配置。

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

初始化 Delta Lake 的 Spark 会话

以下示例展示了如何启动交互式 Spark 外壳、使用 Spark 提交或使用亚马逊EMR笔记本在亚马逊上使用 Delta Lake EMR。

spark-shell
  1. 使用连接到主节点SSH。有关更多信息,请参阅Amazon EMR 管理指南》SSH中的 “使用连接到主节点”。

  2. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳,请spark-shell替换为pyspark

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您运行亚马逊EMR版本 6.15.0 或更高版本,则还必须使用以下配置来使用基于带有 Delta Lake 的 Lake Formation 的精细访问控制。

    spark-shell \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
spark-submit
  1. 使用连接到主节点SSH。有关更多信息,请参阅Amazon EMR 管理指南》SSH中的 “使用连接到主节点”。

  2. 输入以下命令以启动 Delta Lake 的 Spark 会话。

    spark-submit —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您运行亚马逊EMR版本 6.15.0 或更高版本,则还必须使用以下配置来使用基于带有 Delta Lake 的 Lake Formation 的精细访问控制。

    spark-submit \ ` --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
EMR Studio notebooks

要使用 Amazon EMR Studio 笔记本初始化 Spark 会话,请在亚马逊EMR笔记本中使用%%configure神奇命令配置 Spark 会话,如下例所示。有关更多信息,请参阅《Amazon EMR 管理指南》中的 “使用EMR笔记本魔法”。

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" } }

如果您运行亚马逊EMR版本 6.15.0 或更高版本,则还必须使用以下配置来使用基于带有 Delta Lake 的 Lake Formation 的精细访问控制。

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.lf.managed": "true" } }

写入 Delta Lake 表

以下示例说明如何创建 DataFrame 并将其写为 Delta Lake 数据集。该示例说明了如何在使用默认 hadoop 用户连接到主节点时使用 SSH Spark shell 处理数据集。

注意

要将代码示例粘贴到 Spark Shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL + D

PySpark

Spark 包含基于 Python 的 Shell pyspark,您可以用它来设计以 Python 编写的 Spark 程序的原型。就像使用 spark-shell 一样,在主节点上调用 pyspark

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location CREATE TABLE delta_table(id string, creation_date string, last_update_time string) USING delta LOCATION 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'; -- insert data into the table INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

从 Delta Lake 表中读取

PySpark
ddf = spark.table("delta_table") ddf.show()
Scala
val ddf = spark.table("delta_table") ddf.show()
SQL
SELECT * FROM delta_table;