DynamicFrame 班级 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

DynamicFrame 班级

Apache Spark 中的主要抽象之一是 SparkSQL DataFrame,它类似于在 R 和 Pandas 中找到的 DataFrame 构造。DataFrame 类似于表格,支持函数风格(map/reduce/filter/等)操作和 SQL 操作(select、project、aggregate)。

尽管 DataFrames 功能强大且运用广泛,但在提取、转换和加载 (ETL) 操作方面存在局限性。最明显的是,它们需要指定了架构后才能加载任何数据。SparkSQL 通过对数据进行两次扫描解决了这一问题 – 第一次为了推断架构,第二次为了加载数据。然而,此推断仍有局限性,且不能解决数据混乱的实际问题。例如,同样的字段在不同记录中可能属于不同类型。对此,Apache Spark 通常没有好的办法,只能使用原始字段文本将类型报告为 string。这或许不正确,同时您可能希望更精细地控制解决架构差异的方式。对于大型数据集而言,每扫描一次源数据都会付出高昂代价。

为了解决这些限制, Amazon Glue 推出了DynamicFrameDynamicFrame 类似于 DataFrame,不同之处在于每个记录都是自描述的,因此初始并不需要架构。取而代之的是, on-the-fly在需要时Amazon Glue计算架构,并使用选择(或联合)类型显式编码架构不一致之处。您可以解决这些不一致之处,以使您的数据集兼容需要固定架构的数据存储。

同样,一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。将 Amazon Glue 与配合使用时 PySpark,通常不会独立操作DynamicRecords。相反,您需要通过 DynamicFrame 一起转换数据集。

在解决任何架构不一致问题后,就可以在 DynamicFramesDataFrames 之间来回转换。

 – 构造 –

__init__

__init__(jdf, glue_ctx, name)
  • jdf – 对 Java 虚拟机 (JVM) 中数据帧的引用。

  • glue_ctx – 一个 GlueContext 类 对象。

  • name – 可选的名称字符串,默认为空。

fromDF

fromDF(dataframe, glue_ctx, name)

通过将 DataFrame 字段转换为 DynamicRecord 字段将 DataFrame 转换为 DynamicFrame。返回新的 DynamicFrame

一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

此函数期望您 DataFrame 中名称重复的列已经被解析。

  • dataframe – 要转换的 Apache Spark SQL DataFrame (必需)。

  • glue_ctx – 为此转换指定上下文的 GlueContext 类 对象 (必需)。

  • name— 结果的名称DynamicFrame(从 Amazon Glue 3.0 起是可选的)。

toDF

toDF(options)

通过将 DynamicRecords 转换为 DataFrame 字段将 DynamicFrame 转换为 Apache Spark DataFrame。返回新的 DataFrame

一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • options – 一个选项列表。如果您选择 ProjectCast 操作类型,则指定目标类型。示例包括以下内容。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 – 信息 –

count

count( ) – 返回底层 DataFrame 中的行数。

架构

schema( ) – 返回此 DynamicFrame 的架构,如果此架构不可用,则返回底层 DataFrame 的架构。

有关构成此架构的 DynamicFrame 类型的更多信息,请参阅 PySpark 扩展类型

printSchema

printSchema( ) – 打印底层 DataFrame 的架构。

show

show(num_rows) – 打印底层 DataFrame 中的行数。

repartition

