Amazon Glue Scala DynamicFrame 类 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Glue Scala DynamicFrame 类

程序包:com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame 是自描述的 DynamicRecord 对象的分布式集合。

DynamicFrame 旨在为 ETL(提取、转换和加载)操作提供灵活的数据模型。它们不需要创建架构,可用于读取和转换具有杂乱或不一致的值和类型的数据。可以按需为需要架构的那些操作计算架构。

DynamicFrame 为数据清理和 ETL 提供了一系列转换。它们还支持转换为 SparkSQL DataFrame 和从其转换以与现有代码和 DataFrame 提供的许多分析操作集成。

跨构造 DynamicFrame 的许多 Amazon Glue 转换共享以下参数:

  • transformationContext — 此 DynamicFrame 的标识符。transformationContext 用作跨运行保存的作业书签状态的密钥。

  • callSite – 为错误报告提供上下文信息。在从 Python 调用时,会自动设置这些值。

  • stageThreshold – 在引发异常之前允许的来自此 DynamicFrame 计算的最大错误记录数,不包括以前的 DynamicFrame 中存在的记录。

  • totalThreshold – 引发异常之前的最大错误记录总数,包括以前的帧中的记录。

Val errorsCount

val errorsCount

DynamicFrame 中的错误记录数。这包括来自以前的操作的错误。

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings – 用于构造新 DynamicFrame 的映射序列。

  • caseSensitive – 是否将源列视为区分大小写。在与不区分大小写的存储(如 Amazon Glue 数据目录)集成时,将此项设置为 false 可能很有帮助。

基于一系列的映射选择、投影和转换列。

每个映射由源列和类型以及目标列和类型构成。映射可指定为四元组 (source_pathsource_type target_pathtarget_type) 或包含相同信息的 MappingSpec 对象。

除了将映射用于简单的投影和转换外,还可以通过使用“.”(句点)分隔路径的组件来用于嵌套或取消嵌套字段。

例如,假设您有一个包含以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

您可以进行以下调用来取消嵌套 statezip 字段。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

生成的架构如下所示。

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

您还可以使用 applyMapping 来重新嵌套列。例如,以下代码反转以前的转换,并在目标中创建一个名为 address 的结构。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

包含“.”(句点)字符的字段名称可以使用反引号 (``) 括起来。

注意

目前,您不能使用 applyMapping 方法映射嵌套在数组下的列。

Def assertErrorThreshold

def assertErrorThreshold : Unit

强制计算并验证错误记录数是否低于 stageThresholdtotalThreshold 的操作。如果任一条件失败,则引发异常。

Def count

lazy def count

返回 DynamicFrame 中的元素数量。

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回已删除指定列的新 DynamicFrame

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回已删除指定列的新 DynamicFrame

您可以使用此方法删除嵌套列(包括数组中的列),但不能用于删除特定数组元素。

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

返回已删除所有空列的新 DynamicFrame

注意

这只删除类型为 NullType 的列。不删除或修改其他列中的单个空值。

Def errorsAsDynamicFrame

def errorsAsDynamicFrame

返回包含此 DynamicFrame 中的错误记录的新 DynamicFrame

Def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

构造只包含函数“f”为其返回 true 的那些记录的新 DynamicFrame。筛选器函数“f”不应转变输入记录。

Def getName

def getName : String

返回此 DynamicFrame 的名称。

Def getNumPartitions

def getNumPartitions

返回 DynamicFrame 中的分区数量。

Def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

如果架构已经过计算,则返回该架构。如果架构尚未经过计算,则不扫描数据。

Def isSchemaComputed

def isSchemaComputed : Boolean

如果已为此 DynamicFrame 计算架构,则返回 true;否则返回 false。如果此方法返回 false,则调用 schema 方法将需要再次扫描此 DynamicFrame 中的记录。

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 – 此 DynamicFrame 中用于联接的列。

  • keys2frame2 中用于联接的列。必须与 keys1 的长度相同。

  • frame2 – 要联接的 DynamicFrame

返回使用指定的键对 frame2 执行 equijoin 的结果。

Def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回通过对此 DynamicFrame 中的每个记录应用指定函数“f”构造的新 DynamicFrame

此方法先复制每个记录,然后再应用指定函数,因此可以安全地转变记录。如果映射函数在给定记录上引发异常,则该记录将标记为错误,并且堆栈跟踪将另存为错误记录中的一个列。

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame – 要合并的暂存 DynamicFrame

  • primaryKeys – 要匹配源和暂存 DynamicFrame 中的记录的主键字段列表。

  • transformationContext – 用于检索有关当前转换的元数据的唯一字符串(可选)。

  • options – 为此转换提供其他信息的 JSON 名称-值对的字符串。

  • callSite – 用于为错误报告提供上下文信息。

  • stageThreshold — 一个 Long。给定转换中处理需要排除的错误的数目。

  • totalThreshold — 一个 Long。此转换中处理需要排除的错误的总数。

