使用 Lambda 处理 Amazon DocumentDB 事件 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Lambda 处理 Amazon DocumentDB 事件

您可以通过将 Amazon DocumentDB 集群配置为事件源,从而使用 Lambda 函数来处理 Amazon DocumentDB(与 MongoDB 兼容)更改流中的事件。然后,您可以在 Amazon DocumentDB 集群每次发生数据更改时调用 Lambda 函数,从而实现事件驱动型工作负载的自动化。

注意

Lambda 仅支持 Amazon DocumentDB 4.0 和 5.0 版本。Lambda 不支持 3.6 版本。

此外,对于事件源映射,Lambda 仅支持基于实例的集群和区域性集群。Lambda 不支持 弹性集群全球集群。当使用 Lambda 作为客户端连接到 Amazon DocumentDB 时,此限制不适用。Lambda 可以连接到所有集群类型来执行 CRUD 操作。

Lambda 按照事件到达的顺序依次处理 Amazon DocumentDB 更改流中的事件。因此,您的函数一次只能处理一条来自 DocumentDB 的并发调用。要监控函数,您可以跟踪其并发指标

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 Amazon 知识中心的如何使我的 Lambda 函数具有幂等性

Amazon DocumentDB 事件示例

{ "eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03", "events": [ { "event": { "_id": { "_data": "0163eeb6e7000000090100000009000041e1" }, "clusterTime": { "$timestamp": { "t": 1676588775, "i": 9 } }, "documentKey": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" } }, "fullDocument": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" }, "anyField": "sampleValue" }, "ns": { "db": "test_database", "coll": "test_collection" }, "operationType": "insert" } } ], "eventSource": "aws:docdb" }

有关此示例中的事件及其形状的更多信息,请参阅 MongoDB 文档网站上的更改事件

先决条件和权限

将 Amazon DocumentDB 用作 Lambda 函数的事件源之前,请注意以下先决条件。您必须:

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

网络配置

为了让 Lambda 将 Amazon DocumentDB 集群用作事件源,需要访问集群所在的 Amazon VPC。建议为 Lambda 部署 Amazon PrivateLink VPC 端点。为 Lambda 部署 VPC 端点,如果集群使用身份验证,还需要为 Secrets Manager 部署 VPC 端点。

或者,请确保与 Amazon DocumentDB 集群关联的 VPC 在每个公有子网中都包含一个 NAT 网关。有关更多信息,请参阅 为连接到 VPC 的 Lambda 函数启用互联网访问权限

如果您使用 VPC 端点,则还必须将其配置为启用私有 DNS 名称

在为 Amazon DocumentDB 集群创建事件源映射时,Lambda 会检查集群 VPC 的子网和安全组是否已经存在弹性网络接口(ENI)。如果 Lambda 发现现有 ENI,则会尝试重用这些 ENI。否则,Lambda 会创建新的 ENI 来连接到事件源并调用函数。

注意

Lambda 函数始终在 Lambda 服务拥有的 Amazon VPC 中运行。这些 VPC 由服务自动维护,对客户不可见。您也可以将函数连接到 Amazon VPC。无论是哪种情况,函数的 VPC 配置都不会影响事件源映射。只有事件源 VPC 的配置才能决定 Lambda 连接到事件源的方式。

VPC 安全组规则

使用以下规则配置包含集群的 Amazon VPC 安全组(最低要求):

  • 入站规则 – 允许为事件源指定的安全组的所有流量通过 Amazon DocumentDB 集群端口。默认情况下,Amazon DocumentDB 使用端口 27017。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Amazon DocumentDB 集群端口上的所有流量。默认情况下,Amazon DocumentDB 使用端口 27017。

  • 如果您使用的是 VPC 端点而不是 NAT 网关,则与 VPC 端点关联的安全组必须允许来自事件源安全组的端口 443 上的所有入站流量。

使用 VPC 端点

