使用 Amazon Glue 对 Amazon S3 表运行 ETL 任务 - Amazon Simple Storage Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Amazon Glue 对 Amazon S3 表运行 ETL 任务

Amazon Glue 是一项无服务器数据集成服务,可让使用分析功能的用户轻松发现、准备、移动和集成来自多个来源的数据。可以使用 Amazon Glue 运行提取、转换、加载(ETL)管道,来将数据加载到数据湖中。有关 Amazon Glue 的更多信息,请参阅《Amazon Glue 开发人员指南》中的什么是 Amazon Glue?

Amazon Glue 作业将封装连接到源数据的脚本,处理该脚本,然后将其写入数据目标。通常,作业运行提取、转换和加载 (ETL) 脚本。作业可以运行专为 Apache Spark 运行时环境设计的脚本。您可以监控作业运行以了解运行时指标(例如完成状态、持续时间和开始时间)。

您可以使用 Amazon Glue 作业来处理 S3 表中的数据,方法是通过与 Amazon 分析服务的集成连接到表,或者直接使用 Amazon S3 表类数据存储服务 Iceberg REST 端点或适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录进行连接。本指南涵盖了开始将 Amazon Glue 与 S3 表类数据存储服务结合使用的基本步骤,包括:

注意

Amazon Glue 版本 5.0 或更高版本支持 S3 表类数据存储服务。

先决条件

在可以从 Amazon Glue 任务中查询表之前,必须配置 Amazon Glue 可用于运行该任务的 IAM 角色,并将适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录 JAR 上传到 Amazon Glue 运行任务时可以访问的 S3 存储桶。

  • 将表存储桶与 Amazon 分析服务集成

  • 为 Amazon Glue 创建 IAM 角色。

    • AmazonS3TablesFullAccess 托管式策略附加到角色。

    • AmazonS3FullAccess 托管式策略附加到角色。

  • (可选)如果您使用的是适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录,则需要下载客户端目录 JAR 并将其上传到 S3 存储桶。

    下载目录 JAR
    1. Maven Central 上检查最新版本。您可以使用浏览器或使用以下命令从 Maven Central 下载 JAR。请务必将版本号替换为最新版本。

      wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
    2. 将下载的 JAR 上传到 Amazon Glue IAM 角色可以访问的 S3 存储桶。您可以使用以下 Amazon CLI 命令来上传 JAR。请务必将版本号替换为最新版本,并将存储桶名称路径替换为您自己的信息。

      aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/

创建用于连接到表存储桶的脚本

要在运行 Amazon Glue ETL 作业时访问表数据,可以为 Apache Iceberg 配置一个 Spark 会话,用于连接到 S3 表存储桶。可以修改现有脚本以连接到表存储桶或创建新脚本。有关创建 Amazon Glue 脚本的更多信息,请参阅《Amazon Glue 开发人员指南》中的教程:编写 Amazon Glue for Spark 脚本

可以将会话配置为通过以下任一 S3 表类数据存储服务访问方法连接到表存储桶:

  • S3 表类数据存储服务与 Amazon 分析服务集成

  • Amazon S3 表类数据存储服务 Iceberg REST 端点

  • 适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录

从以下访问方法中进行选择,以查看设置说明和配置示例。

Amazon analytics services integration

作为使用 Amazon 分析服务集成在 Amazon Glue 上使用 Spark 查询表的先决条件,您必须将表存储桶与 Amazon 分析服务进行集成

可以通过作业中的 Spark 会话或交互式会话中的 Amazon Glue Studio 魔法来配置与表存储桶的连接。要使用以下示例,请将占位符值替换为您自己的表存储桶的信息。

使用 PySpark 脚本

在 PySpark 脚本中使用以下代码段,以便将 Amazon Glue 作业配置为使用集成连接到表存储桶。

spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") \ .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \ .getOrCreate()
使用交互式 Amazon Glue 会话

如果您将交互式笔记本会话与 Amazon Glue 5.0 结合使用,请在执行代码之前,在单元格中使用 %%configure 魔法指定相同的配置。

%%configure { "conf": { "spark.sql.defaultCatalog": "s3tables", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.s3tables": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.s3tables.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.sql.catalog.s3tables.glue.id": "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket", "spark.sql.catalog.s3tables.warehouse": "s3://amzn-s3-demo-table-bucket/warehouse/" } }
Amazon S3 Tables Iceberg REST endpoint

可以通过作业中的 Spark 会话或交互式会话中的 Amazon Glue Studio 魔法来配置与表存储桶的连接。要使用以下示例,请将占位符值替换为您自己的表存储桶的信息。

使用 PySpark 脚本

在 PySpark 脚本中使用以下代码段,以便将 Amazon Glue 作业配置为使用端点连接到表存储桶。

spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate()
使用交互式 Amazon Glue 会话

如果您将交互式笔记本会话与 Amazon Glue 5.0 结合使用,请在执行代码之前,在单元格中使用 %%configure 魔法指定相同的配置。将占位符值 替换为您自己的表存储桶的信息。