基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 Amazon Glue 中的源中的记录。

在以下情况下,返回的 DynamicFrame 将包含记录 A:

  1. 如果 A 在源帧和暂存帧中都存在,则返回暂存帧中的 A

  2. 如果 A 在源表中,且A.primaryKeys 不在 stagingDynamicFrame 中(这意味着,未在暂存表中更新 A)。

源帧和暂存帧不需要具有相同的架构。

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

以人类可读的格式将此 DynamicFrame 的架构输出到 stdout

Def recomputeSchema

def recomputeSchema : Schema

强制重新计算架构。这需要扫描数据,但如果数据中不存在当前架构中的一些字段,则可能会“压缩”架构。

返回经过重新计算的架构。

Def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName – 在输出中用于基本 DynamicFrame 的名称。通过透视数组创建的 DynamicFrame 以此作为前缀开头。

  • stagingPath – 用于写入中间数据的 Amazon Simple Storage Service(Amazon S3)路径。

  • options – 关系化选项和配置。目前未使用。

展平所有嵌套的结构并将数组透视为单独的表。

您可以使用此操作准备深度嵌套的数据以提取到关系数据库中。使用与 Unnest 转换相同的方式展平嵌套结构。此外,将数组透视为单独的表,其中每个数组元素成为一行。例如,假设您有一个包含以下数据的 DynamicFrame

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

运行以下代码。

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

这会生成两个表。第一个表名为“people”,其中包含以下内容。

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

在这里,friends 数组已被替换为自动生成的联接键。创建了名为 people.friends 的单独表,其中包含以下内容。

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

在此表中,“id”是标识数组元素来自哪个记录的联接键,“index”是指在原始数组中的位置,“val”是实际的数组条目。

relationalize 方法返回通过以递归方式对所有数组应用此过程创建的 DynamicFrame 序列。

注意

Amazon Glue 库自动为新表生成联接键。为确保联接键在作业运行中是唯一的,您必须启用作业书签。

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName – 列的原始名称。

  • newName – 列的新名称。

返回新 DynamicFrame,其中的指定字段进行了重命名。

您可以使用此方法重命名嵌套字段。例如,以下代码在 address 结构中将 state 重命名为 state_code

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回具有 numPartitions 分区的新 DynamicFrame

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption – 应用于 specs 序列中未列出的所有 ChoiceType 列的操作。

  • database – 与 match_catalog 操作一起使用的数据目录数据库。

  • tableName – 与 match_catalog 操作一起使用的数据目录表。

通过将一个或多个 ChoiceType 替换为更具体的类型来返回新 DynamicFrame

可通过两种方式使用 resolveChoice。第一种是指定一系列特定列以及如何解析它们。这些指定为由 (列, 操作) 对组成的元组。

可能的操作如下:

  • cast:type – 尝试将所有值转换为指定的类型。

  • make_cols – 将每个不同的类型转换为名为 columnName_type 的列。

  • make_struct – 将列转换为具有每个不同类型的键的结构。

  • project:type – 仅保留指定类型的值。

resolveChoice 的另一种模式是为所有 ChoiceType 指定单个解析方法。这可以在执行前不知道 ChoiceType 的完整列表的情况下使用。除了上面列出的操作外,此模式还支持以下操作:

  • match_catalogChoiceType – 尝试将每个 转换为指定目录表中的对应类型。

示例:

通过转换为 int 来解析 user.id 列,并使 address 字段仅保留结构。

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

通过将每个选择转换为单独的列来解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

通过转换为指定目录表中的类型来解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

返回此 DynamicFrame 的架构。

保证返回的架构包含此 DynamicFrame 中的记录中存在的每个字段。但在少数情况下,它可能还包含其他字段。您可以使用 Unnest 方法基于此 DynamicFrame 中的记录来“压缩”架构。

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回作为 DynamicFrame 的单个字段。

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths – 要选择的列名称序列。

返回包含指定列的新 DynamicFrame

注意

您只能使用 selectFields 方法选择顶级列。您可以使用 applyMapping 方法选择嵌套列。

Def show

def show( numRows : Int = 20 ) : Unit
  • numRows – 要输出的行数。

以 JSON 格式输出此 DynamicFrame 中的行。

Def simplifyDDBJson

