DynamicFrame 类 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

DynamicFrame 类

Apache Spark 中的主要抽象之一是 SparkSQL DataFrame,它类似于在 R 和 Pandas 中找到的 DataFrame 构造。DataFrame 类似于表格,支持函数风格(map/reduce/filter/等)操作和 SQL 操作(select、project、aggregate)。

尽管 DataFrames 功能强大且运用广泛,但在提取、转换和加载 (ETL) 操作方面存在局限性。最明显的是,它们需要指定了架构后才能加载任何数据。SparkSQL 通过对数据进行两次扫描解决了这一问题 — 第一次为了推断架构,第二次为了加载数据。然而,此推断仍有局限性,且不能解决数据混乱的实际问题。例如,同样的字段在不同记录中可能属于不同类型。对此,Apache Spark 通常没有好的办法,只能使用原始字段文本将类型报告为 string。这或许不正确,同时您可能希望更精细地控制解决架构差异的方式。对于大型数据集而言,每扫描一次源数据都会付出高昂代价。

为了解决这些局限性,AWS Glue 引入了 DynamicFrameDynamicFrame 类似于 DataFrame,不同之处在于每个记录都是自描述的,因此初始并不需要架构。相反,AWS Glue 会在需要时实时计算一个架构,并使用选择(或联合)类型显式编码架构不一致之处。您可以解决这些不一致之处,以使您的数据集兼容需要固定架构的数据存储。

同样,一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

在解决任何架构不一致问题后,就可以在 DynamicFramesDataFrames 之间来回转换。

 — 构造 —

__init__

__init__(jdf, glue_ctx, name)

  • jdf – 对 Java 虚拟机 (JVM) 中数据帧的引用。

  • glue_ctx – 一个 GlueContext 类 对象。

  • name – 可选的名称字符串,默认为空。

fromDF

fromDF(dataframe, glue_ctx, name)

通过将 DataFrame 字段转换为 DynamicRecord 字段将 DataFrame 转换为 DynamicFrame。返回新的 DynamicFrame

一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • dataframe – 要转换的 Apache Spark SQL DataFrame(必需)。

  • glue_ctx – 为此转换指定上下文的 GlueContext 类对象(必需)。

  • name – 生成的 DynamicFrame 的名称(必需)。

toDF

toDF(options)

通过将 DynamicRecords 转换为 DataFrame 字段将 DynamicFrame 转换为 Apache Spark DataFrame。返回新的 DataFrame

一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • options – 一个选项列表。如果您选择 ProjectCast 操作类型,则指定目标类型。示例包括以下内容。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 —信息—

count

count( ) – 返回底层 DataFrame 中的行数。

schema

schema( ) – 返回此 DynamicFrame 的架构,如果此架构不可用,则返回底层 DataFrame 的架构。

printSchema

printSchema( ) – 打印底层 DataFrame 的架构。

show

show(num_rows) – 打印底层 DataFrame 中的行数。

repartition

