Amazon GlueScala 课 DynamicFrame - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon GlueScala 课 DynamicFrame

程序包:com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame 是自描述的 DynamicRecord 对象的分布式集合。

DynamicFrame 旨在为 ETL(提取、转换和加载)操作提供灵活的数据模型。它们不需要创建架构,可用于读取和转换具有杂乱或不一致的值和类型的数据。可以按需为需要架构的那些操作计算架构。

DynamicFrame 为数据清理和 ETL 提供了一系列转换。它们还支持与 SparkSQL 之间的转换 DataFrames ,以与现有代码和提供的许多分析操作集成。 DataFrames

跨构造 DynamicFrame 的许多 Amazon Glue 转换共享以下参数:

  • transformationContext — 此 DynamicFrame 的标识符。transformationContext 用作跨运行保存的作业书签状态的密钥。

  • callSite – 为错误报告提供上下文信息。在从 Python 调用时,会自动设置这些值。

  • stageThreshold – 在引发异常之前允许的来自此 DynamicFrame 计算的最大错误记录数,不包括以前的 DynamicFrame 中存在的记录。

  • totalThreshold – 引发异常之前的最大错误记录总数,包括以前的帧中的记录。

Val errorsCount

val errorsCount

DynamicFrame 中的错误记录数。这包括来自以前的操作的错误。

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings – 用于构造新 DynamicFrame 的映射序列。

  • caseSensitive – 是否将源列视为区分大小写。在与不区分大小写的存储(如 Amazon Glue 数据目录)集成时,将此项设置为 false 可能很有帮助。

基于一系列的映射选择、投影和转换列。

每个映射由源列和类型以及目标列和类型构成。映射可指定为四元组 (source_pathsource_type target_pathtarget_type) 或包含相同信息的 MappingSpec 对象。

除了将映射用于简单的投影和转换外,还可以通过使用“.”(句点)分隔路径的组件来用于嵌套或取消嵌套字段。

例如,假设您有一个包含以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

您可以进行以下调用来取消嵌套 statezip 字段。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

生成的架构如下所示。

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

您还可以使用 applyMapping 来重新嵌套列。例如,以下代码反转以前的转换,并在目标中创建一个名为 address 的结构。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

包含“.”(句点)字符的字段名称可以使用反引号 (``) 括起来。

注意

目前,您不能使用 applyMapping 方法映射嵌套在数组下的列。

Def assertErrorThreshold

def assertErrorThreshold : Unit

强制计算并验证错误记录数是否低于 stageThresholdtotalThreshold 的操作。如果任一条件失败,则引发异常。

Def count

lazy def count

返回 DynamicFrame 中的元素数量。

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回已删除指定列的新 DynamicFrame

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回已删除指定列的新 DynamicFrame

您可以使用此方法删除嵌套列(包括数组中的列),但不能用于删除特定数组元素。

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

返回已删除所有空列的新 DynamicFrame

注意

这只删除类型为 NullType 的列。不删除或修改其他列中的单个空值。

Def Fram errorsAsDynamic e

def errorsAsDynamicFrame

返回包含此 DynamicFrame 中的错误记录的新 DynamicFrame

Def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

构造只包含函数“f”为其返回 true 的那些记录的新 DynamicFrame。筛选器函数“f”不应转变输入记录。

Def getName

def getName : String

返回此 DynamicFrame 的名称。

Def getNumPartitions

def getNumPartitions

返回 DynamicFrame 中的分区数量。

Def 已计算 getSchemaIf

def getSchemaIfComputed : Option[Schema]

如果架构已经过计算,则返回该架构。如果架构尚未经过计算,则不扫描数据。

Def isSchemaComputed

def isSchemaComputed : Boolean

如果已为此 DynamicFrame 计算架构,则返回 true;否则返回 false。如果此方法返回 false,则调用 schema 方法将需要再次扫描此 DynamicFrame 中的记录。

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 – 此 DynamicFrame 中用于联接的列。

  • keys2frame2 中用于联接的列。必须与 keys1 的长度相同。

  • frame2 – 要联接的 DynamicFrame

返回使用指定的键对 frame2 执行 equijoin 的结果。

Def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回通过对此 DynamicFrame 中的每个记录应用指定函数“f”构造的新 DynamicFrame

此方法先复制每个记录,然后再应用指定函数,因此可以安全地转变记录。如果映射函数在给定记录上引发异常,则该记录将标记为错误,并且堆栈跟踪将另存为错误记录中的一个列。

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame – 要合并的暂存 DynamicFrame

  • primaryKeys – 要匹配源和暂存 DynamicFrame 中的记录的主键字段列表。

  • transformationContext – 用于检索有关当前转换的元数据的唯一字符串(可选)。

  • options – 为此转换提供其他信息的 JSON 名称-值对的字符串。

  • callSite – 用于为错误报告提供上下文信息。

  • stageThreshold — 一个 Long。给定转换中处理需要排除的错误的数目。

  • totalThreshold — 一个 Long。此转换中处理需要排除的错误的总数。