使用 Amazon Glue DynamoDB 导出连接器进行 DynamoDB 导出时会生成具有特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象simplifyDDBJson简化此数据类型的 DynamicFrame 中的嵌套列,并返回新的简化 DynamicFrame。如果 List 类型中包含多种类型或 Map 类型,则不会简化 List 中的元素。此方法仅支持 DynamoDB 导出 JSON 格式的数据。考虑使用 unnest 来对其他类型的数据进行类似更改。

def simplifyDDBJson() : DynamicFrame

此方法不使用任何参数。

示例输入

考虑 DynamoDB 导出生成的以下架构:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

示例代码

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

simplifyDDBJson 转换会将其简化为:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回相同记录但写出一部分记录作为副作用的传递转换。

  • path – 将输出写入的 Amazon S3 中的路径,格式为 s3://bucket//path

  • options – 描述取样行为的可选的 JsonOptions 映射。

返回包含与这一个相同的记录的 DynamicFrame

默认情况下,将 100 个任意记录写入通过 path 指定的位置。您可以使用 options 映射自定义此行为。有效键包括:

  • topk – 指定写出的记录的总数。默认值为 100。

  • prob – 指定包含单个记录的概率(小数形式)。默认值为 1。

例如,以下调用将对数据集进行取样,方法是以 20% 的概率选择每个记录并在写入 200 个记录后停止。

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths — 包括在第一个 DynamicFrame 中的路径。

返回两个 DynamicFrame 的序列。第一个 DynamicFrame 包含指定路径,第二个包含所有其他列。

示例

此示例采用从 Amazon Glue 数据目录中的 legislators 数据库中的 persons 表创建的 DynamicFrame,并将 DynamicFrame 拆分为两个,其中指定的字段进入第一个 DynamicFrame,其余字段进入第二个 DynamicFrame。然后,示例将从结果中选择第一个 DynamicFrame。

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

基于将列与常量比较的谓词来拆分行。

  • paths – 用于比较的列。

  • values – 用于比较的常量值。

  • operators – 用于比较的运算符。

返回两个 DynamicFrame 的序列。第一个包含其谓词为 true 的行,第二个包含其谓词为 false 的行。

使用三个序列指定谓词:“paths”包含(可能嵌套的)列名称,“values”包含要与其进行比较的常量值,“operators”包含用于比较的运算符。所有这三个序列必须长度相同:第 n 个运算符将用于将第 n 个列与第 n 个值进行比较。

每个运算符必须是以下运算符之一:“!=”、“=”、“<=”、“<”、“>=”或“>”。

例如,以下调用将拆分 DynamicFrame,以便第一个输出帧将包含美国 65 岁以上的人员记录,第二个将包含所有其他记录。

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

返回在计算此 DynamicFrame 时创建的错误记录数。这不包括作为输入传递给此 DynamicFrame 的以前操作中的错误。

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

将此 DynamicFrame 转换为具有相同架构和记录的 Apache Spark SQL DataFrame

注意

由于 DataFrame 不支持 ChoiceType,因此此方法自动将 ChoiceType 列转换为 StructType。有关更多信息和用于解析选择的选项,请参阅 resolveChoice

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path – 要分析的列。必须是字符串或二进制。

  • format – 用于分析的格式。

  • optionString – 传递到格式的选项,如 CSV 分隔符。

根据指定的格式分析嵌入式字符串或二进制列。已分析的列嵌套在具有原始列名称的结构下。

例如,假设您具有包含一个嵌入式 JSON 列的 CSV 文件。

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

在初始分析后,您将获取具有以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: string }}}

您可以对地址列调用 unbox 来分析特定组件。

{{{ df.unbox("address", "json") }}}

这为我们提供了具有以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回平展了所有嵌套结构的新 DynamicFrame。构造名称时使用“.”(句点)字符。

例如,假设您有一个包含以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

以下调用取消嵌套 address 结构。

{{{ df.unnest() }}}

生成的架构如下所示。

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

此方法还取消嵌套数组中的嵌套结构。但由于历史原因,此类字段的名称前附加了括起来的数组的名称和“.val”。

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

解除 DynamicFrame 中的嵌套列,具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest 转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON

例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnestDDBJson() 转换会将此转换为:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

以下代码示例演示了如何使用 Amazon Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令,以及打印分区数量:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema – 返回要使用的架构的函数。指定为零参数函数来推迟可能昂贵的计算。

将此 DynamicFrame 的架构设置为指定值。这主要在内部使用以避免成本高昂的架构重新计算。传入的架构必须包含数据中存在的所有列。

Def withName

def withName( name : String ) : DynamicFrame
  • name – 要使用的新名称。

返回使用新名称的此 DynamicFrame 的副本。

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

返回具有指定转换上下文的此 DynamicFrame 的副本。