

# MongoDB 连接
<a name="aws-glue-programming-etl-connect-mongodb-home"></a>

在 Amazon Glue 4.0 及更高版本中，您可以使用 Amazon Glue for Spark 来读取和写入 MongoDB 和 MongoDB Atlas 中的表。您可以通过 Amazon Glue 连接并使用存储在 Amazon Secrets Manager 中的用户名和密码凭证连接到 MongoDB。

有关 MongoDB 的更多信息，请参阅 [MongoDB 文档](https://www.mongodb.com/docs/)。

## 配置 MongoDB 连接
<a name="aws-glue-programming-etl-connect-mongodb-configure"></a>

要从 Amazon Glue 连接到 MongoDB，您需要拥有 MongoDB 凭证 {{mongodbUser}} 和 {{mongodbPass}}。

要从 Amazon Glue 连接到 MongoDB，您可能需要满足一些先决条件：
+ 如果您的 MongoDB 实例位于某个 Amazon VPC 中，请确保您的 Amazon VPC 配置允许您的 Amazon Glue 作业与 MongoDB 实例进行通信，并且无需通过公共互联网路由流量。

  在 Amazon VPC 中，确定或创建将在执行 Amazon Glue 作业时使用的 **VPC**、**子网**和**安全组**。此外，您的 Amazon VPC 配置需要允许您的 MongoDB 实例与该位置之间的网络流量。根据您的网络布局，这可能需要更改安全组规则、网络 ACL、NAT 网关和对等连接。

然后您可以继续配置 Amazon Glue 以便与 MongoDB 配合使用。

**配置 MongoDB 连接：**

1. 您还可以在 Amazon Secrets Manager 中使用您的 MongoDB 凭证创建密钥。要在 Secrets Manager 中创建密钥，请按照 Amazon Secrets Manager 文档中[创建 Amazon Secrets Manager 密钥](https://docs.amazonaws.cn//secretsmanager/latest/userguide/create_secret.html)中的教程进行操作。创建密钥后，保留密钥名称 {{secretName}}，以供下一步使用。
   + 在选择**键/值对**时，请使用键 `username` 和值 {{mongodbUser}} 创建一个键值对。

     在选择**键/值对**时，请使用键 `password` 和值 {{mongodbPass}} 创建一个键值对。

1. 在 Amazon Glue 控制台中，按照 [添加 Amazon Glue 连接](console-connections.md) 中的步骤创建一个连接。创建连接后，保留连接名为 {{connectionName}}，以供未来在 Amazon Glue 中使用。
   + 选择**连接类型**时，请选择 **MongoDB** 或 **MongoDB Atlas**。
   + 选择 **MongoDB URL** 或 **MongoDB Atlas URL** 时，请提供 MongoDB 实例的主机名。

     MongoDB URL 的格式为 `mongodb://{{mongoHost}}:{{mongoPort}}/{{mongoDBname}}`。

     MongoDB Atlas URL 的格式为 `mongodb+srv://{{mongoHost}}/{{mongoDBname}}`。
   + 如果您选择创建 Secrets Manager 密钥，请选择 Amazon Secrets Manager **凭证类型**。

     然后在 **Amazon密钥**中提供 {{secretName}}。
   + {{如果您选择提供**用户名和密码**，请提供 {{mongodbUser}} 和 mongodbPass}}。

1. 对于下列情况，您可能需要添加额外的配置：
   + 

     对于通过 Amazon VPC 在 Amazon 云端托管的 MongoDB 实例
     + 您需要向 Amazon Glue 连接提供用于定义 MongoDB 安全凭证的 Amazon VPC 连接信息。创建或更新连接时，请在**网络选项**中设置 **VPC**、**子网**和**安全组**。

创建 Amazon Glue MongoDB 连接后，您需要执行以下操作，然后才能调用您的连接方法：
+ 如果您选择创建 Secrets Manager 密钥，请向与您的 Amazon Glue 作业关联的 IAM 角色授予读取 {{secretName}} 的权限。
+ 在 Amazon Glue 作业配置中，提供 {{connectionName}} 作为**附加网络连接**。

要在 Amazon Glue for Spark 中使用 Amazon Glue MongoDB 连接，请在您连接方法调用中提供 `connectionName` 选项。您还可以按照 [在 ETL 作业中使用 MongoDB 连接](integrate-with-mongo-db.md) 中的步骤操作，将该连接与 Amazon Glue Data Catalog 结合使用。

## 使用 Amazon Glue 连接从 MongoDB 读取
<a name="aws-glue-programming-etl-connect-mongodb-read"></a>

**先决条件：**
+ 您要读取的 MongoDB 集合。您将需要该集合的标识信息。

  MongoDB 集合由数据库名 {{mongodbName}} 和集合名 {{mongodbCollection}} 来标识。
+ 为了提供身份验证信息而配置的 Amazon Glue MongoDB 连接。完成上一节“配置 MongoDB 连接”中的步骤**，以配置您的身份验证信息。您需要 Amazon Glue 连接的名称 {{connectionName}}。

例如：

```
mongodb_read = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options={
        "connectionName": "{{connectionName}}",
        "database": "{{mongodbName}}",
        "collection": "{{mongodbCollection}}",
        "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id",
        "disableUpdateUri": "false",
    }
)
```

## 写入 MongoDB 表
<a name="aws-glue-programming-etl-connect-mongodb-write"></a>

此示例会将来自现有 DynamicFrame {{dynamicFrame}} 的信息写入 MongoDB。

**先决条件：**
+ 您要写入的 MongoDB 集合。您将需要该集合的标识信息。

  MongoDB 集合由数据库名 {{mongodbName}} 和集合名 {{mongodbCollection}} 来标识。
+ 为了提供身份验证信息而配置的 Amazon Glue MongoDB 连接。完成上一节“配置 MongoDB 连接”中的步骤**，以配置您的身份验证信息。您需要 Amazon Glue 连接的名称 {{connectionName}}。

例如：

```
glueContext.write_dynamic_frame.from_options(
    frame={{dynamicFrame}},
    connection_type="mongodb",
    connection_options={
        "connectionName": "{{connectionName}}",
        "database": "{{mongodbName}}",
        "collection": "{{mongodbCollection}}",
        "disableUpdateUri": "false",
        "retryWrites": "false", 
    },
)
```

## 读取和写入 MongoDB 表
<a name="aws-glue-programming-etl-connect-mongodb-read-write"></a>

此示例会将来自现有 DynamicFrame {{dynamicFrame}} 的信息写入 MongoDB。

**先决条件：**
+ 您要读取的 MongoDB 集合。您将需要该集合的标识信息。

  您要写入的 MongoDB 集合。您将需要该集合的标识信息。

  MongoDB 集合由数据库名 {{mongodbName}} 和集合名 {{mongodbCollection}} 来标识。
+ MongoDB 身份验证信息 {{mongodbUser}} 和 {{mongodbPassword}}。

例如：

------
#### [ Python ]

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017"
mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017"
write_uri = "mongodb://<mongo-instanced-ip-address>:27017"

read_mongo_options = {
    "uri": mongo_uri,
    "database": "{{mongodbName}}",
    "collection": "{{mongodbCollection}}",
    "username": "{{mongodbUsername}}",
    "password": "{{mongodbPassword}}",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"}

ssl_mongo_options = {
    "uri": mongo_ssl_uri,
    "database": "{{mongodbName}}",
    "collection": "{{mongodbCollection}}",
    "ssl": "true",
    "ssl.domain_match": "false"
}

write_mongo_options = {
    "uri": write_uri,
    "database": "{{mongodbName}}",
    "collection": "{{mongodbCollection}}",
    "username": "{{mongodbUsername}}",
    "password": "{{mongodbPassword}}",
}

# Get DynamicFrame from MongoDB
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                              connection_options=read_mongo_options)

# Write DynamicFrame to MongoDB
glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options)

job.commit()
```

------
#### [ Scala ]

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
  lazy val defaultJsonOption = jsonOptions(DEFAULT_URI)
  lazy val writeJsonOption = jsonOptions(WRITE_URI)
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Get DynamicFrame from MongoDB
    val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame()

    // Write DynamicFrame to MongoDB
    glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"uri": "${uri}",
         |"database":"{{mongodbName}}",
         |"collection":"{{mongodbCollection}}",
         |"username": "{{mongodbUsername}}",
         |"password": "{{mongodbPassword}}",
         |"ssl":"true",
         |"ssl.domain_match":"false",
         |"partitioner": "MongoSamplePartitioner",
         |"partitionerOptions.partitionSizeMB": "10",
         |"partitionerOptions.partitionKey": "_id"}""".stripMargin)
  }
}
```

------

## MongoDB 连接选项参考
<a name="aws-glue-programming-etl-connect-mongodb"></a>

指定与 MongoDB 的连接。源连接和接收器连接的连接选项不同。

源连接和接收器连接之间会共享以下连接属性：
+ `connectionName` - 用于读/写。为了向您的连接方法提供身份验证和网络信息而配置的 Amazon Glue MongoDB 连接的名称。如果在按照上一节“[配置 MongoDB 连接](#aws-glue-programming-etl-connect-mongodb-configure)”所述配置 Amazon Glue 连接时提供 `connectionName`，则不再需要提供 `"uri"`、`"username"` 和 `"password"` 连接选项。
+ `"uri"`：（必需）要从中读取数据的 MongoDB 主机，格式为 `mongodb://<host>:<port>`。适用于 Amazon Glue 4.0 之前的 Amazon Glue 版本。
+ `"connection.uri"`：（必需）要从中读取数据的 MongoDB 主机，格式为 `mongodb://<host>:<port>`。适用于 Amazon Glue 4.0 及更高版本。
+ `"username"`：（必需）MongoDB 用户名。
+ `"password"`：（必需）MongoDB 密码。
+ `"database"`：（必需）要从中读取数据的 MongoDB 数据库。当在您的任务脚本中调用 `glue_context.create_dynamic_frame_from_catalog` 时，此选项还可以在 `additional_options` 中传递。
+ `"collection"`：（必需）要从中读取数据的 MongoDB 集合。当在您的任务脚本中调用 `glue_context.create_dynamic_frame_from_catalog` 时，此选项还可以在 `additional_options` 中传递。

### "connectionType": "mongodb" as Source
<a name="etl-connect-mongodb-as-source"></a>

将 `"connectionType": "mongodb"` 用作源时可使用以下连接选项：
+ `"ssl"`：（可选）如果为 `true`，则启动 SSL 连接。默认值为 `false`。
+ `"ssl.domain_match"`：（可选）如果为 `true`，且 `ssl` 为 `true`，则执行域匹配检查。默认值为 `true`。
+ `"batchSize"`：（可选）每个批处理返回的文档数量，在内部批处理的游标中使用。
+ `"partitioner"`：（可选）从 MongoDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器：
  + `MongoDefaultPartitioner`（默认）（Amazon Glue 4.0 不支持）
  + `MongoSamplePartitioner`（需要 MongoDB 3.2 或更高版本）（但 Amazon Glue 4.0 不支持）
  + `MongoShardedPartitioner`（Amazon Glue 4.0 不支持）
  + `MongoSplitVectorPartitioner`（Amazon Glue 4.0 不支持）
  + `MongoPaginateByCountPartitioner`（Amazon Glue 4.0 不支持）
  + `MongoPaginateBySizePartitioner`（Amazon Glue 4.0 不支持）
  + `com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner`
  + `com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner`
+ `"partitionerOptions"`：（可选）指定分区器的选项。各个分区器支持的选项如下：
  + `MongoSamplePartitioner`: `partitionKey`, `partitionSizeMB`, `samplesPerPartition`
  + `MongoShardedPartitioner`: `shardkey`
  + `MongoSplitVectorPartitioner`: `partitionKey`, `partitionSizeMB`
  + `MongoPaginateByCountPartitioner`: `partitionKey`, `numberOfPartitions`
  + `MongoPaginateBySizePartitioner`: `partitionKey`, `partitionSizeMB`

  有关这些选项的更多信息，请参阅 MongoDB 文档中的[分区器配置](https://docs.mongodb.com/spark-connector/master/configuration/#partitioner-conf)。

### "connectionType": "mongodb" as Sink
<a name="etl-connect-mongodb-as-sink"></a>

将 `"connectionType": "mongodb"` 用作连接器时可使用以下连接选项：
+ `"ssl"`：（可选）如果为 `true`，则启动 SSL 连接。默认值为 `false`。
+ `"ssl.domain_match"`：（可选）如果为 `true`，且 `ssl` 为 `true`，则执行域匹配检查。默认值为 `true`。
+ `"extendedBsonTypes"`：（可选）如果为 `true`，则在 MongoDB 中写入数据时会允许扩展 BSON 类型。默认值为 `true`。
+ `"replaceDocument"`：（可选）如果为 `true`，则在保存包含 `_id` 字段的数据集时会替换整个文档。如果为 `false`，则只会更新文档中与数据集中的字段匹配的字段。默认值为 `true`。
+ `"maxBatchSize"`：（可选）保存数据时的批量操作的最大批次大小。默认值为 512。
+ `"retryWrites"`：（可选）：如果 Amazon Glue 遇到网络错误，则会自动重试某些写入操作一次。