repartition(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame

coalesce

coalesce(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame

 — 转换 —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

将声明映射应用于此 DynamicFrame 并返回已应用那些映射的新的 DynamicFrame

  • mappings – 映射元组列表,每个元组包括:(源列,源类型,目标列,目标类型)。必填项。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

调用 FlatMap 类 转换以从 DynamicFrame 中删除字段。返回指定字段已删除的新的 DynamicFrame

  • paths – 字符串列表,每个字符串包含要删除的字段节点的完整路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

filter

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回通过选择输入 DynamicFrame 内满足指定谓词函数 f 的所有 DynamicRecords 而构建的新的 DynamicFrame

  • f – 要应用到 DynamicFrame 的谓词函数。该函数必须采用 DynamicRecord 作为参数,如果 DynamicRecord 满足筛选要求,则返回 True,否则返回 False (必需)。

    一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

有关如何使用 filter 转换的示例,请参阅 筛选器类

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

执行与另一个 DynamicFrame 的等式联接并返回生成的 DynamicFrame

  • paths1 – 此帧中要联接的键列表。

  • paths2 – 另一帧中要联接的键列表。

  • frame2 – 要联接的另一个 DynamicFrame

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回由于将指定映射函数应用到原始 DynamicFrame 中的所有记录而产生的新的 DynamicFrame

  • f – 应用到 DynamicFrame 中所有记录的映射函数。该函数必须采用 DynamicRecord 作为参数并返回一个新的 DynamicRecord (必需)。

    一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Apache Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与转换中的错误关联的字符串(可选)。

  • stageThreshold – 在转换出错之前可能在其中发生的最大错误数(可选;默认值为零)。

  • totalThreshold – 在处理出错之前可能全面发生的最大错误数(可选;默认值为零)。

有关如何使用 map 转换的示例,请参阅 映射类

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。

  • stage_dynamic_frame – 要合并的暂存 DynamicFrame

  • primary_keys – 要匹配源和暂存动态帧中的记录的主键字段列表。

  • transformation_ctx – 用于检索有关当前转换的元数据的唯一字符串(可选)。

  • options – 为此转换提供其他信息的 JSON 名称-值对的字符串。

  • info – 一个 String。要与此转换中的错误关联的任何字符串。

  • stageThreshold – 一个 Long。给定转换中处理需要排除的错误的数目。

  • totalThreshold – 一个 Long。此转换中处理需要排除的错误的总数。

返回通过将此 DynamicFrame 与暂存 DynamicFrame 合并获取的新 DynamicFrame

在这些情况下,返回的 DynamicFrame 将包含记录 A:

  1. 如果 A 在源帧和暂存帧中都存在,则返回暂存帧中的 A

  2. 如果 A 在源表中,且A.primaryKeys 不在 stagingDynamicFrame 中(这意味着,未在暂存表中更新 A)。

源帧和暂存帧不需要具有相同的架构。

merged_frame = source_frame.mergeDynamicFrame(stage_frame, ["id"])

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

通过生成由取消嵌套嵌套列和透视数组列而生成的帧列表来关系化 DynamicFrame。可使用在取消嵌套阶段生成的联接键将透视数组列联接到根表。

  • root_table_name – 根表的名称。

  • staging_path – 要将 CSV 格式的透视表分区存储到的路径(可选)。从该路径读取透视表。

  • options – 可选参数的词典。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

重命名此 DynamicFrame 中的一个字段并返回包含该重命名字段的新的 DynamicFrame

  • oldName – 要重命名的节点的完整路径。

    如果旧名称中包含点,则 RenameField 将不起作用,除非使用反引号 (`) 将其引起来。例如,要将 this.old.name 替换为 thisNewName,应按如下方式调用 rename_field。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – 新名称,作为完整路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

resolveChoice

resolveChoice(specs = None, option="", transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

解析此 DynamicFrame 内的一个选择类型并返回新的 DynamicFrame

  • specs – 要解析的特定歧义列表,每个歧义均采用元组形式:(path, action)path 值标识特定歧义元素,action 值标识相应解析。只能使用 specsoption 参数之一。如果 spec 参数不为 None,则 option 参数必须为空字符串。反过来,如果 option 不为空字符串,则 spec 参数必须为 None。如果两个参数均未提供,则 AWS Glue 尝试解析架构并用它来解析歧义。

    specs 元组的 action 部分可以指定四个解析策略之一:

    • cast:   允许指定一种要强制转换到的类型 (例如,cast:int)。

    • make_cols:   通过展平数据来解析潜在的歧义。例如,如果 columnAintstring,则解析就是在结果 DynamicFrame 中生成名为 columnA_intcolumnA_string 的两个列。

    • make_struct:   通过用一种结构表示数据来解析潜在的歧义。例如,如果某个列中的数据是 intstring,则使用 make_struct 操作会在结果 DynamicFrame 中生成结构列,而每个结构都同时包含 intstring

    • project:   通过将所有数据投影到可能的数据类型之一来解析潜在的歧义。例如,如果某个列中的数据是 intstring,则使用 project:string 操作会在结果 DynamicFrame 中生成一个列,其中所有 int 值都已转换为字符串。

    如果 path 识别到数组,则在数组名称后放置一个空的方括号可避免歧义。例如,假设您正在使用结构如下的数据:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    可以通过将 path 设置为 "myList[].price"、将 action 设置为 "cast:double" 来选择价格的数字而非字符串版本。

  • optionspecs 参数为 None 时的默认解析操作。如果 specs 参数不为 None,则只能将此项设置为空字符串,不能设置为其他值。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

df1 = df.resolveChoice(option = "make_cols") df2 = df.resolveChoice(specs = [("a.b", "make_struct"), ("c.d", "cast:double")])

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回包含选定字段的新的 DynamicFrame

  • paths – 字符串列表,每个字符串就是一个到您要选择的顶层节点的路径。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

spigot

spigot(path, options={})

在转换期间将示例记录写入指定目标位置,并返回包含一个额外写入步骤的输入 DynamicFrame

  • path – 要向其中写入内容的目标位置的路径(必需)。

  • options – 指定选项的键值对(可选)。"topk" 选项指定应写入第一条 k 记录。"prob" 选项指定选择任何给定记录的可能性(以十进制数字形式表示),用于选择要写入的记录。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames:第一个包含所有已拆分的节点,第二个包含其余节点。

  • paths – 字符串列表,每个字符串是到一个您要拆分为新的 DynamicFrame 的节点的完整路径。

  • name1 – 拆分的 DynamicFrame 的名称字符串。

  • name2 – 指定节点拆分后留存的 DynamicFrame 的名称字符串。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

split_rows

DynamicFrame 中的一个或多个行拆分到新的 DynamicFrame 中。

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames:第一个包含所有已拆分的行,第二个包含其余行。

  • comparison_dict – 一个词典,其中的键是到一个列的路径,值是另一个词典,用于将比较器映射到与该列值所比较的值。例如,{"age": {">": 10, "<": 20}} 拆分其年龄列中的值大于 10 但小于 20 的行。

  • name1 – 拆分的 DynamicFrame 的名称字符串。

  • name2 – 指定节点拆分后留存的 DynamicFrame 的名称字符串。

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

取消装箱 DynamicFrame 中的一个字符串字段并返回包含已取消装箱 DynamicRecords 的新的 DynamicFrame

一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Apache Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。

  • path – 要取消装箱的字符串节点的完整路径。

  • format – 格式规范(可选)。这用于 Amazon Simple Storage Service (Amazon S3) 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 中的 ETL 输入和输出的格式选项AWS Glue

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • options – 下列一个或多个:

    • separator – 包含分隔符字符的字符串。

    • escaper – 包含转义字符的字符串。

    • skipFirst – 指示是否跳过第一个实例的布尔值。

    • withSchema – 包含架构的字符串;必须使用 StructType.json( ) 调用。

    • withHeader – 指示是否包括标头的布尔值。

例如:unbox("a.b.c", "csv", separator="|")

unnest

取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame

  • transformation_ctx – 用于标识状态信息的唯一字符串(可选)。

  • info – 与此转换的错误报告关联的字符串(可选)。

  • stageThreshold – 在此转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

  • totalThreshold – 在此转换之前及转换过程中遇到的将导致过程出错的错误数(可选,默认为零,表示此过程应该不会出错)。

例如:unnest( )

write

write(connection_type, connection_options, format, format_options, accumulator_size)

从此 DynamicFrameGlueContext 类 获得指定连接类型的 DataSink(object) 并将其用于格式化和写入此 DynamicFrame 的内容。返回按指定进行格式化和写入的新的 DynamicFrame

  • connection_type – 要使用的连接类型。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracle

  • connection_options – 要使用的连接选项(可选)。对于 connection_types3 的情况,定义 Amazon S3 路径。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 格式规范(可选)。这用于 Amazon Simple Storage Service (Amazon S3) 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 中的 ETL 输入和输出的格式选项AWS Glue

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 中的 ETL 输入和输出的格式选项AWS Glue

  • accumulator_size – 要使用的可累积大小(可选)。

 — 错误 —

assertErrorThreshold

assertErrorThreshold( ) – 创建此 DynamicFrame 的转换中的错误的资产。从底层 DataFrame 返回 Exception

errorsAsDynamicFrame

errorsAsDynamicFrame( ) – 返回其中包含嵌套的错误记录的 DynamicFrame

errorsCount

errorsCount( ) – 返回 DynamicFrame 中的错误总数。

stageErrorsCount

stageErrorsCount – 返回生成此 DynamicFrame 的过程中发生的错误数。