基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 Amazon Glue 中的源中的记录。

在以下情况下,返回的 DynamicFrame 将包含记录 A:

  1. 如果 A 在源帧和暂存帧中都存在,则返回暂存帧中的 A

  2. 如果 A 在源表中,且A.primaryKeys 不在 stagingDynamicFrame 中(这意味着,未在暂存表中更新 A)。

源帧和暂存帧不需要具有相同的架构。

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

以人类可读的格式将此 DynamicFrame 的架构输出到 stdout

Def recomputeSchema

def recomputeSchema : Schema

强制重新计算架构。这需要扫描数据,但如果数据中不存在当前架构中的一些字段,则可能会“压缩”架构。

返回经过重新计算的架构。

Def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName – 在输出中用于基本 DynamicFrame 的名称。通过透视数组创建的 DynamicFrame 以此作为前缀开头。

  • stagingPath – 用于写入中间数据的 Amazon Simple Storage Service(Amazon S3)路径。

  • options – 关系化选项和配置。目前未使用。

展平所有嵌套的结构并将数组透视为单独的表。

您可以使用此操作准备深度嵌套的数据以提取到关系数据库中。使用与 Unnest 转换相同的方式展平嵌套结构。此外,将数组透视为单独的表,其中每个数组元素成为一行。例如,假设您有一个包含以下数据的 DynamicFrame

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

运行以下代码。

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

这会生成两个表。第一个表名为“people”,其中包含以下内容。

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

在这里,friends 数组已被替换为自动生成的联接键。创建了名为 people.friends 的单独表,其中包含以下内容。

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

在此表中,“id”是标识数组元素来自哪个记录的联接键,“index”是指在原始数组中的位置,“val”是实际的数组条目。

relationalize 方法返回通过以递归方式对所有数组应用此过程创建的 DynamicFrame 序列。

注意

Amazon Glue 库自动为新表生成联接键。为确保联接键在作业运行中是唯一的,您必须启用作业书签。

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName – 列的原始名称。

  • newName – 列的新名称。

返回新 DynamicFrame,其中的指定字段进行了重命名。

您可以使用此方法重命名嵌套字段。例如,以下代码在 address 结构中将 state 重命名为 state_code

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回具有 numPartitions 分区的新 DynamicFrame

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption – 应用于 specs 序列中未列出的所有 ChoiceType 列的操作。

  • database – 与 match_catalog 操作一起使用的数据目录数据库。

  • tableName – 与 match_catalog 操作一起使用的数据目录表。

通过将一个或多个 ChoiceType 替换为更具体的类型来返回新 DynamicFrame

可通过两种方式使用 resolveChoice。第一种是指定一系列特定列以及如何解析它们。这些指定为由 (列, 操作) 对组成的元组。

可能的操作如下:

  • cast:type – 尝试将所有值转换为指定的类型。

  • make_cols – 将每个不同的类型转换为名为 columnName_type 的列。

  • make_struct – 将列转换为具有每个不同类型的键的结构。

  • project:type – 仅保留指定类型的值。

resolveChoice 的另一种模式是为所有 ChoiceType 指定单个解析方法。这可以在执行前不知道 ChoiceType 的完整列表的情况下使用。除了上面列出的操作外,此模式还支持以下操作:

  • match_catalogChoiceType – 尝试将每个 转换为指定目录表中的对应类型。

示例:

通过转换为 int 来解析 user.id 列,并使 address 字段仅保留结构。

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

通过将每个选择转换为单独的列来解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

通过转换为指定目录表中的类型来解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

返回此 DynamicFrame 的架构。

保证返回的架构包含此 DynamicFrame 中的记录中存在的每个字段。但在少数情况下,它可能还包含其他字段。您可以使用 Unnest 方法基于此 DynamicFrame 中的记录来“压缩”架构。

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回作为 DynamicFrame 的单个字段。

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths – 要选择的列名称序列。

返回包含指定列的新 DynamicFrame

注意

您只能使用 selectFields 方法选择顶级列。您可以使用 applyMapping 方法选择嵌套列。

Def show

def show( numRows : Int = 20 ) : Unit
  • numRows – 要输出的行数。

以 JSON 格式输出此 DynamicFrame 中的行。

Def simplifyddbJSON

使用 DynamoDB 导出连接器导出的 Amazon Glue DynamoDB 会生成特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象simplifyDDBJson简化此类数据中的嵌套列,并返回一个新的简化列 DynamicFrame。 DynamicFrame 如果 List 类型中包含多个类型或一个 Map 类型,则不会简化列表中的元素。此方法仅支持 DynamoDB 导出 JSON 格式的数据。考虑unnest对其他类型的数据进行类似的更改。

def simplifyDDBJson() : DynamicFrame

此方法不接受任何参数。

示例输入

考虑 DynamoDB 导出生成的以下架构:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

示例代码

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

simplifyDDBJson 转换会将其简化为:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回相同记录但写出一部分记录作为副作用的传递转换。

  • path – 将输出写入的 Amazon S3 中的路径,格式为 s3://bucket//path

  • options – 描述取样行为的可选的 JsonOptions 映射。

返回包含与这一个相同的记录的 DynamicFrame

默认情况下,将 100 个任意记录写入通过 path 指定的位置。您可以使用 options 映射自定义此行为。有效键包括:

  • topk – 指定写出的记录的总数。默认值为 100。

  • prob – 指定包含单个记录的概率(小数形式)。默认值为 1。

例如,以下调用将对数据集进行取样,方法是以 20% 的概率选择每个记录并在写入 200 个记录后停止。

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths — 包括在第一个 DynamicFrame 中的路径。

返回两个 DynamicFrame 的序列。第一个 DynamicFrame 包含指定路径,第二个包含所有其他列。

示例

此示例取自 Amazon Glue Data Catalog 中legislators数据库中的persons表 DynamicFrame 创建的,并将它们分 DynamicFrame 成两部分,指定的字段进入第一个 DynamicFrame 字段,其余字段进入第二个 DynamicFrame字段。然后,该示例 DynamicFrame 从结果中选择第一个。

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

基于将列与常量比较的谓词来拆分行。

  • paths – 用于比较的列。

  • values – 用于比较的常量值。

  • operators – 用于比较的运算符。

返回两个 DynamicFrame 的序列。第一个包含其谓词为 true 的行,第二个包含其谓词为 false 的行。

使用三个序列指定谓词:“paths”包含(可能嵌套的)列名称,“values”包含要与其进行比较的常量值,“operators”包含用于比较的运算符。所有这三个序列必须长度相同:第 n 个运算符将用于将第 n 个列与第 n 个值进行比较。

每个运算符必须是以下运算符之一:“!=”、“=”、“<=”、“<”、“>=”或“>”。

例如,以下调用将拆分 DynamicFrame,以便第一个输出帧将包含美国 65 岁以上的人员记录,第二个将包含所有其他记录。

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

返回在计算此 DynamicFrame 时创建的错误记录数。这不包括作为输入传递给此 DynamicFrame 的以前操作中的错误。

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

将此 DynamicFrame 转换为具有相同架构和记录的 Apache Spark SQL DataFrame

注意

由于 DataFrame 不支持 ChoiceType,因此此方法自动将 ChoiceType 列转换为 StructType。有关更多信息和用于解析选择的选项,请参阅 resolveChoice

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path – 要分析的列。必须是字符串或二进制。

  • format – 用于分析的格式。

  • optionString – 传递到格式的选项,如 CSV 分隔符。

根据指定的格式分析嵌入式字符串或二进制列。已分析的列嵌套在具有原始列名称的结构下。

例如,假设您具有包含一个嵌入式 JSON 列的 CSV 文件。

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

在初始分析后,您将获取具有以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: string }}}

您可以对地址列调用 unbox 来分析特定组件。

{{{ df.unbox("address", "json") }}}

这为我们提供了具有以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

返回平展了所有嵌套结构的新 DynamicFrame。构造名称时使用“.”(句点)字符。

例如,假设您有一个包含以下架构的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

以下调用取消嵌套 address 结构。

{{{ df.unnest() }}}

生成的架构如下所示。

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

此方法还取消嵌套数组中的嵌套结构。但由于历史原因,此类字段的名称前附加了括起来的数组的名称和“.val”。

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

解除 DynamicFrame 中的嵌套列,具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest 转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON

例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnestDDBJson() 转换会将此转换为:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

以下代码示例演示了如何使用 Amazon Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令,以及打印分区数量:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema – 返回要使用的架构的函数。指定为零参数函数来推迟可能昂贵的计算。

将此 DynamicFrame 的架构设置为指定值。这主要在内部使用以避免成本高昂的架构重新计算。传入的架构必须包含数据中存在的所有列。

Def withName

def withName( name : String ) : DynamicFrame
  • name – 要使用的新名称。

返回使用新名称的此 DynamicFrame 的副本。

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

返回具有指定转换上下文的此 DynamicFrame 的副本。