AWS Lambda
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将 AWS Lambda 与 Amazon DynamoDB 结合使用

您可以使用 AWS Lambda 函数来处理 Amazon DynamoDB Streams 流中的记录。使用 DynamoDB 流,每次更新 DynamoDB 表时,您都可以触发 Lambda 函数以执行额外的工作。

Lambda 从流中读取记录,并使用包含流记录的事件同步调用您的函数。Lambda 以批量方式读取记录并调用您的函数来处理批次中的记录。

例 DynamoDB 流 记录事件

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda 将针对记录轮询 DynamoDB 流 流中的分片(按照每秒 4 次的基本频率)。当记录可用时,Lambda 调用您的函数并等待结果。如果处理成功,Lambda 将恢复轮询,直到它收到更多记录。

默认情况下,只要流中有记录,Lambda 就会调用您的函数。如果从流中读取的批处理中只有一条记录,则 Lambda 只向该函数发送一条记录。为避免在少量记录的情况下调用该函数,您可以通过配置批处理时段让事件源缓冲记录,最多可达 5 分钟。在调用该函数之前,Lambda 会继续从流中读取记录,直到收集了完整批次,或者直到批处理时段到期。

如果您的函数返回一个错误,则 Lambda 将重试批处理,直到处理成功或数据过期。在此问题得到解决之前,不会处理分片中的任何数据。处理代码中的任何记录处理错误,以避免分片停滞和潜在的数据丢失。

执行角色权限

Lambda 需要以下权限才能管理与您的 DynamoDB 流 流相关的资源。将这些权限添加到您的函数的执行角色中。

AWSLambdaDynamoDBExecutionRole 托管策略包含这些权限。有关更多信息,请参阅 AWS Lambda 执行角色

将流配置为事件源

创建事件源映射以指示 Lambda 将流中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个流的项目。

要在 Lambda 控制台中将您的函数配置为从 DynamoDB 流 读取,请创建 DynamoDB 触发器。

创建触发器

  1. 打开 Lambda 控制台 函数页面

  2. 选择函数。

  3. Designer 下方,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 支持 DynamoDB 事件源的以下选项。

事件源选项

  • DynamoDB 表 – 要从中读取记录的 DynamoDB 表。

  • 批处理大小 – 每个批处理中从分片读取的记录的数量(多达 1,000 条)。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的负载限制 (6 MB)。

  • 批处理时段 – 指定在调用函数之前收集记录的最长时间(以秒为单位)。

  • 起始位置 – 仅处理新记录或所有现有记录。

    • 最新 – 处理已添加到流中的新记录。

    • 时间范围 – 处理流中的所有记录。

    在处理任何现有记录后,函数将继续处理新记录。

  • 已启用 – 禁用事件源可停止处理记录。Lambda 将跟踪已处理的最后一条记录,并在重新启用映射后从停止位置重新开始处理。

之后,要管理事件源配置,请在设计器中选择触发器。

事件源映射 API

要使用 AWS CLI 或 AWS 开发工具包管理事件源映射,请使用以下 API。

以下示例使用 AWS Command Line Interface 将名为 my-function 的函数映射到由 Amazon 资源名称 (ARN) 指定的 DynamoDB 流 流(批处理大小为 500)

$ aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525 { "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action" }

Amazon CloudWatch 指标

在您的函数处理完一批记录后,Lambda 将发出 IteratorAge 指标。该指标指示处理完成时批处理中最后一条记录的时间。如果您的函数正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理新记录的时间之间的延迟。

迭代器期限中的上升趋势可以指示您的函数问题。有关更多信息,请参阅AWS Lambda 指标