DynamicFrame 类 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

DynamicFrame 类

ApacheSpark的主要抽象之一是 SparkSQL DataFrame,类似于 DataFrame 在R和Pandas中找到构建体。DataFrame 类似于表格,支持函数风格(map/reduce/filter/等)操作和 SQL 操作(select、project、aggregate)。

尽管 DataFrames 功能强大且运用广泛,但在提取、转换和加载 (ETL) 操作方面存在局限性。最明显的是,它们需要指定了架构后才能加载任何数据。SparkSQL 通过对数据进行两次传递来解决此问题—第一个用于推断框架,第二个用于加载数据。然而,此推断仍有局限性,且不能解决数据混乱的实际问题。例如,同样的字段在不同记录中可能属于不同类型。对此,Apache Spark 通常没有好的办法,只能使用原始字段文本将类型报告为 string。这或许不正确,同时您可能希望更精细地控制解决架构差异的方式。对于大型数据集而言,每扫描一次源数据都会付出高昂代价。

为解决这些限制, AWS Glue 介绍 DynamicFrame。甲 DynamicFrame 类似于 DataFrame,除了每条记录都是自我描述之外,因此最初不需要框架。相反,AWS Glue 会在需要时实时计算一个架构,并使用选择(或联合)类型显式编码架构不一致之处。您可以解决这些不一致之处,以使您的数据集兼容需要固定架构的数据存储。

同样, DynamicRecord 表示 DynamicFrame。就像一个Spark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

在解决任何架构不一致问题后,就可以在 DynamicFramesDataFrames 之间来回转换。

 — 构造 —

__init__

__init__(jdf, glue_ctx, name)

  • jdf – 对 Java 虚拟机 (JVM) 中数据帧的引用。

  • glue_ctx – 一个 GlueContext 类 对象。

  • name – 可选的名称字符串,默认为空。

fromDF

fromDF(dataframe, glue_ctx, name)

通过将 DataFrame 字段转换为 DynamicRecord 字段将 DataFrame 转换为 DynamicFrame。返回新的 DynamicFrame

DynamicRecord 表示 DynamicFrame。类似于Spark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

  • dataframe – 要转换的 Apache Spark SQL DataFrame(必需)。

  • glue_ctx – 为此转换指定上下文的 GlueContext 类对象(必需)。

  • name – 生成的 DynamicFrame 的名称(必需)。

toDF

toDF(options)

通过将 DynamicRecords 转换为 DataFrame 字段将 DynamicFrame 转换为 Apache Spark DataFrame。返回新的 DataFrame

DynamicRecord 表示 DynamicFrame。类似于Spark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

  • options – 一个选项列表。如果您选择 ProjectCast 操作类型,则指定目标类型。示例包括以下内容。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 —信息—

count

count( ) – 返回底层 DataFrame 中的行数。

schema

schema( ) – 返回此 DynamicFrame 的架构,如果此架构不可用,则返回底层 DataFrame 的架构。

printSchema

printSchema( ) – 打印底层 DataFrame 的架构。

show

show(num_rows) – 打印底层 DataFrame 中的行数。

repartition