repartition(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame

coalesce

coalesce(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame

 – 转换 –

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

将声明映射应用于此 DynamicFrame 并返回已将那些映射应用于所指定字段的新 DynamicFrame。新 DynamicFrame 中省略了未指定的字段。

  • mappings – 映射元组列表(必需)。每个元组包括:(源列,源类型,目标列,目标类型)。

    如果源列的名称有一个点“.”,则必须在名称外加上反引号“``”。例如,要将 this.old.name(字符串)映射到 thisNewName,可以使用以下元组:

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 apply_mapping 重命名字段并更改字段类型

以下代码示例展示了如何使用 apply_mapping 方法重命名选定字段并更改字段类型。

注意

要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

调用 FlatMap 类 转换以从 DynamicFrame 中删除字段。返回指定字段已删除的新的 DynamicFrame

  • paths – 字符串列表。每个字符串都包含要删除的字段节点的完整路径。您可以使用点表示法来指定嵌套字段。例如,如果字段 first 在树结构中是字段 name 的子字段,则为路径指定 "name.first"

    如果字段节点的名称有文字 .,则必须用反引号(`)将名称括起来。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 drop_fields 从 DynamicFrame 中删除字段

此代码示例使用 drop_fields 方法从 DynamicFrame 中移除选定的顶级字段和嵌套字段。

示例数据集

该示例使用以下数据集,其在代码中由 EXAMPLE-FRIENDS-DATA 表表示:

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

示例代码

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

filter

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回新 DynamicFrame,其中包含输入 DynamicFrame 中满足指定谓词函数 f 的所有 DynamicRecords

  • f – 应用到 DynamicFrame 的谓词函数。该函数必须采用 DynamicRecord 作为参数,如果 DynamicRecord 满足筛选要求,则返回 True,否则返回 False (必需)。

    一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。其类似于 Spark DataFrame 中的一行,但其具有自描述性,可用于不符合固定架构的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用筛选条件获取筛选后的字段选择

此示例用 filter 方法创建新 DynamicFrame,其中包括筛选后的其他 DynamicFrame 字段选择。

类似于 map 方法,filter 采用一个函数作为实际参数,以将其应用于原 DynamicFrame 中的每个记录。该函数将记录作为输入并返回布尔值。如果返回值为 true,则该记录将包含在生成的 DynamicFrame 中。如果返回值为 false,则该记录将被排除在外。

注意

要访问本示例中使用的数据集,请参阅 代码示例:使用 ResolveChoice、Lambda 和进行数据准备 ApplyMapping 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

执行与另一个 DynamicFrame 的等式联接并返回生成的 DynamicFrame

  • paths1 – 此帧中要联接的键列表。

  • paths2 – 另一帧中要联接的键列表。

  • frame2 – 要联接的另一个 DynamicFrame

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用联接来组合 DynamicFrames

此示例使用join方法对三个执行连接DynamicFrames。 Amazon Glue 根据您提供的字段键执行连接。由此生成的 DynamicFrame 包含来自指定密钥相匹配的两个原始帧中的行。

请注意,join 转换会让所有字段保持不变。这意味着您指定要匹配的字段会出现在结果中 DynamicFrame,即使它们是多余的并且包含相同的键也是如此。在此示例中,我们在联接后使用 drop_fields 移除这些冗余密钥。

注意

要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

映射

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回由于将指定映射函数应用到原始 DynamicFrame 中的所有记录而产生的新的 DynamicFrame

  • f – 应用到 DynamicFrame 中所有记录的映射函数。该函数必须采用 DynamicRecord 作为参数并返回一个新的 DynamicRecord (必需)。

    一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Apache Spark DataFrame 中的一行,但其具有自描述性,可用于不符合固定架构的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与转换中的错误关联的字符串(可选)。

  • stageThreshold – 在转换出错之前可能在其中发生的最大错误数(可选)。默认值为 0。

  • totalThreshold – 在处理出错之前可能全面发生的最大错误数(可选)。默认值为 0。

示例:使用映射将函数应用于 DynamicFrame 中的每个记录

此示例展示了如何使用 map 方法将函数应用于 DynamicFrame 的每个记录。具体而言,此示例将名为 MergeAddress 的函数应用于每个记录,以便将多个地址字段合并为一个 struct 类型。

注意

要访问本示例中使用的数据集,请参阅 代码示例:使用 ResolveChoice、Lambda 和进行数据准备 ApplyMapping 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 Amazon Glue 中的源中的记录。

  • stage_dynamic_frame – 要合并的暂存 DynamicFrame

  • primary_keys – 要匹配源和暂存动态帧中的记录的主键字段列表。

  • transformation_ctx – 用于检索有关当前转换的元数据的唯一字符串(可选)。

  • options – 为此转换提供其他信息的 JSON 名称-值对的字符串。当前未使用此参数。

  • info – 一个 String。要与此转换中的错误关联的任何字符串。

  • stageThreshold – 一个 Long。给定转换中处理需要排除的错误的数目。

  • totalThreshold – 一个 Long。此转换中处理需要排除的错误的总数。

该方法将返回通过将此 DynamicFrame 与暂存 DynamicFrame 合并获取的新 DynamicFrame

在这些情况下,返回的 DynamicFrame 将包含记录 A:

  • 如果 A 在源帧和暂存帧中都存在,则返回暂存帧中的 A

  • 如果 A 在源表中,且 A.primaryKeys 不在 stagingDynamicFrame 中,这意味着未在暂存表中更新 A

源帧和暂存帧不需要具有相同的架构。

示例: mergeDynamicFrame 用于DynamicFrames根据主键合并两个

以下代码示例显示了如何使用 mergeDynamicFrame 方法根据主键 idDynamicFrame 与“staging”DynamicFrame 合并。

示例数据集

该示例使用了名为 split_rows_collectionDynamicFrameCollection 中的两个 DynamicFrames。以下是 split_rows_collection 中的键列表。

dict_keys(['high', 'low'])

示例代码

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 转换为适用于关系数据库的形式。将数据从 DynamoDB 等 NoSQL 环境移动到 MySQL 等关系数据库时,对 DynamicFrame 进行关系化尤其有用。

该转换将通过取消嵌套列的嵌套并透视数组列来生成帧列表。可使用在取消嵌套阶段生成的联接键将透视数组列联接到根表。

  • root_table_name – 根表的名称。

  • staging_path – 要将 CSV 格式的透视表分区存储到的路径(可选)。从该路径读取透视表。

  • options – 可选参数的词典。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 relatialize 在 DynamicFrame 中展平嵌套架构

此代码示例使用 relationalize 方法将嵌套架构展平为适用于关系数据库的形式。

示例数据集

该示例通过以下架构使用了名为 legislators_combinedDynamicFramelegislators_combined 有多个嵌套字段,例如 linksimagescontact_details,这些字段将通过 relationalize 转换进行展平。

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

示例代码

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

以下输出可让您将名为 contact_details 的嵌套字段的架构与 relationalize 转换创建的表进行比较。请注意,表记录可使用名为 id 的外键和表示数组位置的 index 列链接回主表。

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

重命名此 DynamicFrame 中的一个字段并返回包含该重命名字段的新的 DynamicFrame

  • oldName – 要重命名的节点的完整路径。

    如果旧名称中包含点,则 RenameField 将不起作用,除非使用反引号(`)将其引起来。例如,要将 this.old.name 替换为 thisNewName,应按如下方式调用 rename_field。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – 新名称,作为完整路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 rename_field 重命名 DynamicFrame 中的字段

此代码示例可使用 rename_field 方法重命名 DynamicFrame 中的字段。请注意,该示例可使用方法链同时重命名多个字段。

注意

要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

示例代码

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

解析此 DynamicFrame 内的一个选择类型并返回新的 DynamicFrame

  • specs – 要解析的特定歧义列表,每个歧义均采用元组形式:(field_path, action)

    可通过两种方式使用 resolveChoice。第一种是使用 specs 参数指定一系列特定字段以及如何解析它们。resolveChoice 的另一种模式是使用 choice 参数为所有 ChoiceTypes 指定单个解析方法。

    specs 的值被指定为由 (field_path, action) 对组成的元组。field_path 值标识特定歧义元素,action 值标识相应解析。可能的操作如下:

    • cast:type – 尝试将所有值转换为指定的类型。例如:cast:int

    • make_cols – 将每个不同的类型转换为名为 columnName_type 的列。通过展平数据来解析潜在的歧义。例如,如果 columnAintstring,则解析就是在结果 DynamicFrame 中生成名为 columnA_intcolumnA_string 的两个列。

    • make_struct – 使用 struct 表示数据,解析潜在的歧义。例如,如果某个列中的数据是 intstring,则使用 make_struct 操作会在结果 DynamicFrame 中生成结构列。每个结构都包含 intstring

    • project:type – 将所有数据投影到可能的数据类型之一,解析潜在的歧义。例如,如果某个列中的数据是 intstring,则使用 project:string 操作会在结果 DynamicFrame 中生成一个列,其中所有 int 值都已转换为字符串。

    如果 field_path 识别到数组,则在数组名称后放置一个空的方括号可避免歧义。例如,假设您正在使用结构如下的数据:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    可以通过将 field_path 设置为 "myList[].price"、将 action 设置为 "cast:double" 来选择价格的数字而非字符串版本。

    注意

    只能使用 specschoice 参数之一。如果 specs 参数不为 None,则 choice 参数必须为空字符串。反过来,如果 choice 不为空字符串,则 specs 参数必须为 None

  • choice – 为所有 ChoiceTypes 指定单个解析方法。这可以在运行前不知道 ChoiceTypes 的完整列表的情况下使用。除了之前为 specs 列出的操作外,此参数还支持以下操作:

    • match_catalog – 尝试将每个 ChoiceType 转换为指定数据目录表中的对应类型。

  • database – 与 match_catalog 操作一起使用的数据目录数据库。

  • table_name – 与 match_catalog 操作一起使用的数据目录表。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认为零,表示此过程应该不会出错。

  • catalog_id – 正在访问的数据目录的目录 ID(数据目录的账户 ID)。如果设置为 None(默认值),它使用调用账户的目录 ID。

示例:使用 resolveChoice 处理包含多种类型的列

此代码示例使用 resolveChoice 方法来指定如何处理包含多种类型值的 DynamicFrame 列。该示例演示了处理不同类型的列的两种常用方法:

  • 将该列转换为单一数据类型。

  • 在单独的列中保留所有类型。

示例数据集

注意

要访问本示例中使用的数据集,请参阅 代码示例:使用 ResolveChoice、Lambda 和进行数据准备 ApplyMapping 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

该示例通过以下架构使用了名为 medicareDynamicFrame

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

示例代码

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回包含选定字段的新 DynamicFrame

  • paths – 字符串列表。每个字符串就是一个指向您要选择的顶层节点的路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 select_fields 以利用选定字段创建新 DynamicFrame

以下代码示例展示了如何使用 select_fields 方法以利用从现有 DynamicFrame 中选定的字段列表来创建新 DynamicFrame

注意

要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simplify_ddb_json

simplify_ddb_json(): DynamicFrame

简化 a DynamicFrame 中专门在 DynamoDB JSON 结构中的嵌套列,并返回一个新的简化列。DynamicFrame如果 List 类型中有多种类型或 Map 类型,则不会简化列表中的元素。请注意,这是一种特定类型的转换,其行为与常规unnest转换不同,并且要求数据已经在 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON

例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplify_ddb_json() 转换会将此转换为:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

示例:使用 simplify_ddb_json 调用 DynamoDB JSON 简化

此代码示例使用该simplify_ddb_json方法使用 Amazon Glue DynamoDB 导出连接器、调用 DynamoDB JSON 简化并打印分区数量。

示例代码

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

将示例记录写入指定的目标,以帮助您验证作业执行的转换。

  • path – 要写入的目标的路径(必需)。

  • options – 指定选项的键值对(可选)。"topk" 选项指定应写入第一条 k 记录。"prob" 选项指定选择任何给定记录的可能性(以十进制数字形式表示)。您可以在选择要写入的记录时使用它。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

示例:使用 spigot 将 DynamicFrame 中的示例字段写入 Amazon S3

此代码示例使用 spigot 方法在应用了 select_fields 转换后将示例记录写入 Amazon S3 存储桶。

示例数据集

注意

要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。

该示例通过以下架构使用了名为 personsDynamicFrame

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

示例代码

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

以下是 spigot 写入 Amazon S3 的数据示例。由于指定了示例代码 options={"topk": 10},因此示例数据包含前 10 条记录。

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames。第一个 DynamicFrame 包含所有已拆分的节点,第二个包含其余节点。

  • paths – 字符串列表,每个字符串是到一个您要拆分为新的 DynamicFrame 的节点的完整路径。

  • name1 – 拆分的 DynamicFrame 的名称字符串。

  • name2 – 指定节点拆分后留存的 DynamicFrame 的名称字符串。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 split_fields 将选定字段拆分为单独的 DynamicFrame

此代码示例使用 split_fields 方法将指定字段列表拆分为单独的 DynamicFrame

示例数据集

该示例使用了集合 legislators_relationalized 中名为 l_root_contact_detailsDynamicFrame

l_root_contact_details 有以下架构和条目。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

示例代码

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 中的一个或多个行拆分到新的 DynamicFrame 中。

该方法可返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames。第一个 DynamicFrame 包含所有已拆分的行,第二个包含其余行。

  • comparison_dict – 一个词典,其中的键是到一个列的路径,值是另一个词典,用于将比较器映射到与该列值所比较的值。例如,{"age": {">": 10, "<": 20}} 拆分其年龄列中的值大于 10 但小于 20 的行。

  • name1 – 拆分的 DynamicFrame 的名称字符串。

  • name2 – 指定节点拆分后留存的 DynamicFrame 的名称字符串。

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 split_rows 拆分 DynamicFrame 中的行

此代码示例使用 split_rows 方法根据 id 字段值拆分 DynamicFrame 中的行。

示例数据集

该示例使用了从集合 legislators_relationalized 中选择的名为 l_root_contact_detailsDynamicFrame

l_root_contact_details 有以下架构和条目。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

示例代码

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

取消装箱(重新格式化)DynamicFrame 中的一个字符串字段并返回包含已取消装箱 DynamicRecords 的新的 DynamicFrame

一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Apache Spark DataFrame 中的一行,但其具有自描述性,可用于不符合固定架构的数据。

  • path – 要取消装箱的字符串节点的完整路径。

  • format – 格式规范(可选)。您可将其用于 Amazon S3 或支持多种格式的 Amazon Glue 连接。相关受支持的格式,请参阅 Amazon Glue for Spark 中的输入和输出的数据格式选项

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • options – 下列一个或多个:

    • separator – 包含分隔符字符的字符串。

    • escaper – 包含转义字符的字符串。

    • skipFirst – 指示是否跳过第一个实例的布尔值。

    • withSchema - 包含节点架构的 JSON 表示形式的字符串。架构的 JSON 表示格式由 StructType.json() 的输出定义。

    • withHeader – 指示是否包括标头的布尔值。

示例:使用 unbox 将字符串字段拆开为结构

此代码示例使用 unbox 方法将 DynamicFrame 中的字符串字段拆开或重新格式化为结构类型的字段。

示例数据集

该示例通过以下架构和条目使用了名为 DynamicFramemapped_with_string

请注意名为 AddressString 的字段。这是拆开为结构示例中的字段。

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

示例代码

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

联合

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

联盟二 DynamicFrames。返回 DynamicFrame 包含两个输入中的所有记录 DynamicFrames。这种变换可能会从两个数据 DataFrames 与等效数据的合并中返回不同的结果。如果你需要 Spark DataFrame 联合行为,可以考虑使用toDF

  • frame1— 首先 DynamicFrame 加入工会。

  • frame2— 仅次 DynamicFrame 于工会。

  • transformation_ctx –(可选)用于标识统计信息/状态信息的唯一字符串

  • info –(可选)与转换中的错误关联的任何字符串

  • stageThreshold –(可选)在处理出错之前转换中出现的最大错误数

  • totalThreshold –(可选)在处理出错之前出现的最大总错误数

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame

  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。

示例:使用 unnest 将嵌套字段转换为顶级字段

此代码示例使用 unnest 方法将 DynamicFrame 中的所有嵌套字段展平为顶级字段。

示例数据集

该示例通过以下架构使用了名为 mapped_medicareDynamicFrame。请注意,Address 字段是唯一包含嵌套数据的字段。

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

示例代码

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

解除 DynamicFrame 中的嵌套列,其具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest 转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx – 用于标识状态信息的唯一字符串 (可选)。

  • info – 与此转换的错误报告关联的字符串 (可选)。

  • stageThreshold – 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。

例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json() 转换会将此转换为:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

以下代码示例展示了如何使用 Amazon Glue DynamoDB 导出连接器、调用 DynamoDB JSON 取消嵌套以及如何打印分区数量:

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

从此 DynamicFrameGlueContext 类 获得指定连接类型的 DataSink(object) 并将其用于格式化和写入此 DynamicFrame 的内容。返回按指定进行格式化和写入的新的 DynamicFrame

  • connection_type – 要使用的连接类型。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracle

  • connection_options – 要使用的连接选项(可选)。对于 connection_types3,将会定义 Amazon S3 路径。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。

    警告

    不建议在脚本中存储密码。考虑使用boto3从 Amazon Secrets Manager 或 Amazon Glue 数据目录中检索它们。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 格式规范(可选)。这用于 Amazon Simple Storage Service(Amazon S3)或支持多种格式的 Amazon Glue 连接。有关支持的格式,请参阅 Amazon Glue for Spark 中的输入和输出的数据格式选项

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 Amazon Glue for Spark 中的输入和输出的数据格式选项

  • accumulator_size - 要使用的可累积大小(以字节为单位)(可选)。

 – 错误 –

assertErrorThreshold

assertErrorThreshold( ) – 创建此 DynamicFrame 的转换中的错误的资产。从底层 DataFrame 返回 Exception

errorsAsDynamic框架

errorsAsDynamicFrame( ) – 返回其中包含嵌套的错误记录的 DynamicFrame

示例:使用 errorsAsDynamic Frame 查看错误记录

以下代码示例展示了如何使用 errorsAsDynamicFrame 方法查看 DynamicFrame 的错误记录。

示例数据集

该示例使用以下数据集,您可以将以下数据集作为 JSON 上传到 Amazon S3。请注意,第二条记录的格式有误。当您使用 SparkSQL 时,格式错误的数据通常会中断文件解析。但是,DynamicFrame 会识别格式错误的问题,并将格式错误的行转换为您能单独处理的错误记录。

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

示例代码

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) – 返回 DynamicFrame 中的错误总数。

stageErrorsCount

stageErrorsCount – 返回生成此 DynamicFrame 的过程中发生的错误数。