映射类 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

映射类

通过将函数应用于输入 DynamicFrame 中的所有记录来生成新的 DynamicFrame

Methods

__call__(帧,f,transformation_ctx="",info=", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrame,它源自将指定函数应用到原始 DynamicFrame 中的所有 DynamicRecords

  • frame – 要对其应用映射函数的原始 DynamicFrame(必需)。

  • f – 应用于所有 DynamicRecordsDynamicFrame。功能必须采用 DynamicRecord 作为参数并返回新的 DynamicRecord 由映射生成(必需)。

    DynamicRecord 表示 DynamicFrame。它类似于ApacheSpark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与转换中的错误关联的字符串(可选)。

  • stageThreshold – 在转换出错之前可能在其中发生的最大错误数(可选;默认值为零)。

  • totalThreshold – 在处理出错之前可能全面发生的最大错误数(可选;默认值为零)。

返回一个新的 DynamicFrame,它源自将指定函数应用到原始 DynamicFrame 中的所有 DynamicRecords

apply(cls, *args, **kwargs)

继承自 GlueTransform apply

name(cls)

继承自 GlueTransform name (名称)

describeArgs(包括)

继承自 GlueTransform describeArgs

describeReturn(包括)

继承自 GlueTransform describeReturn

describeTransform(包括)

继承自 GlueTransform describeTransform

describeErrors(包括)

继承自 GlueTransform describeErrors

describe(cls)

继承自 GlueTransform describe

AWS Glue Python 示例

此示例使用 Map 转换将多个字段合并为一个 struct 类型。此处使用的数据集包括从两个 Data.CMS.gov 研究中心: 前100大诊断相关人群住院前瞻性支付系统提供者小结-2011财年),和 2011财年住院患者收费数据.

下载示例数据后,我们修改了它,以在文件末尾引入了几个错误的记录。此修改后的文件位于公共 Amazon S3 桶形 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv。有关使用此数据集的另一个示例,请参阅 代码示例: 数据准备使用 ResolveChoice、Lambda和 ApplyMapping.

首先,为数据创建一个 DynamicFrame

from awsglue.context import GlueContext from awsglue.transforms import * from pyspark.context import SparkContext glueContext = GlueContext(SparkContext.getOrCreate()) dyF = glueContext.create_dynamic_frame.from_options( 's3', {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']}, 'csv', {'withHeader': True}) print "Full record count: ", dyF.count() dyF.printSchema()

此代码的输出应如下所示:

Full record count: 163065L root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string

接下来,创建一个映射函数,一边将 DynamicRecord 中的提供商地址字段合并到 struct 中,然后删除单个地址字段:

def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec

在此映射函数中,行 rec["Address"] = {} 会在包含新结构的输入 DynamicRecord 中创建一个词典。

注意

Python map 字段在此受支持。例如,您不能有如下所示的行:

rec["Addresses"] = [] # ILLEGAL!

类似 rec["Address"]["Street"] = rec["Provider Street Address"] 的行会使用 Python 词典语法向新结构中添加字段。

在地址行添加到新结构中后,类似 del rec["Provider Street Address"] 的行会从 DynamicRecord 中删除单个字段。

现在,您可以使用 Map 转换将映射函数应用到 DynamicFrame 中的所有 DynamicRecords

mapped_dyF = Map.apply(frame = dyF, f = MergeAddress) mapped_dyF.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string