筛选器类 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

筛选器类

通过从满足指定谓词函数的输入 DynamicFrame 中选择记录来生成新的 DynamicFrame

方法

__call__(frame, f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0))

通过从满足指定谓词函数的输入 DynamicFrame 中选择记录来返回新的 DynamicFrame 生成。

  • frame – 要将指定的筛选函数应用到的源 DynamicFrame(必需)。

  • f – 要应用到 DynamicFrame 中的每个 DynamicRecord 的谓词函数。该函数必须采用 DynamicRecord 作为参数,并且如果 DynamicRecord 满足筛选要求,则返回 True,否则返回 False (必需)。

    DynamicRecord 表示 DynamicFrame 中的逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与转换中的错误关联的字符串(可选)。

  • stageThreshold – 在转换出错之前可能在其中发生的最大错误数(可选;默认值为零)。

  • totalThreshold – 在处理出错之前可能全面发生的最大错误数(可选;默认值为零)。

apply(cls, *args, **kwargs)

继承自 GlueTransform apply

name(cls)

继承自 GlueTransform name

describeArgs(cls)

继承自 GlueTransform describeArgs

describeReturn(cls)

继承自 GlueTransform describeReturn

describeTransform(cls)

继承自 GlueTransform describeTransform

describeErrors(cls)

继承自 GlueTransform describeErrors

describe(cls)

继承自 GlueTransform 描述

AWS Glue Python 示例

此示例使用 Filter 转换和一个简单的 Lambda 函数来筛选示例数据。此处使用的数据集包括从两个 Data.CMS.gov 站点下载的医疗保健提供商付款数据:Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups – FY2011) 和 Inpatient Charge Data FY 2011

下载示例数据后,我们修改了它,以在文件末尾引入了几个错误的记录。这个修改过的文件位于 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv 上的公有 Amazon S3 存储桶中。有关使用此数据集的另一个示例,请参阅代码示例:使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备

首先,为数据创建一个 DynamicFrame

%pyspark from awsglue.context import GlueContext from awsglue.transforms import * from pyspark.context import SparkContext glueContext = GlueContext(SparkContext.getOrCreate()) dyF = glueContext.create_dynamic_frame.from_options( 's3', {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']}, 'csv', {'withHeader': True}) print "Full record count: ", dyF.count() dyF.printSchema()

输出应如下所示:

Full record count: 163065L root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string

接下来,使用 Filter 转换来压缩数据集,仅保留来自加利福尼亚州萨克拉门托或阿拉巴马州蒙哥马利的那些条目。筛选转换适用于任何接受 DynamicRecord 作为输入并在 DynamicRecord 满足筛选要求时返回 True,否则返回 False 的筛选函数。

注意

您可以使用 Python 的点表示法来访问 DynamicRecord 中的多个字段。例如,您可以访问 dynamic_record_X 中的 column_A 字段,如:dynamic_record_X.column_A

但是,此技术不用于包含除字母数字字符和下划线之外的任何内容的字段名称。对于包含其他字符 (如空格或句点) 的字段,您必须退回到 Python 的字典表示法。例如,要访问名为 col-B 的字段,请使用:dynamic_record_X["col-B"]

您可以使用一个简单的 Lambda 函数与 Filter 转换来删除所有不来自萨克拉门托或蒙哥马利的 DynamicRecords。要确认此操作是否有效,请打印出保留的记录数:

sac_or_mon_dyF = Filter.apply(frame = dyF, f = lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"]) print "Filtered record count: ", sac_or_mon_dyF.count()

您得到的输出如下所示:

Filtered record count: 564L