将 AWS Lambda 与 Amazon DynamoDB 结合使用
您可以使用 AWS Lambda 函数来处理 Amazon DynamoDB 流中的记录。使用 DynamoDB 流,每次更新 DynamoDB 表时,您都可以触发 Lambda 函数以执行额外的工作。
Lambda 从流中读取记录,并使用包含流记录的事件同步调用您的函数。Lambda 以批量方式读取记录并调用您的函数来处理批次中的记录。
例 DynamoDB 流 记录事件
{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }
Lambda 将针对记录轮询 DynamoDB 流中的分片(按照每秒 4 次的基本频率)。当记录可用时,Lambda 调用您的函数并等待结果。如果处理成功,Lambda 将恢复轮询,直到它收到更多记录。
默认情况下,只要流中有记录,Lambda 就会调用您的函数。如果 Lambda 从流中读取的批处理中只有一条记录,则 Lambda 只会向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置批处理时段,让事件源缓冲最多 5 分钟的记录。在调用该函数之前,Lambda 会继续从流中读取记录,直到收集了完整批次,或者直到批处理时段到期。
如果您的函数返回一个错误,则 Lambda 将重试批处理,直到处理成功或数据过期。为避免分片停滞,可以将事件源映射配置为以较小的批处理大小重试,限制重试次数或者丢弃太早的记录。要保留丢弃的事件,可以配置事件源映射,以将有关失败批处理的详细信息发送到 SQS 队列或 SNS 主题。
您还可以通过并行处理每个分片的多个批处理来提高并发性。在每个分区中,Lambda 最多可以同时处理 10 个批处理。如果您增加每个分区的并发批处理数量,则 Lambda 仍然需要确保在分区键级别进行有序处理。
将 ParallelizationFactor
设置配置为使用多个 Lambda 调用同时处理 Kinesis 或 DynamoDB 数据流的一个分区。您可以指定 Lambda 通过并行化因子(从 1 [默认值]
到 10)从某个分区轮询的并发批处理数量。例如,如果将 ParallelizationFactor
设置为 2,则最多可以有 200 次并发 Lambda 调用来处理 100 个 Kinesis 数据分区。这有助于在数据卷不稳定且 IteratorAge
较高时提高处理吞吐量。
小节目录
执行角色权限
Lambda 需要以下权限才能管理与您的 DynamoDB 流相关的资源。将这些权限添加到您的函数的执行角色中。
AWSLambdaDynamoDBExecutionRole
托管策略包含这些权限。有关更多信息,请参阅 AWS Lambda 执行角色。
要将失败批处理的记录发送到队列或主题,您的函数需要其他权限。每项目标服务均需要不同的权限,如下所示:
-
Amazon SQS – sqs:SendMessage
-
Amazon SNS – sns:Publish
将流配置为事件源
创建事件源映射以指示 Lambda 将流中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个流的项目。
要在 Lambda 控制台中将您的函数配置为从 DynamoDB 流 读取,请创建 DynamoDB 触发器。
创建触发器
-
打开 Lambda 控制台的“函数”页面
。 -
选择函数。
-
在 Designer 下方,选择 Add trigger (添加触发器)。
-
选择触发器类型。
-
配置所需选项,然后选择 Add (添加)。
Lambda 支持 DynamoDB 事件源的以下选项。
事件源选项
-
DynamoDB 表 – 要从中读取记录的 DynamoDB 表。
-
Batch size (批处理大小) – 每个批处理中发送到函数的记录的数量(最多 1000 条)。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的负载限制 (6 MB)。
-
批处理时段 – 指定在调用函数之前收集记录的最长时间(以秒为单位)。
-
起始位置 – 仅处理新记录或所有现有记录。
-
最新 – 处理已添加到流中的新记录。
-
时间范围 – 处理流中的所有记录。
在处理任何现有记录后,函数将继续处理新记录。
-
-
On-failure destination (故障目标) – SQS 队列或 SNS 主题,用于无法处理的记录。当 Lambda 由于时间太远或已用尽所有重试而丢弃一批记录时,它将有关该批处理的详细信息发送到队列或主题。
-
Retry attempts (重试) – 函数返回错误时 Lambda 重试的最大次数。这不适用于批处理未到达该函数的服务错误或限制。
-
Maximum age of record (最长记录期限) – Lambda 发送到函数的记录的最长期限。
-
Split batch on error (出错时拆分批处理) – 当函数返回错误时,请在重试之前将批处理拆分为两部分。
-
Concurrent batches per shard (每个分片的并发批处理) – 并发处理来自同一分片的多个批处理。
-
已启用 – 设置为 true 可启用事件源映射。设置为 false 可停止处理记录。Lambda 将跟踪已处理的最后一条记录,并在重新启用映射后从停止位置重新开始处理。
对于 Lambda 作为 DynamoDB 触发器的一部分调用的 GetRecords API 调用,您不需要付费。
之后,要管理事件源配置,请在设计器中选择触发器。
事件源映射 API
要使用 AWS CLI 或 AWS 开发工具包
以下示例使用 AWS CLI 将名为 my-function
的函数映射到由 Amazon 资源名称 (ARN) 指定的 DynamoDB 流(批处理大小为 500)。
$
aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:
{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525
配置其他选项,以自定义如何处理批处理,并指定何时丢弃无法处理的记录。以下示例更新事件源映射,以在两次重试之后或者如果失败记录已存在一个小时以上,将失败记录发送到 SQS 队列。
$
aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'
{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed","State": "Updating", "StateTransitionReason": "User action",
"DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }
更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping
命令可查看当前状态。
$
aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b
{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action","DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600,
"BisectBatchOnFunctionError": false,"MaximumRetryAttempts": 2
}
要同时处理多个批处理,请使用 --parallelization-factor
选项。
$
aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5
错误处理
从 DynamoDB 流中读取记录的事件源映射将同步调用函数并在出错时重试。如果函数受到限制,或者 Lambda 服务未调用该函数而返回错误,Lambda 将重试,直到记录到期或者超过您在事件源映射上配置的最长期限。
如果函数接收到记录但返回错误,Lambda 将重试,直到批处理中的记录到期、超过最大使用期限或者达到配置的重试配额。对于函数错误,您还可以配置事件源映射,以将失败的批处理拆分为两个批处理。重试较小的批处理可以隔离不良记录并解决超时问题。拆分批处理不计入重试配额。
如果错误处理措施失败,Lambda 将丢弃记录并继续处理流中的批处理。使用默认设置,这意味着不良记录最多可以将针对受影响分片的处理操作阻止one day。为避免这种情况,请以合理的重试次数和适合您使用案例的最长记录期限来配置函数的事件源映射。
要保留废弃批处理的记录,请配置失败事件目标。Lambda 将文档和有关批处理的详细信息发送到目标队列或主题。
配置失败事件记录的目标
-
打开 Lambda 控制台的“函数”页面
。 -
选择函数。
-
在 Designer (设计器) 下,选择 Add destination (添加目标)。
-
对于 Source (源),选择 Stream invocation (流调用)。
-
对于 Stream (流),选择映射到函数的流。
-
对于 Destination type (目标类型),请选择接收调用记录的资源类型。
-
对于 Destination (目标),请选择一个资源。
-
选择保存。
以下示例显示了 DynamoDB 流的调用记录。
例 调用记录
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }
您可以使用此信息从流中检索受影响的记录以进行故障排除。实际的记录不包括在内,因此您必须处理这些记录,并在它们到期并丢失之前从流中检索它们。
Amazon CloudWatch 指标
在您的函数处理完一批记录后,Lambda 将发出 IteratorAge
指标。该指标指示处理完成时批处理中最后一条记录的时间。如果您的函数正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理新记录的时间之间的延迟。
迭代器期限中的上升趋势可以指示您的函数问题。有关更多信息,请参阅使用 AWS Lambda 函数指标。