repartition(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame

coalesce

coalesce(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame

 — 转换 —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

将声明映射应用于此 DynamicFrame 并返回已应用那些映射的新的 DynamicFrame

  • mappings – 映射元组列表,每个元组包括:(源列,源类型,目标列,目标类型)。必填项。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

呼叫 FlatMap 类 转换以从删除字段 DynamicFrame。返回新的 DynamicFrame 指定字段已移除。

  • paths – 字符串列表,每个字符串包含要删除的字段节点的完整路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

filter

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回通过选择输入 DynamicFrame 内满足指定谓词函数 f 的所有 DynamicRecords 而构建的新的 DynamicFrame

  • f – 要应用于 DynamicFrame。功能必须采用 DynamicRecord 作为引数,如果 DynamicRecord 符合过滤器要求,如果不符合,则为False(如果不符合)(要求)。

    DynamicRecord 表示 DynamicFrame。类似于Spark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

有关如何使用 filter 转换的示例,请参阅 筛选器类

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

执行与另一个 DynamicFrame 的等式联接并返回生成的 DynamicFrame

  • paths1 – 此帧中要联接的键列表。

  • paths2 – 另一帧中要联接的键列表。

  • frame2 – 要联接的另一个 DynamicFrame

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回由于将指定映射函数应用到原始 DynamicFrame 中的所有记录而产生的新的 DynamicFrame

  • f – 映射功能应用于 DynamicFrame。功能必须采用 DynamicRecord 作为参数并返回新的 DynamicRecord (必填)。

    DynamicRecord 表示 DynamicFrame。它类似于ApacheSpark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与转换中的错误关联的字符串(可选)。

  • stageThreshold – 在转换出错之前可能在其中发生的最大错误数(可选;默认值为零)。

  • totalThreshold – 在处理出错之前可能全面发生的最大错误数(可选;默认值为零)。

有关如何使用 map 转换的示例,请参阅 映射类

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。

  • stage_dynamic_frame – 要合并的暂存 DynamicFrame

  • primary_keys – 要匹配源和暂存动态帧中的记录的主键字段列表。

  • transformation_ctx – 用于检索有关当前转换的元数据的唯一字符串(可选)。

  • options – 为此转换提供其他信息的 JSON 名称-值对的字符串。

  • info – 甲 String。要与此转换中的错误关联的任何字符串。

  • stageThreshold – 甲 Long。给定转换中处理需要出错的错误数。

  • totalThreshold – 甲 Long。在此转换中,处理需要出错的错误总数(包括)。

返回通过将此 DynamicFrame 与暂存 DynamicFrame 合并获取的新 DynamicFrame

在这些情况下,返回的 DynamicFrame 将包含记录 A:

  1. 如果 A 在源帧和暂存帧中都存在,则返回暂存帧中的 A

  2. 如果 A 在源表中,且A.primaryKeys 不在 stagingDynamicFrame 中(这意味着,未在暂存表中更新 A)。

源帧和暂存帧不需要具有相同的架构。

merged_frame = source_frame.mergeDynamicFrame(stage_frame, ["id"])

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

通过生成由取消嵌套嵌套列和透视数组列而生成的帧列表来关系化 DynamicFrame。可使用在取消嵌套阶段生成的联接键将透视数组列联接到根表。

  • root_table_name – 根表的名称。

  • staging_path – 要将 CSV 格式的透视表分区存储到的路径(可选)。从该路径读取透视表。

  • options – 可选参数的词典。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

重命名此 DynamicFrame 中的一个字段并返回包含该重命名字段的新的 DynamicFrame

  • oldName – 要重命名的节点的完整路径。

    如果旧名称中包含点,则 RenameField 将不起作用,除非使用反引号 (`) 将其引起来。例如,要将 this.old.name 替换为 thisNewName,应按如下方式调用 rename_field。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – 新名称,作为完整路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

resolveChoice

resolveChoice(specs = None, option="", transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

解析此 DynamicFrame 内的一个选择类型并返回新的 DynamicFrame

  • specs – 要解决的特定歧义列表,每个都以组合的形式: (path, action)。的 path 值识别特定的模糊元素,以及 action 值标识对应的分辨率。只能使用 specsoption 参数之一。如果 spec 参数不为 None,则 option 参数必须为空字符串。相反,如果 option 不是空字符串,则 spec 参数必须 None。如果两个参数都未提供, AWS Glue 尝试解析框架并使用它来解决模糊性。

    specs 元组的 action 部分可以指定四个解析策略之一:

    • cast:   允许您指定要铸造的类型(例如, cast:int)。

    • make_cols:   通过精简数据来解决潜在的歧义。例如,如果 columnAintstring,则解析就是在结果 DynamicFrame 中生成名为 columnA_intcolumnA_string 的两个列。

    • make_struct:   使用结构表示数据,以解决潜在的歧义。例如,如果某个列中的数据是 intstring,则使用 make_struct 操作会在结果 DynamicFrame 中生成结构列,而每个结构都同时包含 intstring

    • project:   通过将所有数据投影到其中一种可能的数据类型,解决潜在的模糊性。例如,如果某个列中的数据是 intstring,则使用 project:string 操作会在结果 DynamicFrame 中生成一个列,其中所有 int 值都已转换为字符串。

    如果 path 识别到数组,则在数组名称后放置一个空的方括号可避免歧义。例如,假设您正在使用结构如下的数据:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    可以通过将 path 设置为 "myList[].price"、将 action 设置为 "cast:double" 来选择价格的数字而非字符串版本。

  • option – 如果 specs 参数是 None。如果 specs 参数不是 None,则不能将其设置为除空字符串之外的任何内容。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

df1 = df.resolveChoice(option = "make_cols") df2 = df.resolveChoice(specs = [("a.b", "make_struct"), ("c.d", "cast:double")])

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回包含选定字段的新的 DynamicFrame

  • paths – 字符串列表,每个字符串就是一个到您要选择的顶层节点的路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

spigot

spigot(path, options={})

在转换期间将示例记录写入指定目标位置,并返回包含一个额外写入步骤的输入 DynamicFrame

  • path – 要向其中写入内容的目标位置的路径(必需)。

  • options – 指定选项的键值对(可选)。"topk" 选项指定应写入第一条 k 记录。"prob" 选项指定选择任何给定记录的可能性(以十进制数字形式表示),用于选择要写入的记录。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames:第一个包含所有已拆分的节点,第二个包含其余节点。

  • paths – 字符串列表,每个字符串是到一个您要拆分为新的 DynamicFrame 的节点的完整路径。

  • name1 – 拆分的 DynamicFrame 的名称字符串。

  • name2 – 指定节点拆分后留存的 DynamicFrame 的名称字符串。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

split_rows

DynamicFrame 中的一个或多个行拆分到新的 DynamicFrame 中。

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames:第一个包含所有已拆分的行,第二个包含其余行。

  • comparison_dict – 一个词典,其中的键是到一个列的路径,值是另一个词典,用于将比较器映射到与该列值所比较的值。例如,{"age": {">": 10, "<": 20}} 拆分其年龄列中的值大于 10 但小于 20 的行。

  • name1 – 拆分的 DynamicFrame 的名称字符串。

  • name2 – 指定节点拆分后留存的 DynamicFrame 的名称字符串。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

取消装箱 DynamicFrame 中的一个字符串字段并返回包含已取消装箱 DynamicRecords 的新的 DynamicFrame

DynamicRecord 表示 DynamicFrame。它类似于ApacheSpark中的一行 DataFrame,例外是它属于自我描述,并且可用于不符合固定框架的数据。

  • path – 要取消装箱的字符串节点的完整路径。

  • format – 格式规范(可选)。这用于 Amazon Simple Storage Service (Amazon S3) 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 中的 ETL 输入和输出的格式选项AWS Glue

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • options – 下列一个或多个:

    • separator – 包含分隔符字符的字符串。

    • escaper – 包含转义字符的字符串。

    • skipFirst – 指示是否跳过第一个实例的布尔值。

    • withSchema – 包含架构的字符串;必须使用 StructType.json( ) 调用。

    • withHeader – 指示是否包括标头的布尔值。

例如:unbox("a.b.c", "csv", separator="|")

unnest

取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

例如:unnest( )

write

write(connection_type, connection_options, format, format_options, accumulator_size)

获取 DataSink(对象) 指定连接类型的 GlueContext 类DynamicFrame,并用于格式化和编写此 DynamicFrame。返回新的 DynamicFrame 按指定格式进行格式化和写入。

  • connection_type – 要使用的连接类型。有效值包括 s3, mysql, postgresql, redshift, sqlserver,和 oracle.

  • connection_options – 要使用的连接选项(可选)。对于 connection_types3 的情况,定义 Amazon S3 路径。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 格式规范(可选)。这用于 Amazon Simple Storage Service (Amazon S3) 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 中的 ETL 输入和输出的格式选项AWS Glue

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 中的 ETL 输入和输出的格式选项AWS Glue

  • accumulator_size – 要使用的可累积大小(可选)。

 — 错误 —

assertErrorThreshold

assertErrorThreshold( ) – 创建此的转换中的错误声明 DynamicFrame。返回 Exception 从底层 DataFrame.

errorsAsDynamicFrame

errorsAsDynamicFrame( ) – 返回其中包含嵌套的错误记录的 DynamicFrame

errorsCount

errorsCount( ) – 返回 DynamicFrame 中的错误总数。

stageErrorsCount

stageErrorsCount – 返回生成此 DynamicFrame 的过程中发生的错误数。