在使用 VPC 端点时,调用函数的 API 调用会使用 ENI 通过这些端点进行路由。Lambda 服务主体需要针对使用这些 ENI 的任何函数调用 lambda:InvokeFunction

默认情况下,VPC 端点的 IAM 策略处于开放状态。最佳做法是,将这些策略限制为仅允许特定主体使用该端点执行所需操作。为确保事件源映射能够调用 Lambda 函数,VPC 端点策略必须允许 Lambda 服务主体调用 lambda:InvokeFunction。将 VPC 端点策略限制为仅允许来自组织内部的 API 调用,会导致事件源映射无法正常运行。

以下 VPC 端点策略示例展示了如何授予 Lambda 端点所需的访问权限。

例 VPC 端点策略 – Lambda 端点
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

如果 Amazon DocumentDB 集群使用身份验证,您还可以限制 Secrets Manager 端点的 VPC 端点策略。要调用 Secrets Manager API,Lambda 会使用函数角色而非 Lambda 服务主体。以下示例展示了 Secrets Manager 端点策略。

例 VPC 端点策略 – Secrets Manager 端点
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

创建 Amazon DocumentDB 事件源映射(控制台)

要配置 Lambda 函数以从 Amazon DocumentDB 集群更改流中读取数据,请创建一个 事件源映射。这一部分介绍了如何从 Lambda 控制台执行此操作。有关 Amazon SDK 和 Amazon CLI 说明,请参阅 创建 Amazon DocumentDB 事件源映射(SDK 或 CLI)

要创建 Amazon DocumentDB 事件源映射(控制台)
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. 触发器配置下的下拉列表中,选择 DocumentDB

  5. 配置必填选项,然后选择 Add(添加)。

Lambda 支持 Amazon DocumentDB 事件源的以下选项:

  • DocumentDB 集群 – 选择一个 Amazon DocumentDB 集群。

  • 激活触发器 – 选择是否要立即激活触发器。如果选中此复选框,则在创建事件源映射后,您的函数会立即开始接收来自指定 Amazon DocumentDB 更改流的流量。我们建议取消选中此复选框,以便在禁用状态下创建事件源映射以进行测试。创建事件源映射后,您可以随时将其激活。

  • 数据库名称 – 输入集群中要使用的数据库的名称。

  • (可选)集合名称 – 输入数据库中要使用的集合的名称。如果您未指定集合,Lambda 将侦听来自数据库中每个集合的所有事件。

  • 批处理大小 – 设置要在单个批处理中检索的最大消息数,最高为 10000。默认批处理大小为 100。

  • 开始位置 – 选择将从流中开始读取记录的位置。

    • 最新 – 仅处理添加到流中的新记录。您的函数仅在 Lambda 完成创建事件源后才会开始处理记录。这意味着在成功创建事件源之前,某些记录可能会被丢弃。

    • 最早:处理流中的所有记录。Lambda 使用集群的日志保留期来确定从哪里开始读取事件。具体而言,Lambda 将从 current_time - log_retention_duration 开始读取事件。更改流必须在此时间戳之前已经处于活动状态,才能确保 Lambda 正确读取所有事件。

    • 在时间戳处:处理从特定时间开始的记录。更改流必须在指定的时间戳之前已经处于活动状态,才能确保 Lambda 正确读取所有事件。

  • 身份验证 – 选择访问集群中的代理时将使用的身份验证方法。

    • BASIC_AUTH – 使用基本身份验证时,您必须提供包含访问集群凭据所需凭证的 Secrets Manager 密钥。

  • Secrets Manager 密钥 – 选择包含访问 Amazon DocumentDB 集群所需身份验证详细信息(用户名和密码)的 Secrets Manager 密钥。

  • (可选)批处理时长 – 在调用函数之前收集记录的最长时间(以秒为单位,最大为 300)。

  • (可选)完整文档配置 – 对于文档更新操作,请选择要发送到流中的内容。默认值为 Default,这意味着对于每个更改流事件,Amazon DocumentDB 仅发送一个描述所发生更改的增量。有关此字段的更多信息,请参阅 MongoDB Javadocs API 文档中的 FullDocument

    • 默认 – Lambda 将仅发送描述所发生更改的部分文档。

    • UpdateLookup – Lambda 将发送一个描述所发生更改的增量以及完整文档的副本。

