映射类 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

映射类

通过将函数应用于输入 DynamicFrame 中的所有记录来生成新的 DynamicFrame

方法

__call__(frame, f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrame,它源自将指定函数应用到原始 DynamicFrame 中的所有 DynamicRecords

  • frame – 要对其应用映射函数的原始 DynamicFrame(必需)。

  • f – 要应用到 DynamicFrame 中的所有 DynamicRecords 的函数。该函数必须采用 DynamicRecord 作为参数并返回一个由映射生成的新 DynamicRecord (必需)。

    DynamicRecord 表示 DynamicFrame 中的逻辑记录。它类似于 Apache Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与转换中的错误关联的字符串(可选)。

  • stageThreshold – 在转换出错之前可能在其中发生的最大错误数(可选;默认值为零)。

  • totalThreshold – 在处理出错之前可能全面发生的最大错误数(可选;默认值为零)。

返回一个新的 DynamicFrame,它源自将指定函数应用到原始 DynamicFrame 中的所有 DynamicRecords

apply(cls, *args, **kwargs)

继承自 GlueTransform apply

name(cls)

继承自 GlueTransform name

describeArgs(cls)

继承自 GlueTransform describeArgs

describeReturn(cls)

继承自 GlueTransform describeReturn

describeTransform(cls)

继承自 GlueTransform describeTransform

describeErrors(cls)

继承自 GlueTransform describeErrors

describe(cls)

继承自 GlueTransform 描述

AWS Glue Python 示例

此示例使用 Map 转换将多个字段合并为一个 struct 类型。此处使用的数据集包括从两个 Data.CMS.gov 站点下载的医疗保健提供商付款数据:Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups – FY2011) 和 Inpatient Charge Data FY 2011

下载示例数据后,我们修改了它,以在文件末尾引入了几个错误的记录。这个修改过的文件位于 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv 上的公有 Amazon S3 存储桶中。有关使用此数据集的另一个示例,请参阅代码示例:使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备

首先,为数据创建一个 DynamicFrame

from awsglue.context import GlueContext from awsglue.transforms import * from pyspark.context import SparkContext glueContext = GlueContext(SparkContext.getOrCreate()) dyF = glueContext.create_dynamic_frame.from_options( 's3', {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']}, 'csv', {'withHeader': True}) print "Full record count: ", dyF.count() dyF.printSchema()

此代码的输出应如下所示:

Full record count: 163065L root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string

接下来,创建一个映射函数,一边将 DynamicRecord 中的提供商地址字段合并到 struct 中,然后删除单个地址字段:

def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec

在此映射函数中,行 rec["Address"] = {} 会在包含新结构的输入 DynamicRecord 中创建一个词典。

注意

Python map 字段在此受支持。例如,您不能有如下所示的行:

rec["Addresses"] = [] # ILLEGAL!

类似 rec["Address"]["Street"] = rec["Provider Street Address"] 的行会使用 Python 词典语法向新结构中添加字段。

在地址行添加到新结构中后,类似 del rec["Provider Street Address"] 的行会从 DynamicRecord 中删除单个字段。

现在,您可以使用 Map 转换将映射函数应用到 DynamicFrame 中的所有 DynamicRecords

mapped_dyF = Map.apply(frame = dyF, f = MergeAddress) mapped_dyF.printSchema()

输出如下所示:

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string