Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
将 Delta Lake 集群与 Spark 结合使用
从 Amazon EMR 版本 6.9.0 开始,您可以将 Delta Lake 与 Spark 集群结合使用,无需引导操作。对于 Amazon EMR 6.8.0 及更早版本,您可以使用引导操作来预安装需要的依赖项。
以下示例使用 Amazon CLI 在 Amazon EMR Spark 集群上使用 Delta Lake。
要在 Amazon EMR 上将 Delta Lake 与 Amazon Command Line Interface 结合使用,请首先创建集群。有关使用 Amazon Command Line Interface 指定 Delta Lake 分类的信息,请参阅 Supply a configuration using the Amazon Command Line Interface when you create a cluster 或 Supply a configuration with the Java SDK when you create a cluster。
-
创建文件 configurations.json 并输入以下内容:
[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
-
使用以下配置创建集群,将示例 Amazon S3 bucket path 和 subnet
ID 替换为您自己的值。
aws emr create-cluster
--release-label emr-6.9.0
--applications Name=Spark
--configurations file://delta_configurations.json
--region us-east-1
--name My_Spark_Delta_Cluster
--log-uri s3://amzn-s3-demo-bucket/
--instance-type m5.xlarge
--instance-count 2
--service-role EMR_DefaultRole_V2
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
或者,您可以创建一个 Amazon EMR 集群和 Spark 应用程序,并在 Spark 作业中使用以下文件作为 JAR 依赖项:
/usr/share/aws/delta/lib/delta-core.jar,
/usr/share/aws/delta/lib/delta-storage.jar,
/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
如果您使用 Amazon EMR 7.0.0 或更高版本,请使用 /usr/share/aws/delta/lib/delta-spark.jar 而不是 /usr/share/aws/delta/lib/delta-core.jar。
有关更多信息,请参阅提交应用程序。
要将 jar 依赖项包含在 Spark 任务中,您可以将以下配置属性添加到 Spark 应用程序中:
--conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
/usr/share/aws/delta/lib/delta-storage.jar,
/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
有关 Spark 任务依赖项的更多信息,请参阅 Dependency Management(依赖项管理)。
如果您使用 Amazon EMR 7.0.0 或更高版本,请改为添加 /usr/share/aws/delta/lib/delta-spark.jar 配置。
--conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
/usr/share/aws/delta/lib/delta-storage.jar,
/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
初始化 Delta Lake 的 Spark 会话
以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交,或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Delta Lake。
- spark-shell
-
-
使用 SSH 连接到主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点。
-
输入以下命令以启动 Spark shell。要使用 PySpark Shell,请使用 pyspark 替换 spark-shell。
spark-shell \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
如果您运行 Amazon EMR 6.15.0 或更高版本,则还必须使用以下配置,将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。
spark-shell \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.spark_catalog.lf.managed=true
- spark-submit
-
-
使用 SSH 连接到主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点。
-
输入以下命令以启动 Delta Lake 的 Spark 会话。
spark-submit
—conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
—conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
如果您运行 Amazon EMR 6.15.0 或更高版本,则还必须使用以下配置,将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。
spark-submit \ `
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.spark_catalog.lf.managed=true
- EMR Studio notebooks
-
要使用 Amazon EMR Studio Notebooks 初始化 Spark 会话,请使用 Amazon EMR Notebook 中的 %%configure 魔术命令配置 Spark 会话,如下例所示。有关更多信息,请参阅 Amazon EMR 管理指南中的使用 EMR Notebooks 魔法命令。
%%configure -f
{
"conf": {
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
如果您运行 Amazon EMR 6.15.0 或更高版本,则还必须使用以下配置,将基于 Lake Formation 的精细访问控制与 Delta Lake 结合使用。
%%configure -f
{
"conf": {
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.sql.catalog.spark_catalog.lf.managed": "true"
}
}
写入 Delta Lake 表
以下示例演示如何创建 DataFrame 并将其作为 Delta Lake 数据集写入。此示例演示如何使用 Spark Shell 处理数据集,同时使用 SSH 作为默认 hadoop 用户连接到主节点。
要将代码示例粘贴到 Spark Shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL +
D。
- PySpark
-
Spark 包含基于 Python 的 Shell pyspark,您可以用它来设计以 Python 编写的 Spark 程序的原型。就像使用 spark-shell 一样,在主节点上调用 pyspark。
## Create a DataFrame
data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")],
["id", "creation_date", "last_update_time"])
## Write a DataFrame as a Delta Lake dataset to the S3 location
spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");
data.writeTo("delta_table").append()
- Scala
-
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
// Create a DataFrame
val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time")
// Write a DataFrame as a Delta Lake dataset to the S3 location
spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");
data.write.format("delta").mode("append").saveAsTable("delta_table")
- SQL
-
-- Create a Delta Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string,
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';
-- insert data into the table
INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");
从 Delta Lake 表中读取
- PySpark
-
ddf = spark.table("delta_table")
ddf.show()
- Scala
-
val ddf = spark.table("delta_table")
ddf.show()
- SQL
-
SELECT * FROM delta_table;