本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
AWS Glue 斯卡拉 DynamicFrame 类
程序包:com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
DynamicFrame
是自描述的 DynamicRecord 对象的分布式集合。
DynamicFrame
旨在为 ETL(提取、转换和加载)操作提供灵活的数据模型。它们不需要创建架构,可用于读取和转换具有杂乱或不一致的值和类型的数据。可以按需为需要架构的那些操作计算架构。
DynamicFrame
为数据清理和 ETL 提供了一系列转换。它们还支持向和向 SparkSQL DataFrames 与现有代码和许多 DataFrames 提供。
跨构造 DynamicFrame
的许多 AWS Glue 转换共享以下参数:
-
transformationContext
— 此的标识符DynamicFrame
。的transformationContext
用作在运行中持续的作业书签状态的键。 -
callSite
— 为错误报告提供上下文信息。在从 Python 调用时,会自动设置这些值。 -
stageThreshold
— 来自此DynamicFrame
计算的不会引发异常的最大错误记录数,不包括以前的DynamicFrame
中存在的记录。 -
totalThreshold
— 不会引发异常的最大错误记录总数,包括以前的帧中的记录。
阀 errorsCount
val errorsCount
此中的错误记录数 DynamicFrame
。这包括来自上一个操作的错误。
定义 applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
-
mappings
— 用于构造新DynamicFrame
的映射序列。 -
caseSensitive
— 处理源列时是否区分大小写。在与不区分大小写的存储(如 AWS Glue 数据目录)集成时,将此项设置为 false 可能很有帮助。
基于一系列的映射选择、投影和转换列。
每个映射由源列和类型以及目标列和类型构成。映射可指定为四元组 (source_path
、source_type
、 target_path
、target_type
) 或包含相同信息的 MappingSpec 对象。
除了将映射用于简单的投影和转换外,还可以通过使用“.
”(句点)分隔路径的组件来用于嵌套或取消嵌套字段。
例如,假设您有一个包含以下架构的 DynamicFrame
。
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
您可以进行以下调用来取消嵌套 state
和 zip
字段。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
生成的架构如下所示。
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
您还可以使用 applyMapping
来重新嵌套列。例如,以下代码反转以前的转换,并在目标中创建一个名为 address
的结构。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
包含“.
”(句点)字符的字段名称可以使用反引号 (``
) 括起来。
目前,您不能使用 applyMapping
方法映射嵌套在数组下的列。
定义 assertErrorThreshold
def assertErrorThreshold : Unit
强制计算和验证错误记录数量低于以下的操作 stageThreshold
和 totalThreshold
。如果任一条件失败,则抛出例外。
def count
lazy
def count
返回 DynamicFrame
中的元素数量。
定义 dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回已删除指定列的新 DynamicFrame
。
定义 dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回已删除指定列的新 DynamicFrame
。
您可以使用此方法删除嵌套列(包括数组中的列),但不能用于删除特定数组元素。
定义 dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
返回已删除所有空列的新 DynamicFrame
。
这只移除了类型列 NullType
。不会删除或修改其他列中的单个空值。
定义 errorsAsDynamicFrame
def errorsAsDynamicFrame
返回包含此 DynamicFrame
中的错误记录的新 DynamicFrame
。
def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
构建新的 DynamicFrame
仅包含其功能为“f
'退货 true
。筛选功能'f
'不应使输入记录进行突变。
定义 getName
def getName : String
返回此 DynamicFrame
的名称。
定义 getNumPartitions
def getNumPartitions
返回 DynamicFrame
中的分区数量。
定义 getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
如果架构已经过计算,则返回该架构。如果架构尚未经过计算,则不扫描数据。
定义 isSchemaComputed
def isSchemaComputed : Boolean
如果已为此 DynamicFrame
计算架构,则返回 true
;否则返回 false
。如果此方法返回 false,则调用 schema
方法将需要再次扫描此 DynamicFrame
中的记录。
定义 javaToPython
def javaToPython : JavaRDD[Array[Byte]]
def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
-
keys1
— 此DynamicFrame
中用于联接的列。 -
keys2
—frame2
中用于联接的列。必须与keys1
的长度相同。 -
frame2
— 要联接的DynamicFrame
。
返回使用指定的键对 frame2
执行 equijoin 的结果。
def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回通过对此 DynamicFrame
中的每个记录应用指定函数“f
”构造的新 DynamicFrame
。
此方法先复制每个记录,然后再应用指定函数,因此可以安全地转变记录。如果映射函数在给定记录上引发异常,则该记录将标记为错误,并且堆栈跟踪将另存为错误记录中的一个列。
定义 mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
-
stageDynamicFrame
— 要合并的暂存DynamicFrame
。 -
primaryKeys
— 要匹配源和暂存DynamicFrame
中的记录的主键字段列表。 -
transformationContext
— 用于检索有关当前转换的元数据的唯一字符串(可选)。 -
options
— 为此转换提供其他信息的 JSON 名称-值对的字符串。 -
callSite
— 用于为错误报告提供上下文信息。 -
stageThreshold
— 甲Long
。给定转换中处理需要出错的错误数。 -
totalThreshold
— 甲Long
。在此转换中,处理需要出错的错误总数(包括)。
基于指定主键的将此 DynamicFrame
与暂存 DynamicFrame
合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖
AWS Glue 中的源中的记录。
在以下情况下,返回的 DynamicFrame
将包含记录 A:
-
如果
A
在源帧和暂存帧中都存在,则返回暂存帧中的A
。 -
如果
A
在源表中,且A.primaryKeys
不在stagingDynamicFrame
中(这意味着,未在暂存表中更新A
)。
源帧和暂存帧不需要具有相同的架构。
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
定义 printSchema
def printSchema : Unit
以人类可读的格式将此 DynamicFrame
的架构输出到 stdout
。
定义 recomputeSchema
def recomputeSchema : Schema
强制重新计算架构。这需要扫描数据,但如果数据中不存在当前架构中的一些字段,则可能会“压缩”架构。
返回经过重新计算的架构。
def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
-
rootTableName
— 用于底座的名称DynamicFrame
在输出中。通过透视数组创建的DynamicFrame
以此作为前缀开头。 -
stagingPath
— 用于写入中间数据的 Amazon Simple Storage Service (Amazon S3) 路径。 -
options
— 关系化选项和配置。目前未使用。
展平所有嵌套的结构并将数组透视为单独的表。
您可以使用此操作准备深度嵌套的数据以提取到关系数据库中。使用与 unnest 转换相同的方式展平嵌套结构。此外,将数组透视为单独的表,其中每个数组元素成为一行。例如,假设您有一个包含以下数据的 DynamicFrame
。
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
执行以下代码。
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
这会生成两个表。第一个表名为“people”,其中包含以下内容。
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
在这里,friends 数组已被替换为自动生成的联接键。创建了名为 people.friends
的单独表,其中包含以下内容。
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
在此表中,“id
”是标识数组元素来自哪个记录的联接键,“index
”是指在原始数组中的位置,“val
”是实际的数组条目。
relationalize
方法返回通过以递归方式对所有数组应用此过程创建的 DynamicFrame
序列。
AWS Glue 库自动为新表生成联接键。为确保联接键在作业运行中是唯一的,您必须启用作业书签。
定义 renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
-
oldName
— 列的原始名称。 -
newName
— 列的新名称。
返回新 DynamicFrame
,其中的指定字段进行了重命名。
您可以使用此方法重命名嵌套字段。例如,以下代码在 address 结构中将 state
重命名为 state_code
。
{{{ df.renameField("address.state", "address.state_code") }}}
def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回具有 numPartitions
分区的新 DynamicFrame
。
定义 resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
-
choiceOption
— 应用于 specs 序列中未列出的所有ChoiceType
列的操作。 -
database
— 与match_catalog
操作一起使用的 数据目录 数据库。 -
tableName
— 与match_catalog
操作一起使用的 数据目录 表。
通过将一个或多个 ChoiceType
替换为更具体的类型来返回新 DynamicFrame
。
有两种使用方法 resolveChoice
。第一个是指定特定列的顺序以及如何解决它们。这些指定为由 (列, 操作) 对组成的元组。
可能的操作如下:
-
cast:type
— 尝试将所有值转换为指定的类型。 -
make_cols
— 将每个不同的类型转换为名为columnName_type
的列。 -
make_struct
— 将列转换为具有每个不同类型的键的结构。 -
project:type
— 仅保留指定类型的值。
resolveChoice
的另一种模式是为所有 ChoiceType
指定单个解析方法。这可以在执行前不知道 ChoiceType
的完整列表的情况下使用。除了上面列出的操作外,此模式还支持以下操作:
-
match_catalog
— 尝试将每个ChoiceType
转换为指定目录表中的对应类型。
示例:
通过转换为 int 来解析 user.id
列,并使 address
字段仅保留结构。
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
通过将每个选择转换为单独的列来解析所有 ChoiceType
。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
通过转换为指定目录表中的类型来解析所有 ChoiceType
。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
def schema
def schema : Schema
返回此 DynamicFrame
的架构。
返回的方案保证包含此 DynamicFrame
。但是,在少数情况下,它还可能包含附加字段。您可以使用 unnest 方法基于此 DynamicFrame
中的记录来“压缩”架构。
定义 selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回作为 DynamicFrame
的单个字段。
定义 selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
-
paths
— 要选择的列名称序列。
返回包含指定列的新 DynamicFrame
。
您只能使用 selectFields
方法选择顶级列。您可以使用 applyMapping 方法选择嵌套列。
def show
def show( numRows : Int = 20 ) : Unit
-
numRows
— 要输出的行数。
以 JSON 格式输出此 DynamicFrame
中的行。
def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回相同记录但写出一部分记录作为副作用的传递转换。
-
path
— 将输出写入的 Amazon S3 中的路径,格式为s3://bucket//path
。 -
options
— 描述取样行为的可选的JsonOptions
映射。
返回包含与这一个相同的记录的 DynamicFrame
。
默认情况下,将100条任意记录写入到指定的位置 path
。您可以使用 options
映射。有效键包括:
-
topk
— 指定写出的记录的总数。默认值为 100。 -
prob
— 指定包含单个记录的概率(小数形式)。默认值为 1。
例如,以下调用将对数据集进行取样,方法是以 20% 的概率选择每个记录并在写入 200 个记录后停止。
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
定义 splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
-
paths
— 包括在第一个DynamicFrame
中的路径。
返回两个 DynamicFrame
的序列。第一个 DynamicFrame
包含指定路径,第二个包含所有其他列。
定义 splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
基于将列与常量比较的谓词来拆分行。
-
paths
— 用于比较的列。 -
values
— 用于比较的常量值。 -
operators
— 用于比较的运算符。
返回两个 DynamicFrame
的序列。第一个包含其谓词为 true 的行,第二个包含其谓词为 false 的行。
使用三个序列指定谓词: ”paths
'包含(可能嵌套)列名称,'values
'包含要比较的常数值,并且'operators
'包含用于比较的运算符。所有三个序列的长度必须相同: 的 n
使用运算符比较 n
第列, n
th值。
每个运算符必须是“!=
英寸、英寸=
英寸、英寸<=
英寸、英寸<
英寸、英寸>=
",或">
英寸。
例如,以下调用将拆分 DynamicFrame
,以便第一个输出帧将包含美国 65 岁以上的人员记录,第二个将包含所有其他记录。
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
定义 stageErrorsCount
def stageErrorsCount
返回计算此错误时创建的错误记录数 DynamicFrame
。这不包括以前操作中导入此的错误 DynamicFrame
作为输入。
定义 toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
将此 DynamicFrame
转换为具有相同架构和记录的 Apache Spark SQL DataFrame
。
由于 DataFrame
不支持 ChoiceType
,因此此方法自动将 ChoiceType
列转换为 StructType
。有关更多信息和用于解析选择的选项,请参阅 resolveChoice。
def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
-
path
— 要分析的列。必须是字符串或二进制。 -
format
— 用于分析的格式。 -
optionString
— 传递到格式的选项,如 CSV 分隔符。
根据指定的格式分析嵌入式字符串或二进制列。已分析的列嵌套在具有原始列名称的结构下。
例如,假设您具有包含一个嵌入式 JSON 列的 CSV 文件。
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
在初始分析后,您将获取具有以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: string }}}
您可以对地址列调用 unbox
来分析特定组件。
{{{ df.unbox("address", "json") }}}
这为我们提供了具有以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回平展了所有嵌套结构的新 DynamicFrame
。构造名称时使用“.
”(句点)字符。
例如,假设您有一个包含以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
以下调用取消嵌套 address 结构。
{{{ df.unnest() }}}
生成的架构如下所示。
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
此方法还取消嵌套数组中的嵌套结构。但由于历史原因,此类字段的名称前附加了括起来的数组的名称和“.val
”。
定义 withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
-
getSchema
— 返回要使用的架构的函数。指定为零参数函数来推迟可能昂贵的计算。
将此 DynamicFrame
的架构设置为指定值。这主要在内部使用以避免成本高昂的架构重新计算。传入的架构必须包含数据中存在的所有列。
定义 withName
def withName( name : String ) : DynamicFrame
-
name
— 要使用的新名称。
返回使用新名称的此 DynamicFrame
的副本。
定义 withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
返回具有指定转换上下文的此 DynamicFrame
的副本。