MongoDB 连接 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

MongoDB 连接

在 Amazon Glue 4.0 及更高版本中,您可以使用 Amazon Glue for Spark 来读取和写入 MongoDB 和 MongoDB Atlas 中的表。您可以通过 Amazon Glue 连接并使用存储在 Amazon Secrets Manager 中的用户名和密码凭证连接到 MongoDB。

有关 MongoDB 的更多信息,请参阅 MongoDB 文档

配置 MongoDB 连接

要从 Amazon Glue 连接到 MongoDB,您需要拥有 MongoDB 凭证 mongodbUsermongodbPass

要从 Amazon Glue 连接到 MongoDB,您可能需要满足一些先决条件:

  • 如果您的 MongoDB 实例位于某个 Amazon VPC 中,请确保您的 Amazon VPC 配置允许您的 Amazon Glue 作业与 MongoDB 实例进行通信,并且无需通过公共互联网路由流量。

    在 Amazon VPC 中,确定或创建将在执行 Amazon Glue 作业时使用的 VPC子网安全组。此外,您的 Amazon VPC 配置需要允许您的 MongoDB 实例与该位置之间的网络流量。根据您的网络布局,这可能需要更改安全组规则、网络 ACL、NAT 网关和对等连接。

然后您可以继续配置 Amazon Glue 以便与 MongoDB 配合使用。

配置 MongoDB 连接:
  1. 您还可以在 Amazon Secrets Manager 中使用您的 MongoDB 凭证创建密钥。要在 Secrets Manager 中创建密钥,请按照 Amazon Secrets Manager 文档中创建 Amazon Secrets Manager 密钥中的教程进行操作。创建密钥后,保留密钥名称 secretName,以供下一步使用。

    • 在选择键/值对时,请使用键 username 和值 mongodbUser 创建一个键值对。

      在选择键/值对时,请使用键 password 和值 mongodbPass 创建一个键值对。

  2. 在 Amazon Glue 控制台中,按照 添加 Amazon Glue 连接 中的步骤创建一个连接。创建连接后,保留连接名为 connectionName,以供未来在 Amazon Glue 中使用。

    • 选择连接类型时,请选择 MongoDBMongoDB Atlas

    • 选择 MongoDB URLMongoDB Atlas URL 时,请提供 MongoDB 实例的主机名。

      MongoDB URL 的格式为 mongodb://mongoHost:mongoPort/mongoDBname

      MongoDB Atlas URL 的格式为 mongodb+srv://mongoHost:mongoPort/mongoDBname

      此外还可以选择提供连接的默认数据库 mongoDBName

    • 如果您选择创建 Secrets Manager 密钥,请选择 Amazon Secrets Manager 凭证类型

      然后在 Amazon密钥中提供 secretName

    • 如果您选择提供用户名和密码,请提供 mongodbUser 和 mongodbPass

  3. 对于下列情况,您可能需要添加额外的配置:

    • 对于通过 Amazon VPC 在 Amazon 云端托管的 MongoDB 实例

      • 您需要向 Amazon Glue 连接提供用于定义 MongoDB 安全凭证的 Amazon VPC 连接信息。创建或更新连接时,请在网络选项中设置 VPC子网安全组

创建 Amazon Glue MongoDB 连接后,您需要执行以下操作,然后才能调用您的连接方法:

  • 如果您选择创建 Secrets Manager 密钥,请向与您的 Amazon Glue 作业关联的 IAM 角色授予读取 secretName 的权限。

  • 在 Amazon Glue 作业配置中,提供 connectionName 作为附加网络连接

要在 Amazon Glue for Spark 中使用 Amazon Glue MongoDB 连接,请在您连接方法调用中提供 connectionName 选项。您还可以按照 在 ETL 作业中使用 MongoDB 连接 中的步骤操作,将该连接与 Amazon Glue Data Catalog 结合使用。

使用 Amazon Glue 连接从 MongoDB 读取

先决条件:

  • 您要读取的 MongoDB 集合。您将需要该集合的标识信息。

    MongoDB 集合由数据库名 mongodbName 和集合名 mongodbCollection 来标识。

  • 为了提供身份验证信息而配置的 Amazon Glue MongoDB 连接。完成上一节“配置 MongoDB 连接”中的步骤,以配置您的身份验证信息。您需要 Amazon Glue 连接的名称 connectionName

例如:

mongodb_read = glueContext.create_dynamic_frame.from_options( connection_type="mongodb", connection_options={ "connectionName": "connectionName", "database": "mongodbName", "collection": "mongodbCollection", "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id", "disableUpdateUri": "false", } )

写入 MongoDB 表

此示例会将来自现有 DynamicFrame dynamicFrame 的信息写入 MongoDB。

