在 Amazon Glue ETL 中通过下推优化读取 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Glue ETL 中通过下推优化读取

下推是一种优化技术,它可以将检索数据的逻辑推向离数据源更近的地方。源可以是数据库或文件系统,例如 Amazon S3。直接在源端执行某些操作时,无需将所有数据通过网络传送到由 Amazon Glue 管理的 Spark 引擎,从而节省时间和处理能力。

换言之,下推可以减少数据扫描量。要详细了解确定何时适合使用这种技术的过程,请参阅《Amazon 规范性指南》中“优化 Amazon Glue for Apache Spark 作业性能的最佳实践”指南中的 减少数据扫描量

对存储在 Amazon S3 上的文件的谓词下推

在 Amazon S3 上处理按前缀组织的文件时,可以通过定义下推谓词来筛选目标 Amazon S3 路径。可以直接将筛选器应用于 Amazon Glue Data Catalog 中存储的分区元数据,而不必读取完整的数据集并在 DynamicFrame 中应用筛选器。这种方法允许您有选择地列出和只读必要的数据。有关此过程的更多信息,包括按分区写入存储桶,请参阅 管理 Amazon Glue 中用于 ETL 输出的分区

通过使用 push_down_predicate 参数,可以在 Amazon S3 中实现谓词下推。假设按年、月和日分区的 Amazon S3 中的一个存储桶。如果您想检索 2022 年 6 月的客户数据,可以指示 Amazon Glue 仅读取相关的 Amazon S3 路径。在本例中,push_down_predicateyear='2022' and month='06'。综上所述,可以实现读取操作,如下所示:

Python
customer_records = glueContext.create_dynamic_frame.from_catalog( database = "customer_db", table_name = "customer_tbl", push_down_predicate = "year='2022' and month='06'" )
Scala
val customer_records = glueContext.getCatalogSource( database="customer_db", tableName="customer_tbl", pushDownPredicate="year='2022' and month='06'" ).getDynamicFrame()

在前面的场景中,push_down_predicate 从 Amazon Glue Data Catalog 中检索所有分区的列表,并在读取底层 Amazon S3 文件之前对其进行筛选。尽管这在大多数情况下都有帮助,但在处理具有数百万个分区的数据集时,列出分区的过程可能很耗时。为了解决这个问题,可以使用服务器端的分区修剪来提高性能。这通过在 Amazon Glue Data Catalog 中为数据建立分区索引来完成。有关分区索引的更多信息,请参阅 在 Amazon Glue 中使用分区索引。然后,您可以使用 catalogPartitionPredicate 选项来引用索引。有关使用 catalogPartitionPredicate 检索分区的示例,请参阅 使用目录分区谓词进行服务器端筛选

使用 JDBC 源时下推

GlueContext 中使用的 Amazon Glue JDBC 读取器通过提供可以直接在源上运行的自定义 SQL 查询,支持对支持的数据库进行下推。这可以通过设置 sampleQuery 参数来实现。您的示例查询可以指定要选择的列,还可以提供下推谓词来限制传输到 Spark 引擎的数据。

默认情况下,示例查询在单个节点上运行,这可能会在处理大量数据时导致作业失败。要使用此功能大规模查询数据,您应该通过设置 enablePartitioningForSampleQuery 为 true 来配置查询分区,这将通过您选择的键将查询分发到多个节点。查询分区还需要一些其他必要的配置参数。有关查询分区的更多信息,请参阅 从 JDBC 表并行读取

设置 enablePartitioningForSampleQuery 时,Amazon Glue 会在查询数据库时将您的下推谓词与分区谓词组合在一起。sampleQuery 必须以 AND for Amazon Glue 结尾才能附加分区条件。(如果您未提供下推谓词,则 sampleQuery 必须以 WHERE 结尾)。请参阅下面的示例,其中我们下推一个谓词以仅检索 id 大于 1000 的行。此 sampleQuery 将仅返回 id 大于指定值的行的名称和位置列:

Python
sample_query = "select name, location from customer_tbl WHERE id>=1000 AND" customer_records = glueContext.create_dynamic_frame.from_catalog( database="customer_db", table_name="customer_tbl", sample_query = "select name, location from customer_tbl WHERE id>=1000 AND", additional_options = { "hashpartitions": 36 , "hashfield":"id", "enablePartitioningForSampleQuery":True, "sampleQuery":sample_query } )
Scala
val additionalOptions = Map( "hashpartitions" -> "36", "hashfield" -> "id", "enablePartitioningForSampleQuery" -> "true", "sampleQuery" -> "select name, location from customer_tbl WHERE id >= 1000 AND" ) val customer_records = glueContext.getCatalogSource( database="customer_db", tableName="customer_tbl").getDynamicFrame()
注意

如果 customer_tbl 在数据目录和底层数据存储中的名称不同,则必须在 sample_query 中提供底层表的名称,因为查询将传递到底层数据存储。

您也可以在不与 Amazon Glue Data Catalog 集成的情况下对 JDBC 表进行查询。您可以通过提供 useConnectionPropertiesconnectionName 来重用来自先前存在连接的凭证,而不必提供用户名和密码作为该方法的参数。在本例中,我们从名为 my_postgre_connection 的连接检索凭证。

Python
connection_options_dict = { "useConnectionProperties": True, "connectionName": "my_postgre_connection", "dbtable":"customer_tbl", "sampleQuery":"select name, location from customer_tbl WHERE id>=1000 AND", "enablePartitioningForSampleQuery":True, "hashfield":"id", "hashpartitions":36 } customer_records = glueContext.create_dynamic_frame.from_options( connection_type="postgresql", connection_options=connection_options_dict )
Scala
val connectionOptionsJson = """ { "useConnectionProperties": true, "connectionName": "my_postgre_connection", "dbtable": "customer_tbl", "sampleQuery": "select name, location from customer_tbl WHERE id>=1000 AND", "enablePartitioningForSampleQuery" : true, "hashfield" : "id", "hashpartitions" : 36 } """ val connectionOptions = new JsonOptions(connectionOptionsJson) val dyf = glueContext.getSource("postgresql", connectionOptions).getDynamicFrame()

Amazon Glue 中下推的注意事项和限制

作为一个概念,“下推”适用于从非串流源读取数据。AmazonGlue 支持多种信号源,下推的能力取决于源和连接器。

  • 连接到 Snowflake 时,您可以使用 query 选项。Amazon Glue 4.0 及更高版本的 Redshift 连接器中也有类似的功能。有关使用 query 从 Snowflake 读取内容的更多信息,请参阅 从 Snowflake 表中读取

  • DynamoDB ETL 读取器不支持筛选条件或下推谓词。MongoDB 和 DocumentDB 也不支持这种功能。

  • 从以开放表格式存储在 Amazon S3 中的数据中读取数据时,Amazon S3 中文件的分区方法已不再足够。要使用开放表格式从分区读取和写入,请查阅格式文档。

  • DynamicFrame 方法不执行 Amazon S3 投影下推。所有列都将从传递谓词筛选器的文件中读取。

  • 在 Amazon Glue 中使用 custom.jdbc 连接器时,下推的能力取决于源和连接器。请查看相应的连接器文档,以确认它是否以及如何支持 Amazon Glue 中的下推。