在 Amazon Glue 中使用 Delta Lake 框架 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Glue 中使用 Delta Lake 框架

Amazon Glue 3.0 及更高版本支持 Linux Foundation Delta Lake 框架。Delta Lake 是一个开源数据湖存储框架,可帮助您执行 ACID 交易、扩展元数据处理以及统一流式和批处理数据处理。本主题涵盖了在 Delta Lake 表中传输或存储数据时,在 Amazon Glue 中使用数据的可用功能。要了解有关 Delta Lake 的更多信息,请参阅 Delta Lake 官方文档

您可以使用 Amazon Glue 对 Amazon S3 中的 Delta Lake 表执行读写操作,也可以使用 Amazon Glue 数据目录处理 Delta Lake 表。还支持插入、更新和表批量读取和写入等其他操作。使用 Delta Lake 表时,也可以选择使用 Delta Lake Python 库中的方法,例如 DeltaTable.forPath。有关 Delta Lake Python 库的更多信息,请参阅 Delta Lake 的 Python 文档。

下表列出了 Amazon Glue 每个版本中包含的 Delta Lake 版本。

Amazon Glue 版本 支持的 Delta Lake 版本
4.0 2.1.0
3.0 1.0.0

要了解有关 Amazon Glue 支持的数据湖框架的更多信息,请参阅在 Amazon Glue ETL 任务中使用数据湖框架

为 Amazon Glue 启用 Delta Lake

要为 Amazon Glue 启用 Delta Lake,请完成以下任务:

  • 指定 delta 作为 --datalake-formats 作业参数的值。有关更多信息,请参阅 Amazon Glue 作业参数

  • --conf 为 Glue 作业创建一个名为 Amazon 的密钥,并将其设置为以下值。或者,您可以在脚本中使用 SparkConf 设置以下配置。这些设置有助于 Apache Spark 正确处理 Delta Lake 表。

    spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
  • Amazon Glue 4.0 默认为 Delta 表启用了 Lake Formation 权限支持。无需额外配置即可读取/写入注册到 Lake Formation 的 Delta 表。Amazon Glue 作业 IAM 角色必须具有 SELECT 权限才能读取已注册的 Delta 表。Amazon Glue 作业 IAM 角色必须具有 SUPER 权限才能写入已注册的 Delta 表。要了解有关管理 Lake Formation 权限的更多信息,请参阅 Granting and revoking permissions on Data Catalog resources

使用不同的 Delta Lake 版本

要使用 Amazon Glue 不支持的 Delta Lake 版本,请使用 --extra-jars 作业参数指定您自己的 Delta Lake JAR 文件。请勿包含 delta 作为 --datalake-formats 作业参数的值。要在这种情况下使用 Delta Lake Python 库,必须使用 --extra-py-files 作业参数指定库 JAR 文件。Python 库打包在 Delta Lake JAR 文件中。

示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 Amazon Glue 数据目录

以下 Amazon Glue ETL 脚本演示了如何将 Delta Lake 表写入 Amazon S3,并将该表注册到 Amazon Glue 数据目录。

Python
# Example: Create a Delta Lake table from a DataFrame # and register the table to Glue Data Catalog additional_options = { "path": "s3://<s3Path>" } dataFrame.write \ .format("delta") \ .options(**additional_options) \ .mode("append") \ .partitionBy("<your_partitionkey_field>") \ .saveAsTable("<your_database_name>.<your_table_name>")
Scala
// Example: Example: Create a Delta Lake table from a DataFrame // and register the table to Glue Data Catalog val additional_options = Map( "path" -> "s3://<s3Path>" ) dataFrame.write.format("delta") .options(additional_options) .mode("append") .partitionBy("<your_partitionkey_field>") .saveAsTable("<your_database_name>.<your_table_name>")

示例:使用 Amazon Glue 数据目录从 Amazon S3 读取 Delta Lake 表

以下 Amazon Glue ETL 脚本读取您在 示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 Amazon Glue 数据目录 中创建的 Delta Lake 表。

Python

在本示例中,使用 create_data_frame.from_catalog 方法。

# Example: Read a Delta Lake table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) df = glueContext.create_data_frame.from_catalog( database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
Scala

在本示例中,使用 getCatalogSource 方法。

// Example: Read a Delta Lake table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val df = glueContext.getCatalogSource("<your_database_name>", "<your_table_name>", additionalOptions = additionalOptions) .getDataFrame() } }