先决条件:

  • 您要写入的 MongoDB 集合。您将需要该集合的标识信息。

    MongoDB 集合由数据库名 mongodbName 和集合名 mongodbCollection 来标识。

  • 为了提供身份验证信息而配置的 Amazon Glue MongoDB 连接。完成上一节“配置 MongoDB 连接”中的步骤,以配置您的身份验证信息。您需要 Amazon Glue 连接的名称 connectionName

例如:

glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="mongodb", connection_options={ "connectionName": "connectionName", "database": "mongodbName", "collection": "mongodbCollection", "disableUpdateUri": "false", "retryWrites": "false", }, )

读取和写入 MongoDB 表

此示例会将来自现有 DynamicFrame dynamicFrame 的信息写入 MongoDB。

先决条件:

  • 您要读取的 MongoDB 集合。您将需要该集合的标识信息。

    您要写入的 MongoDB 集合。您将需要该集合的标识信息。

    MongoDB 集合由数据库名 mongodbName 和集合名 mongodbCollection 来标识。

  • MongoDB 身份验证信息 mongodbUsermongodbPassword

例如:

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017" mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017" write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_mongo_options = { "uri": mongo_uri, "database": "mongodbName", "collection": "mongodbCollection", "username": "mongodbUsername", "password": "mongodbPassword", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id"} ssl_mongo_options = { "uri": mongo_ssl_uri, "database": "mongodbName", "collection": "mongodbCollection", "ssl": "true", "ssl.domain_match": "false" } write_mongo_options = { "uri": write_uri, "database": "mongodbName", "collection": "mongodbCollection", "username": "mongodbUsername", "password": "mongodbPassword", } # Get DynamicFrame from MongoDB dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb", connection_options=read_mongo_options) # Write DynamicFrame to MongoDB glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val defaultJsonOption = jsonOptions(DEFAULT_URI) lazy val writeJsonOption = jsonOptions(WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from MongoDB val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame() // Write DynamicFrame to MongoDB glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"mongodbName", |"collection":"mongodbCollection", |"username": "mongodbUsername", |"password": "mongodbPassword", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

MongoDB 连接选项参考

指定与 MongoDB 的连接。源连接和接收器连接的连接选项不同。

源连接和接收器连接之间会共享以下连接属性:

  • connectionName - 用于读/写。为了向您的连接方法提供身份验证和网络信息而配置的 Amazon Glue MongoDB 连接的名称。如果在按照上一节“配置 MongoDB 连接”所述配置 Amazon Glue 连接时提供 connectionName,则不再需要提供 "uri""username""password" 连接选项。

  • "uri":(必需)要从中读取数据的 MongoDB 主机,格式为 mongodb://<host>:<port>。适用于 Amazon Glue 4.0 之前的 Amazon Glue 版本。

  • "connection.uri":(必需)要从中读取数据的 MongoDB 主机,格式为 mongodb://<host>:<port>。适用于 Amazon Glue 4.0 及更高版本。

  • "username":(必需)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "database":(必需)要从中读取数据的 MongoDB 数据库。当在您的任务脚本中调用 glue_context.create_dynamic_frame_from_catalog 时,此选项还可以在 additional_options 中传递。

  • "collection":(必需)要从中读取数据的 MongoDB 集合。当在您的任务脚本中调用 glue_context.create_dynamic_frame_from_catalog 时,此选项还可以在 additional_options 中传递。

"connectionType": "mongodb" as Source

"connectionType": "mongodb" 用作源时可使用以下连接选项:

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果为 true,且 ssltrue,则执行域匹配检查。默认为 true

  • "batchSize":(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选)从 MongoDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner(默认)(Amazon Glue 4.0 不支持)

    • MongoSamplePartitioner(需要 MongoDB 3.2 或更高版本)(但 Amazon Glue 4.0 不支持)

    • MongoShardedPartitioner(Amazon Glue 4.0 不支持)

    • MongoSplitVectorPartitioner(Amazon Glue 4.0 不支持)

    • MongoPaginateByCountPartitioner(Amazon Glue 4.0 不支持)

    • MongoPaginateBySizePartitioner(Amazon Glue 4.0 不支持)

    • com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

    • com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

    • com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

  • "partitionerOptions":(可选)指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" 用作连接器时可使用以下连接选项:

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果为 true,且 ssltrue,则执行域匹配检查。默认为 true

  • "extendedBsonTypes":(可选)如果为 true,则在 MongoDB 中写入数据时会允许扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选)保存数据时的批量操作的最大批次大小。默认值为 512。

  • "retryWrites":(可选):如果 Amazon Glue 遇到网络错误,则会自动重试某些写入操作一次。