本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
管理 中用于 ETL 输出的分区AWS Glue
分区是用于组织数据集以便高效查询数据集的重要技术。它根据一个或多个列的不同值用分层目录结构来组织数据。
例如,您可以决定按日期(包括年、月和日)对 Amazon Simple Storage Service (Amazon S3) 中的应用程序日志分区。然后,与某一天的数据相对应的文件将放置在前缀下,例如
s3://my_bucket/logs/year=2018/month=01/day=23/
. Amazon AthenaAmazon Redshift Spectrum 和 AWS Glue 等系统可以使用这些分区按分区值筛选数据,而无需读取 Amazon
S3. 中的所有底层数据。
爬网程序不仅推断文件类型和架构,它们还会在填充 时自动标识数据集的分区结构。AWS Glue 数据目录. 生成的分区列可用于在 AWS Glue ETL 作业或 Amazon Athena. 之类的查询引擎中进行查询。
在对表进行爬网后,可以通过在 AWS Glue 控制台中导航到该表并选择 View Partitions (查看分区). 来查看爬网程序创建的分区。
对于采用 key=val
样式的 Apache Hive 风格分区路径,爬网程序会使用键名自动填充列名称。否则,它使用默认名称,如 partition_0
、partition_1
等。要在控制台上更改默认名称,请导航到表,选择 Edit Schema,然后在其中修改分区列的名称。
然后,在您的 ETL 脚本中,便可以筛选分区列。由于分区信息存储在 数据目录 中,因此,请使用 from_catalog
API 调用在 DynamicFrame
中包含分区列。例如,使用 create_dynamic_frame.from_catalog
而不是 create_dynamic_frame.from_options
。
使用下推谓词进行预筛选
在许多情况下,您可以使用下推谓词来筛选分区,而不必列出并读取数据集中的所有文件。您可以直接对 中的分区元数据应用筛选,而不是读取整个数据集,然后在 DynamicFrame 中筛选。数据目录. 这样,只需将您实际需要的内容列出和读取到 DynamicFrame 中即可。
例如,在 Python 中,您可以写入以下内容。
glue_context.create_dynamic_frame.from_catalog( database = "my_S3_data_set", table_name = "catalog_data_table", push_down_predicate = my_partition_predicate)
这会创建一个 DynamicFrame,它仅在 数据目录 中加载满足谓词表达式的分区。根据您要加载的数据子集的规模,这样可以节省大量处理时间。
谓词表达式可以是 Spark SQL 支持的任何布尔表达式。您可以在 Spark SQL 查询的 WHERE
子句中放置的任何内容都可以使用。例如,谓词表达式 pushDownPredicate = "(year=='2017' and month=='04')"
仅加载数据目录中 year
等于 2017 并且 month
等于 04 的分区。有关更多信息,请参阅 Apache Spark SQL 文档
除了用于 Amazon S3 路径的 Hive 风格分区外,Apache Parquet 和 Apache ORC 文件格式会进一步将每个文件分区为表示列值的数据块。每个数据块还存储其包含的记录的统计信息,如列值的最小值/最大值。AWS Glue 支持将下推谓词以这些格式用于 Hive 风格分区和数据块分区。通过这种方法,您可以修剪 Parquet 和 ORC 格式的不必要的 Amazon S3 分区,并使用列统计信息跳过您认为不必要的数据块。
写入分区
默认情况下,DynamicFrame 在写入时不分区。所有输出文件都写入指定输出路径的顶级。直到最近,将 DynamicFrame 写入分区的唯一途径是在写入之前将其转换为 Spark SQL DataFrame。
但是,DynamicFrames 现在支持您在创建接收器时使用 partitionKeys
选项通过密钥序列进行本机分区。例如,以下 Python 代码将数据集以 Parquet 格式写出到 Amazon S3 中,写入到类型字段分区的目录中。然后,您可以使用其他系统(如
)处理这些分区。Amazon Athena.
glue_context.write_dynamic_frame.from_options(
frame = projectedEvents,
connection_type = "s3",
connection_options = {"path": "$outpath", "partitionKeys": ["type"]},
format = "parquet")