创建 Amazon DocumentDB 事件源映射(SDK 或 CLI)

要使用 Amazon SDK 来管理 Amazon DocumentDB 事件源映射,您可以使用以下 API 操作:

要使用 Amazon CLI 创建事件源映射,请使用 create-event-source-mapping 命令。以下示例使用此命令将名为 my-function 的函数映射到一个 Amazon DocumentDB 更改流。事件源由 Amazon 资源名称(ARN)指定,批处理大小为 500,从以 Unix 时间表示的时间戳开始。该命令还指定了 Lambda 用来连接到 Amazon DocumentDB 的 Secrets Manager 密钥。此外,它还包含用于指定要从中读取数据的数据库和集合的 document-db-event-source-config 参数。

aws lambda create-event-source-mapping --function-name my-function \ --event-source-arn arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy --batch-size 500 \ --starting-position AT_TIMESTAMP \ --starting-position-timestamp 1541139109 \ --source-access-configurations '[{"Type":"BASIC_AUTH","URI":"arn:aws:secretsmanager:us-east-1:123456789012:secret:DocDBSecret-BAtjxi"}]' \ --document-db-event-source-config '{"DatabaseName":"test_database", "CollectionName": "test_collection"}' \

应看到类似如下内容的输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541348195.412, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action" }

创建后,您可以使用 update-event-source-mapping 命令更新 Amazon DocumentDB 事件源的设置。以下示例会将批处理大小更新为 1000,并将批处理时长更新为 10 秒。对于此命令,您需要有事件源映射的 UUID,您可以通过 list-event-source-mapping 命令或 Lambda 控制台检索该值。

aws lambda update-event-source-mapping --function-name my-function \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --batch-size 1000 \ --batch-window 10

您应看到如下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541359182.919, "LastProcessingResult": "OK", "State": "Updating", "StateTransitionReason": "User action" }

Lambda 会异步更新设置,因此在更新完成之前,您可能无法在输出中看到这些更改。要查看事件源映射的当前设置,请使用 get-event-source-mapping 命令。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

您应看到如下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "BatchSize": 1000, "MaximumBatchingWindowInSeconds": 10, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541359182.919, "LastProcessingResult": "OK", "State": "Enabled", "StateTransitionReason": "User action" }

要删除 Amazon DocumentDB 事件源映射,请使用 delete-event-source-mapping 命令:

aws lambda delete-event-source-mapping \ --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

监控 Amazon DocumentDB 事件源

为便于您监控 Amazon DocumentDB 事件源,在函数处理完一批记录后,Lambda 将发送 IteratorAge 指标。迭代器期限是最近事件的时间戳和当前时间戳之间的差值。基本上,IteratorAge 指标表示自处理该批处理中最后一条记录后已经过的时间。如果函数当前正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理该记录的时间之间的延迟。如果 IteratorAge 呈上升趋势,则可能说明您的函数存在问题。有关更多信息,请参阅 使用 Lambda 函数指标

Amazon DocumentDB 更改流未经过优化,无法处理事件之间的较大时间间隔。如果 Amazon DocumentDB 事件源在很长一段时间内都没有收到任何事件,那么 Lambda 可能会禁用事件源映射。此时段的长度可能从几周到几个月不等,具体取决于集群规模和其他工作负载。

Lambda 支持最大 6MB 的负载。但是,Amazon DocumentDB 更改流事件的大小可达 16MB。如果更改流尝试向 Lambda 发送大于 6MB 的更改流事件,Lambda 会丢弃该消息并发送 OversizedRecordCount 指标。Lambda 将尽力发送所有指标。