%%configure { "conf": { "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.defaultCatalog": "s3_rest_catalog", "spark.sql.catalog.s3_rest_catalog": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.s3_rest_catalog.type": "rest", "spark.sql.catalog.s3_rest_catalog.uri": "https://s3tables.Region.amazonaws.com/iceberg", "spark.sql.catalog.s3_rest_catalog.warehouse": "arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket", "spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled": "true", "spark.sql.catalog.s3_rest_catalog.rest.signing-name": "s3tables", "spark.sql.catalog.s3_rest_catalog.rest.signing-region": "Region", "spark.sql.catalog.s3_rest_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled": "false" } }
Amazon S3 Tables Catalog for Apache Iceberg

作为使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录连接到表的先决条件,您必须先下载最新的目录 jar 并将其上传到 S3 存储桶。然后,在创建作业时,将指向客户端目录 JAR 的路径添加为一个特殊参数。有关 Amazon Glue 中作业参数的更多信息,请参阅《Amazon Glue 开发人员指南》中的 Amazon Glue 作业中使用的特殊参数

可以通过作业中的 Spark 会话或交互式会话中的 Amazon Glue Studio 魔法来配置与表存储桶的连接。要使用以下示例,请将占位符值替换为您自己的表存储桶的信息。

使用 PySpark 脚本

在 PySpark 脚本中使用以下代码段,以便将 Amazon Glue 作业配置为使用 JAR 连接到表存储桶。将占位符值 替换为您自己的表存储桶的信息。

spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate()
使用交互式 Amazon Glue 会话

如果您将交互式笔记本会话与 Amazon Glue 5.0 结合使用,请在执行代码之前,在单元格中使用 %%configure 魔法指定相同的配置。将占位符值 替换为您自己的表存储桶的信息。

%%configure { "conf": { "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.defaultCatalog": "s3tablesbucket", "spark.sql.catalog.s3tablesbucket": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.s3tablesbucket.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog", "spark.sql.catalog.s3tablesbucket.warehouse": "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket" }, "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar" }

示例脚本

以下示例 PySpark 脚本可用于测试通过 Amazon Glue 作业来查询 S3 表。这些脚本连接到表存储桶,然后运行查询来执行以下操作:创建新的命名空间、创建示例表、向表中插入数据以及返回表数据。要使用这些脚本,请将占位符值替换为您自己的表存储桶的信息。

根据 S3 表类数据存储服务访问方法从以下脚本中进行选择。

S3 Tables integration with Amazon analytics services
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() namespace = "new_namespace" table = "new_table" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Iceberg REST endpoint
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate() namespace = "s3_tables_rest_namespace" table = "new_table_s3_rest" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Catalog for Apache Iceberg
from pyspark.sql import SparkSession #Spark session configurations spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() #Script namespace = "new_namespace" table = "new_table" spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}") spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()

创建用于查询表的 Amazon Glue 作业

以下过程显示了如何设置连接到 S3 表存储桶的 Amazon Glue 作业。您可以使用 Amazon CLI 或将控制台与 Amazon Glue Studio 脚本编辑器结合使用来完成此操作。有关更多信息,请参阅《Amazon Glue 用户指南》中的在 Amazon Glue 中编写任务

以下过程介绍了如何使用 Amazon Glue Studio 脚本编辑器来创建用于查询 S3 表的 ETL 任务。

  1. 通过 https://console.aws.amazon.com/glue/ 打开 Amazon Glue 控制台。

  2. 在导航窗格中,选择 ETL 任务

  3. 选择脚本编辑器,然后选择上传脚本并上传您创建的用于查询 S3 表的 PySpark 脚本。

  4. 选择任务详细信息选项卡,然后对于基本属性输入以下内容。

    • 对于名称,输入任务的名称。

    • 对于 IAM 角色,选择您为 Amazon Glue 创建的角色。

  5. (可选)如果您使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录访问方法,请展开高级属性,对于从属 JAR 路径,输入您作为先决条件上传到 S3 存储桶的客户端目录 jar 的 S3 URI。例如,s3://amzn-s3-demo-bucket1/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar

  6. 选择保存以创建任务。

  7. 选择运行以启动任务,然后在运行选项卡下查看任务状态。

以下过程介绍了如何使用 Amazon CLI 来创建用于查询 S3 表的 ETL 任务。要使用这些命令,请将占位符值替换为您自己的值。

先决条件
  1. 创建 Amazon Glue 任务。

    aws glue create-job \ --name etl-tables-job \ --role arn:aws:iam::111122223333:role/AWSGlueServiceRole \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp" }' \ --glue-version "5.0"
    注意

    (可选)如果您使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录访问方法,请使用 --extra-jars 参数将客户端目录 JAR 添加到 --default-arguments 中。添加参数时,将输入占位符替换为您自己的信息。

    "--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
  2. 启动任务。

    aws glue start-job-run \ --job-name etl-tables-job
  3. 要查看任务状态,请复制上一个命令中的运行 ID,然后将其输入到以下命令中。

    aws glue get-job-run --job-name etl-tables-job \ --run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad