Amazon DocumentDB 连接 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon DocumentDB 连接

您可以使用 Amazon Glue for Spark 读取和写入 Amazon DocumentDB 中的表。您可以通过 Amazon Glue 连接使用存储在 Amazon Secrets Manager 中的凭证连接到 Amazon DocumentDB。

有关 Amazon DocumentDB 的更多信息,请参阅 Amazon DocumentDB 文档

注意

使用 Amazon Glue 连接器时,目前不支持 Amazon DocumentDB 弹性集群。有关弹性集群的更多信息,请参阅 Using Amazon DocumentDB elastic clusters

读取和写入 Amazon DocumentDB 集合

注意

当您创建连接到 Amazon DocumentDB 的 ETL 任务时,对于 Connections 任务属性,您必须指定一个连接对象,用于指定在其中运行 Amazon DocumentDB 的 Virtual Private Cloud(VPC)。对于该连接对象,连接类型必须为 JDBC,且 JDBC URL 必须为 mongo://<DocumentDB_host>:27017

注意

这些代码示例是为 Amazon Glue 3.0 开发的。要迁移到 Amazon Glue 4.0,请参阅 MongoDBuri 参数已更改。

注意

使用 Amazon DocumentDB 时,在某些情况下必须将 retryWrites 设置为 false,例如编写的文档指定 _id 时。有关更多信息,请参阅 Amazon DocumentDB 文档中的 与 MongoDB 之间的功能差异

以下 Python 脚本展示了如何使用连接类型和连接选项来读写 Amazon DocumentDB。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } write_documentdb_options = { "retryWrites": "false", "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from DocumentDB dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()

以下 Scala 脚本展示了如何使用连接类型和连接选项来读写 Amazon DocumentDB。

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from DocumentDB val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to DocumentDB glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

Amazon DocumentDB 连接选项参考

指定与 Amazon DocumentDB (with MongoDB compatibility) 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "documentdb" as Source

"connectionType": "documentdb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要从中读取数据的 Amazon DocumentDB 连接。

  • "username":(必需)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "ssl":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "true"

  • "ssl.domain_match":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "false"

  • "batchSize":(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选)从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner(默认)(Amazon Glue 4.0 不支持)

    • MongoSamplePartitioner(Amazon Glue 4.0 不支持)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner(Amazon Glue 4.0 不支持)

  • "partitionerOptions":(可选)指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitionerpartitionKey、partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitionerpartitionKey、partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要在其中写入数据的 Amazon DocumentDB 连接。

  • "username":(必需)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "extendedBsonTypes":(可选)如果为 true,则在 Amazon DocumentDB 中写入数据时会启用扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选)保存数据时的批量操作的最大批次大小。默认值为 512。

  • "retryWrites":(可选):如果 Amazon Glue 遇到网络错误,则会自动重试某些写入操作一次。