示例:使用 Amazon Glue 数据目录在 Amazon S3 中将 DataFrame 插入 Delta Lake 表

此示例将数据插入您在 示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 Amazon Glue 数据目录 中创建的 Delta Lake 表。

注意

此示例要求您设置 --enable-glue-datacatalog 任务参数,才能将 Amazon Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅Amazon Glue 作业参数

Python

在本示例中,使用 write_data_frame.from_catalog 方法。

# Example: Insert into a Delta Lake table in S3 using Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame=dataFrame, database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
Scala

在本示例中,使用 getCatalogSink 方法。

// Example: Insert into a Delta Lake table in S3 using Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = additionalOptions) .writeDataFrame(dataFrame, glueContext) } }

示例:使用 Spark API 从 Amazon S3 读取 Delta Lake 表

此示例使用 Spark API 从 Amazon S3 读取 Delta Lake 表。

Python
# Example: Read a Delta Lake table from S3 using a Spark DataFrame dataFrame = spark.read.format("delta").load("s3://<s3path/>")
Scala
// Example: Read a Delta Lake table from S3 using a Spark DataFrame val dataFrame = spark.read.format("delta").load("s3://<s3path/>")

示例:使用 Spark 向 Amazon S3 写入 Delta Lake 表

此示例使用 Spark 向 Amazon S3 写入 Delta Lake 表。

Python
# Example: Write a Delta Lake table to S3 using a Spark DataFrame dataFrame.write.format("delta") \ .options(**additional_options) \ .mode("overwrite") \ .partitionBy("<your_partitionkey_field>") .save("s3://<s3Path>")
Scala
// Example: Write a Delta Lake table to S3 using a Spark DataFrame dataFrame.write.format("delta") .options(additionalOptions) .mode("overwrite") .partitionBy("<your_partitionkey_field>") .save("s3://<s3path/>")

示例:读取和写入具有 Lake Formation 权限控制的 Delta Lake 表

此示例将读取和写入一个具有 Lake Formation 权限控制的 Delta Lake 表。

  1. 创建一个 Delta 表并将其注册到 Lake Formation

    1. 要启用 Lake Formation 权限控制,您首先需要将表的 Amazon S3 路径注册到 Lake Formation。有关更多信息,请参阅 Registering an Amazon S3 location(注册 Amazon S3 位置)。您可以通过 Lake Formation 控制台或使用 Amazon CLI 进行注册:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      注册了 Amazon S3 位置后,对于任何指向该位置(或其任何子位置)的 Amazon Glue 表,GetTable 调用中的 IsRegisteredWithLakeFormation 参数都将返回值 true。

    2. 创建一个指向通过 Spark 注册的 Amazon S3 路径的 Delta 表:

      注意

      以下示例属于 Python 示例。

      dataFrame.write \ .format("delta") \ .mode("overwrite") \ .partitionBy("<your_partitionkey_field>") \ .save("s3://<the_s3_path>")

      将数据写入 Amazon S3 后,使用 Amazon Glue 爬网程序创建新的 Delta 目录表。有关更多信息,请参阅 Introducing native Delta Lake table support with Amazon Glue crawlers

      您也可以通过 Amazon Glue CreateTable API 手动创建表。

  2. 向 Amazon Glue 作业 IAM 角色授予 Lake Formation 权限。您可以通过 Lake Formation 控制台授予权限,也可以使用 Amazon CLI 授予权限。有关更多信息,请参阅 Granting table permissions using the Lake Formation console and the named resource method

  3. 读取注册到 Lake Formation 的 Delta 表。代码与读取未注册的 Delta 表相同。请注意,Amazon Glue 作业 IAM 角色需要具有 SELECT 权限才能成功读取。

    # Example: Read a Delta Lake table from Glue Data Catalog df = glueContext.create_data_frame.from_catalog( database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
  4. 写入注册到 Lake Formation 的 Delta 表。代码与写入未注册的 Delta 表相同。请注意,Amazon Glue 作业 IAM 角色需要具有 SUPER 权限才能成功写入。

    默认情况下,Amazon Glue 会将 Append 作为 saveMode 使用。您可以通过设置 additional_options 中的 saveMode 选项来对其进行更改。要了解 Delta 表中对 saveMode 的支持,请参阅 Write to a table

    glueContext.write_data_frame.from_catalog( frame=dataFrame, database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )