AWS Glue Scala DynamicFrame 类 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

AWS Glue Scala DynamicFrame 类

程序包:com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame 是自描述的 DynamicRecord 对象的分布式集合。

DynamicFrame 旨在为 ETL(提取、转换和加载)操作提供灵活的数据模型。它们不需要创建架构,可用于读取和转换具有杂乱或不一致的值和类型的数据。可以按需为需要架构的那些操作计算架构。

DynamicFrame 为数据清理和 ETL 提供了一系列转换。它们还支持转换为 SparkSQL DataFrame 和从其转换以与现有代码和 DataFrame 提供的许多分析操作集成。

跨构造 DynamicFrame 的许多 AWS Glue 转换共享以下参数:

  • transformationContext — 此 DynamicFrame 的标识符。transformationContext 用作跨运行保存的作业书签状态的密钥。

  • callSite — 为错误报告提供上下文信息。在从 Python 调用时,会自动设置这些值。

  • stageThreshold — 来自此 DynamicFrame 计算的不会引发异常的最大错误记录数,不包括以前的 DynamicFrame 中存在的记录。

  • totalThreshold — 不会引发异常的最大错误记录总数,包括以前的帧中的记录。

val errorsCount

val errorsCount

DynamicFrame 中的错误记录数。这包括来自以前的操作的错误。

def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings — 用于构造新 DynamicFrame 的映射序列。

  • caseSensitive — 处理源列时是否区分大小写。在与不区分大小写的存储(如 AWS Glue 数据目录)集成时,将此项设置为 false 可能很有帮助。

基于一系列的映射选择、投影和转换列。

每个映射由源列和类型以及目标列和类型构成。映射可指定为四元组 (source_pathsource_type target_pathtarget_type) 或包含相同信息的 MappingSpec 对象。

除了将映射用于简单的投影和转换外,还可以通过使用“.”(句点)分隔路径的组件来用于嵌套或取消嵌套字段。

例如,假设您有一个包含以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

您可以进行以下调用来取消嵌套 statezip 字段。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

生成的架构如下所示。

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

您还可以使用 applyMapping 来重新嵌套列。例如,以下代码反转以前的转换,并在目标中创建一个名为 address 的结构。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

包含“.”(句点)字符的字段名称可以使用反引号 (``) 括起来。

注意

目前,您不能使用 applyMapping 方法映射嵌套在数组下的列。

def assertErrorThreshold

def assertErrorThreshold : Unit

强制计算并验证错误记录数是否低于 stageThresholdtotalThreshold 的操作。如果任一条件失败,则引发异常。

def count

lazy def count

返回 DynamicFrame 中的元素数量。

def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回已删除指定列的新 DynamicFrame

def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回已删除指定列的新 DynamicFrame

您可以使用此方法删除嵌套列(包括数组中的列),但不能用于删除特定数组元素。

def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

返回已删除所有空列的新 DynamicFrame

注意

这只删除类型为 NullType 的列。不删除或修改其他列中的单个空值。

def errorsAsDynamicFrame

def errorsAsDynamicFrame

返回包含此 DynamicFrame 中的错误记录的新 DynamicFrame

def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

构造只包含函数“f”为其返回 true 的那些记录的新 DynamicFrame。筛选器函数“f”不应转变输入记录。

def getName

def getName : String

返回此 DynamicFrame 的名称。

def getNumPartitions

def getNumPartitions

返回 DynamicFrame 中的分区数量。

def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

如果架构已经过计算,则返回该架构。如果架构尚未经过计算,则不扫描数据。

def isSchemaComputed

def isSchemaComputed : Boolean

如果已为此 DynamicFrame 计算架构,则返回 true;否则返回 false。如果此方法返回 false,则调用 schema 方法将需要再次扫描此 DynamicFrame 中的记录。

def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 — 此 DynamicFrame 中用于联接的列。

  • keys2 — frame2 中用于联接的列。必须与 keys1 的长度相同。

  • frame2 — 要联接的 DynamicFrame

返回使用指定的键对 frame2 执行 equijoin 的结果。

def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回通过对此 DynamicFrame 中的每个记录应用指定函数“f”构造的新 DynamicFrame

此方法先复制每个记录,然后再应用指定函数,因此可以安全地转变记录。如果映射函数在给定记录上引发异常,则该记录将标记为错误,并且堆栈跟踪将另存为错误记录中的一个列。

def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame — 要合并的暂存 DynamicFrame

  • primaryKeys — 要匹配源和暂存 DynamicFrame 中的记录的主键字段列表。

  • transformationContext — 用于检索有关当前转换的元数据的唯一字符串(可选)。

  • options — 为此转换提供其他信息的 JSON 名称-值对的字符串。

  • callSite — 用于为错误报告提供上下文信息。

  • stageThreshold — 一个 Long。给定转换中处理需要排除的错误的数目。

  • totalThreshold — 一个 Long。此转换中处理需要排除的错误的总数。

基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。

在以下情况下,返回的 DynamicFrame 将包含记录 A:

  1. 如果 A 在源帧和暂存帧中都存在,则返回暂存帧中的 A

  2. 如果 A 在源表中,且A.primaryKeys 不在 stagingDynamicFrame 中(这意味着,未在暂存表中更新 A)。

源帧和暂存帧不需要具有相同的架构。

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

def printSchema

def printSchema : Unit

以人类可读的格式将此 DynamicFrame 的架构输出到 stdout

def recomputeSchema

def recomputeSchema : Schema

强制重新计算架构。这需要扫描数据,但如果数据中不存在当前架构中的一些字段,则可能会“压缩”架构。

返回经过重新计算的架构。

def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName — 在输出中用于基本 DynamicFrame 的名称。通过透视数组创建的 DynamicFrame 以此作为前缀开头。

  • stagingPath — 用于写入中间数据的 Amazon Simple Storage Service (Amazon S3) 路径。

  • options — 关系化选项和配置。目前未使用。

展平所有嵌套的结构并将数组透视为单独的表。

您可以使用此操作准备深度嵌套的数据以提取到关系数据库中。使用与 unnest 转换相同的方式展平嵌套结构。此外,将数组透视为单独的表,其中每个数组元素成为一行。例如,假设您有一个包含以下数据的 DynamicFrame

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

执行以下代码。

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

这会生成两个表。第一个表名为“people”,其中包含以下内容。

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

在这里,friends 数组已被替换为自动生成的联接键。创建了名为 people.friends 的单独表,其中包含以下内容。

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

在此表中,“id”是标识数组元素来自哪个记录的联接键,“index”是指在原始数组中的位置,“val”是实际的数组条目。

relationalize 方法返回通过以递归方式对所有数组应用此过程创建的 DynamicFrame 序列。

注意

AWS Glue 库自动为新表生成联接键。为确保联接键在作业运行中是唯一的,您必须启用作业书签。

def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName — 列的原始名称。

  • newName — 列的新名称。

返回新 DynamicFrame,其中的指定字段进行了重命名。

您可以使用此方法重命名嵌套字段。例如,以下代码在 address 结构中将 state 重命名为 state_code

{{{ df.renameField("address.state", "address.state_code") }}}

def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回具有 numPartitions 分区的新 DynamicFrame

def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption — 应用于 specs 序列中未列出的所有 ChoiceType 列的操作。

  • database — 与 match_catalog 操作一起使用的 数据目录 数据库。

  • tableName — 与 match_catalog 操作一起使用的 数据目录 表。

通过将一个或多个 ChoiceType 替换为更具体的类型来返回新 DynamicFrame

可通过两种方式使用 resolveChoice.第一种是指定一系列特定列以及如何解析它们。这些指定为由 (列, 操作) 对组成的元组。

可能的操作如下:

  • cast:type — 尝试将所有值转换为指定的类型。

  • make_cols — 将每个不同的类型转换为名为 columnName_type 的列。

  • make_struct — 将列转换为具有每个不同类型的键的结构。

  • project:type — 仅保留指定类型的值。

resolveChoice 的另一种模式是为所有 ChoiceType 指定单个解析方法。这可以在执行前不知道 ChoiceType 的完整列表的情况下使用。除了上面列出的操作外,此模式还支持以下操作:

  • match_catalog — 尝试将每个 ChoiceType 转换为指定目录表中的对应类型。

示例:

通过转换为 int 来解析 user.id 列,并使 address 字段仅保留结构。

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

通过将每个选择转换为单独的列来解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

通过转换为指定目录表中的类型来解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

def schema

def schema : Schema

返回此 DynamicFrame 的架构。

保证返回的架构包含此 DynamicFrame 中的记录中存在的每个字段。但在少数情况下,它可能还包含其他字段。您可以使用 unnest 方法基于此 DynamicFrame 中的记录来“压缩”架构。

def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回作为 DynamicFrame 的单个字段。

def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths — 要选择的列名称序列。

返回包含指定列的新 DynamicFrame

注意

您只能使用 selectFields 方法选择顶级列。您可以使用 applyMapping 方法选择嵌套列。

def show

def show( numRows : Int = 20 ) : Unit
  • numRows — 要输出的行数。

以 JSON 格式输出此 DynamicFrame 中的行。

def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回相同记录但写出一部分记录作为副作用的传递转换。

  • path — 将输出写入的 Amazon S3 中的路径,格式为 s3://bucket//path

  • options  —  描述取样行为的可选的 JsonOptions 映射。

返回包含与这一个相同的记录的 DynamicFrame

默认情况下,将 100 个任意记录写入通过 path 指定的位置。您可以使用 options 映射自定义此行为。有效键包括:

  • topk — 指定写出的记录的总数。默认值为 100。

  • prob — 指定包含单个记录的概率(小数形式)。默认值为 1。

例如,以下调用将对数据集进行取样,方法是以 20% 的概率选择每个记录并在写入 200 个记录后停止。

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths — 包括在第一个 DynamicFrame 中的路径。

返回两个 DynamicFrame 的序列。第一个 DynamicFrame 包含指定路径,第二个包含所有其他列。

def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

基于将列与常量比较的谓词来拆分行。

  • paths — 用于比较的列。

  • values — 用于比较的常量值。

  • operators — 用于比较的运算符。

返回两个 DynamicFrame 的序列。第一个包含其谓词为 true 的行,第二个包含其谓词为 false 的行。

使用三个序列指定谓词:“paths”包含(可能嵌套的)列名称,“values”包含要与其进行比较的常量值,“operators”包含用于比较的运算符。所有这三个序列必须长度相同:第 n 个运算符将用于将第 n 个列与第 n 个值进行比较。

每个运算符必须是以下运算符之一:“!=”、“=”、“<=”、“<”、“>=”或“>”。

例如,以下调用将拆分 DynamicFrame,以便第一个输出帧将包含美国 65 岁以上的人员记录,第二个将包含所有其他记录。

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}

def stageErrorsCount

def stageErrorsCount

返回在计算此 DynamicFrame 时创建的错误记录数。这不包括作为输入传递给此 DynamicFrame 的以前操作中的错误。

def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

将此 DynamicFrame 转换为具有相同架构和记录的 Apache Spark SQL DataFrame

注意

由于 DataFrame 不支持 ChoiceType,因此此方法自动将 ChoiceType 列转换为 StructType。有关更多信息和用于解析选择的选项,请参阅 resolveChoice

def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path — 要分析的列。必须是字符串或二进制。

  • format — 用于分析的格式。

  • optionString — 传递到格式的选项,如 CSV 分隔符。

根据指定的格式分析嵌入式字符串或二进制列。已分析的列嵌套在具有原始列名称的结构下。

例如,假设您具有包含一个嵌入式 JSON 列的 CSV 文件。

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

在初始分析后,您将获取具有以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: string }}}

您可以对地址列调用 unbox 来分析特定组件。

{{{ df.unbox("address", "json") }}}

这为我们提供了具有以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回平展了所有嵌套结构的新 DynamicFrame。构造名称时使用“.”(句点)字符。

例如,假设您有一个包含以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

以下调用取消嵌套 address 结构。

{{{ df.unnest() }}}

生成的架构如下所示。

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

此方法还取消嵌套数组中的嵌套结构。但由于历史原因,此类字段的名称前附加了括起来的数组的名称和“.val”。

def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema — 返回要使用的架构的函数。指定为零参数函数来推迟可能昂贵的计算。

将此 DynamicFrame 的架构设置为指定值。这主要在内部使用以避免成本高昂的架构重新计算。传入的架构必须包含数据中存在的所有列。

def withName

def withName( name : String ) : DynamicFrame
  • name — 要使用的新名称。

返回使用新名称的此 DynamicFrame 的副本。

def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

返回具有指定转换上下文的此 DynamicFrame 的副本。