DynamoDB 连接 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

DynamoDB 连接

您可以使用 Amazon Glue for Spark 在 Amazon Glue 中读取和写入 DynamoDB 中的表。您可以使用附加到您的 Amazon Glue 任务的 IAM 权限连接到 DynamoDB。Amazon Glue 支持将数据写入其他 Amazon 账户的 DynamoDB 表。有关更多信息,请参阅 跨账户、跨区域访问 DynamoDB 表

除了 Amazon Glue DynamoDB ETL 连接器外,您还可以使用 DynamoDB 导出连接器从 DynamoDB 读取数据,该连接器调用 DynamoDB ExportTableToPointInTime 请求并将其存储在您提供的 AmazonS3 位置,格式为 DynamoDB JSON。Amazon Glue然后,通过从 AmazonS3 导出位置读取数据来创建 DynamicFrame 对象。

DynamoDB 写入器在 Amazon Glue 版本 1.0 或更高版本中可用。Amazon Glue DynamoDB 导出连接器在 Amazon Glue 版本 2.0 或更高版本中可用。

有关 DynamoDB 的更多信息,请参阅 Amazon DynamoDB 文档。

注意

DynamoDB ETL 读取器不支持筛选条件或下推谓词。

配置 DynamoDB 连接

要从 Amazon Glue 连接到 DynamoDB,请授予与您的 Amazon Glue 任务关联的 IAM 角色与 DynamoDB 交互的权限。有关从 DynamoDB 读取或写入所需权限的更多信息,请参阅 IAM 文档中的 DynamoDB 的操作、资源和条件键

在以下情况下,您可能需要额外的配置:

  • 使用 DynamoDB 导出连接器时,您需要配置 IAM,以便您的任务可以请求 DynamoDB 表导出。此外,您还需要为导出标识一个 AmazonS3 桶,并在 IAM 中提供适当的权限,以便 DynamoDB 向其写入,以及 Amazon Glue 任务从中读取。有关更多信息,请参阅在 DynamoDB 中请求表导出

  • 如果您的 Amazon Glue 任务有特定的 Amazon VPC 连接要求,请使用 NETWORK Amazon Glue 连接类型提供网络选项。由于对 DynamoDB 的访问由 IAM 授权,因此无需使用 Amazon Glue DynamoDB 连接类型。

从 DynamoDB 读取和写入 DynamoDB

以下代码示例演示了如何从 DynamoDB 表中读取(通过 ETL 连接器)以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": test_source, "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": test_sink, "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> test_source, "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> test_sink, "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

使用 DynamoDB 导出连接器

在 DynamoDB 表大小超过 80 GB 时,导出连接器的性能优于 ETL 连接器。此外,鉴于导出请求在 Amazon Glue 任务中的 Spark 进程之外执行,您可以启用 Amazon Glue 任务的弹性伸缩以节省导出请求期间的 DPU 使用量。借助导出连接器,您也无需为 Spark 执行程序并行度或 DynamoDB 吞吐量读取百分比配置拆分数。

注意

DynamoDB 对调用 ExportTableToPointInTime 请求有特定的要求。有关更多信息,请参阅在 DynamoDB 中请求表导出。例如,表需要启用时间点恢复 (PITR) 才能使用此连接器。DynamoDB 连接器还支持在将 DynamoDB 导出到 Amazon S3 时进行 Amazon KMS 加密。在 Amazon Glue 任务配置中指定安全性配置,将为 DynamoDB 导出启用 Amazon KMS 加密。KMS 密钥必须与 Simple Storage Service (Amazon S3) 存储桶位于同一区域。

请注意,您需要支付 DynamoDB 导出的额外费用和 Simple Storage Service (Amazon S3) 存储成本。任务运行完成后,Simple Storage Service (Amazon S3) 中的导出数据仍然存在,因此您无需其他 DynamoDB 导出即可重复使用这些数据。使用此连接器的一个要求是该表启用了时间点恢复 (PITR)。

DynamoDB ETL 连接器或导出连接器不支持在 DynamoDB 源应用筛选条件或下推谓词。

以下代码示例演示如何进行读取(通过导出连接器)以及打印分区数量。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": test_source, "dynamodb.s3.bucket": bucket_name, "dynamodb.s3.prefix": bucket_prefix, "dynamodb.s3.bucketOwner": account_id_of_bucket, } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> test_source, "dynamodb.s3.bucket" -> bucket_name, "dynamodb.s3.prefix" -> bucket_prefix, "dynamodb.s3.bucketOwner" -> account_id_of_bucket, )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

以下示例演示如何从具有 dynamodb 分类的 Amazon Glue 数据目录表进行读取(通过导出连接器)以及打印分区数量:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database=catalog_database, table_name=catalog_table_name, additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": s3_bucket, "dynamodb.s3.prefix": s3_bucket_prefix } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = catalog_database, tableName = catalog_table_name, additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> s3_bucket, "dynamodb.s3.prefix" -> s3_bucket_prefix )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

简化 DynamoDB 导出 JSON 的使用

使用 Amazon Glue DynamoDB 导出连接器进行 DynamoDB 导出时会生成具有特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象。Amazon Glue 提供 DynamicFrame 转换,可以将这些结构解除嵌套,成为易于下游应用程序使用的形式。

您可以通过以下两种方式之一调用该转换。在调用要从 DynamoDB 读取的方法时,您可以使用值 "true" 来设置连接选项 "dynamodb.simplifyDDBJson"。您也可以将转换作为 Amazon Glue 库中独立提供的方法进行调用。

考虑 DynamoDB 导出生成的以下架构:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplifyDDBJson 转换会将其简化为:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
注意

simplifyDDBJson 在 Amazon Glue 3.0 及更高版本中可用。unnestDDBJson 转换还可用于简化 DynamoDB 导出 JSON。我们鼓励用户从 unnestDDBJson 转换到 simplifyDDBJson

在 DynamoDB 操作中配置并行性

为了提高性能,可以调整可用于 DynamoDB 连接器的某些参数。在调整并行性参数时,您的目标是最大限度地使用预配置的 Amazon Glue 工作线程。然后,如果您需要更高的性能,我们建议您增加 DPU 数量以横向扩展任务。

使用 ETL 连接器时,可以使用 dynamodb.splits 参数更改 DynamoDB 读取操作中的并行性。使用导出连接器进行读取时,不需要为 Spark 执行程序并行度配置拆分数。您可以使用 dynamodb.output.numParallelTasks 更改 DynamoDB 写入操作中的并行度。

使用 DynamoDB ETL 连接器读取

我们建议您根据任务配置中设置的最大工作线程数和以下 numSlots 计算来计算 dynamodb.splits。如果自动扩缩,则在该上限之下,实际可用的工作线程数量可能会发生变化。有关设置最大工作线程数量的更多信息,请参阅 在 Amazon Glue 中添加作业 中的工作线程数 (NumberOfWorkers)。

  • numExecutors = NumberOfWorkers - 1

    就上下文而言,为 Spark 驱动程序保留了一个执行程序;其他执行程序用于处理数据。

  • numSlotsPerExecutor =

    Amazon Glue 3.0 and later versions
    • 4,如果 WorkerTypeG.1X

    • 8,如果 WorkerTypeG.2X

    • 16,如果 WorkerTypeG.4X

    • 32,如果 WorkerTypeG.8X

    Amazon Glue 2.0 and legacy versions
    • 8,如果 WorkerTypeG.1X

    • 16,如果 WorkerTypeG.2X

  • numSlots = numSlotsPerExecutor * numExecutors

我们建议您将 dynamodb.splits 设置为可用插槽数量 numSlots

写入 DynamoDB

dynamodb.output.numParallelTasks 参数用于通过以下计算确定每个 Spark 任务的 WCU:

permittedWcuPerTask = ( TableWCU * dynamodb.throughput.write.percent ) / dynamodb.output.numParallelTasks

如果配置准确地表示写入 DynamoDB 的 Spark 任务的数量,则 DynamoDB 写入器将发挥最佳作用。在某些情况下,您可能需要覆盖默认计算以提高写入性能。如果不指定此参数,则每个 Spark 任务允许的 WCU 将自动通过以下公式计算:

    • numPartitions = dynamicframe.getNumPartitions()

    • numSlots(如本节前面所定义)

    • numParallelTasks = min(numPartitions, numSlots)

  • 示例 1。DPU=10,WorkerType=Standard。输入 DynamicFrame 具有 100 个 RDD 分区。

    • numPartitions = 100

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(100, 68) = 68

  • 示例 2。DPU=10,WorkerType=Standard。输入 DynamicFrame 具有 20 个 RDD 分区。

    • numPartitions = 20

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(20, 68) = 20

注意

旧版 Amazon Glue 上的任务和使用标准工作线程的任务需要不同的方法来计算插槽数量。如果您需要对这些任务进行性能调整,我们建议您转换到支持的 Amazon Glue 版本。

DynamoDB 连接选项参考

指定与 Amazon DynamoDB 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "dynamodb" with the ETL connector as Source

在使用 Amazon Glue DynamoDB ETL 连接器时,请使用以下连接选项并将 "connectionType": "dynamodb" 作为源:

  • "dynamodb.input.tableName":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.throughput.read.percent":(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认读取速率,这意味着 Amazon Glue 将尝试占用表的一半的读取容量。如果增加值超过 0.5,Amazon Glue 将增加请求速率;将值降低到 0.5 以下将降低读取请求速率。(实际读取速率取决于 DynamoDB 表中是否存在统一键分配的等因素。)

    • 当 DynamoDB 表处于按需模式时,Amazon Glue 处理表的读取容量为 40000。要导出大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.splits":(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。

    1 表示没有并行度。我们强烈建议您使用以下公式指定更大的值以获得更好的性能。有关适当设置值的更多信息,请参阅 在 DynamoDB 操作中配置并行性

  • "dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。此参数适用于 Amazon Glue 1.0 或更高版本。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。此参数适用于 Amazon Glue 1.0 或更高版本。

"connectionType": "dynamodb" with the Amazon Glue DynamoDB export connector as Source

在使用 Amazon Glue DynamoDB 导出连接器(仅适用于 Amazon Glue 版本 2.0 以上)时,使用以下连接选项并将 "connectionType": "dynamodb"用作源:

  • "dynamodb.export":(必需)字符串值:

    • 如果设置为 ddb,将启用 Amazon Glue DynamoDB 导出连接器,其中在 Amazon Glue 任务期间将调用新的 ExportTableToPointInTimeRequest。新的导出将通过从 dynamodb.s3.bucketdynamodb.s3.prefix 传递的位置生成。

    • 如果设置为 s3,将启用 Amazon Glue DynamoDB 导出连接器但会跳过创建新的 DynamoDB 导出,而使用 dynamodb.s3.bucketdynamodb.s3.prefix 作为该表以前导出的 Simple Storage Service (Amazon S3) 位置。

  • "dynamodb.tableArn":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.unnestDDBJson":(可选)默认值:false。有效值:布尔值。如果设置为 true(真),则对导出中存在的 DynamoDB JSON 结构执行解除嵌套转换。同时将 "dynamodb.unnestDDBJson""dynamodb.simplifyDDBJson" 设置为 true 是错误的。在 Amazon Glue 3.0 及更高版本中,我们建议您在简化 DynamoDB Map 类型时使用 "dynamodb.simplifyDDBJson" 以获得更好的行为。有关更多信息,请参阅 简化 DynamoDB 导出 JSON 的使用

  • "dynamodb.simplifyDDBJson":(可选)默认值:false。有效值:布尔值。如果设置为 true,则执行转换以简化导出中存在的 DynamoDB JSON 结构的架构。这与 "dynamodb.unnestDDBJson" 选项的目的相同,但为 DynamoDB 表中的 DynamoDB Map 类型甚至嵌套 Map 类型提供了更好的支持。此选项在 Amazon Glue 3.0 及更高版本中可用。同时将 "dynamodb.unnestDDBJson""dynamodb.simplifyDDBJson" 设置为 true 是错误的。有关更多信息,请参阅 简化 DynamoDB 导出 JSON 的使用

  • "dynamodb.s3.bucket":(可选)指示将会执行 DynamoDB ExportTableToPointInTime 进程的 Amazon S3 存储桶位置。导出的文件格式为 DynamoDB JSON。

    • "dynamodb.s3.prefix":(可选)指示将用于存储 DynamoDB ExportTableToPointInTime 负载的 Amazon S3 存储桶内的 Amazon S3 前缀位置。如果既未指定 dynamodb.s3.prefix,也未指定 dynamodb.s3.bucket,则这些值将默认为 Amazon Glue 任务配置中指定的临时目录位置。有关更多信息,请参阅 Amazon Glue 使用的特殊参数

    • "dynamodb.s3.bucketOwner":指示跨账户 Amazon S3 访问所需的存储桶拥有者。

  • "dynamodb.sts.roleArn":(可选)跨账户访问和/或跨区域访问 DynamoDB 表时将会代入的 IAM 角色 ARN。注意:相同的 IAM 角色 ARN 将用于访问为 ExportTableToPointInTime 请求指定的 Amazon S3 位置。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。

  • "dynamodb.exportTime"(可选)有效值:表示 ISO-8601 瞬时的字符串。应进行导出的时间点。

  • "dynamodb.sts.region":(如果使用区域端点进行跨区域调用,则为必填项)托管要读取的 DynamoDB 表的区域。

"connectionType": "dynamodb" with the ETL connector as Sink

"connectionType": "dynamodb" 用作连接器时可使用以下连接选项:

  • "dynamodb.output.tableName":(必需)要写入的 DynamoDB 表。

  • "dynamodb.throughput.write.percent":(可选)要使用的写入容量单位(WCU)的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认写入速率,这意味着 Amazon Glue 将尝试占用表的一半的写入容量。如果增加值超过 0.5,Amazon Glue 将增加请求速率;将值降低到 0.5 以下将降低写入请求速率。(实际写入速率取决于 DynamoDB 表中是否存在统一键分配等因素)。

    • 当 DynamoDB 表处于按需模式时,Amazon Glue 处理表的写入容量为 40000。要导入大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.output.numParallelTasks":(可选)定义同时向 DynamoDB 写入数据的并行任务数。用于计算每个 Spark 任务的允许 WCU。在大多数情况下,Amazon Glue 将为此值计算合理的默认值。有关更多信息,请参阅 在 DynamoDB 操作中配置并行性

  • "dynamodb.output.retry":(可选)定义存在 DynamoDB 中的 ProvisionedThroughputExceededException 时我们执行的重试次数。默认设置为“10”。

  • "dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-write-sts-session”。