AWS Lambda
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将 AWS Lambda 与 Amazon Kinesis 结合使用

您可以使用 AWS Lambda 函数来处理 Amazon Kinesis 数据流中的记录。借助 Kinesis,您可以从多个源收集数据并与多个使用者一起处理这些数据。Lambda 支持标准数据流迭代器和 HTTP/2 流使用者。

Lambda 从数据流中读取记录,并使用包含流记录的事件同步调用您的函数。Lambda 以批量方式读取记录并调用您的函数来处理批次中的记录。

例 Kinesis 记录事件

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

如果您有多个应用程序正在从同一个流中读取记录,则可以使用 Kinesis 流使用者而不是标准迭代器。使用者具有专用读取吞吐量,因此它们不必与相同数据的其他使用者竞争。对于使用者,Kinesis 通过 HTTP/2 连接将记录推送到 Lambda,这也可以减少添加记录和函数调用之间的延迟。

如果您的函数返回一个错误,则 Lambda 将重试批处理,直到处理成功或数据过期。在此问题得到解决之前,不会处理分片中的任何数据。为避免分片停滞和潜在的数据丢失,请确保处理和记录代码中的处理错误。

配置您的数据流和函数

您的 Lambda 函数是数据流的用户应用程序。对于每个分片,它一次处理一批记录。您可以将 Lambda 函数映射到数据流(标准迭代器),或映射到流的使用者(增强型扇出功能)。

对于标准迭代器,Lambda 将针对记录轮询 Kinesis 流中的每个分片(按照每秒一次的基本频率)。当有多条记录可用时,Lambda 将继续进行批处理,直到它收到小于配置的最大批处理大小的批处理。该函数与分片的其他使用者共享读取吞吐量。

要最大程度地降低延迟并最大程度提高读取吞吐量,请创建数据流使用者。流使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。如果您有许多应用程序读取相同的数据,或者您正在重新处理具有大记录的流,则专用吞吐量可以提供帮助。

流使用者使用 HTTP/2 通过长期连接将记录推送到 Lambda 并压缩请求头来减少延迟。您可以使用 Kinesis RegisterStreamConsumer API 创建流使用者。

$ aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

要提高函数处理记录的速度,请向数据流添加分片。Lambda 将按顺序处理每个分片中的记录,并在您的函数返回错误时停止处理分片中的额外记录。使用更多分片,可以同时处理更多批次,从而降低错误对并发性的影响。

如果您的函数无法向上扩展以针对每个分片处理一个并发执行,请为您的函数请求提升限制预留并发。可用于您的函数的并发应匹配或超出 Kinesis 数据流中的分片数。

执行角色权限

Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源。将这些权限添加到您的函数的执行角色中。

AWSLambdaKinesisExecutionRole 托管策略包含这些权限。有关更多信息,请参阅 AWS Lambda 执行角色

将流配置为事件源

创建事件源映射以指示 Lambda 将数据流中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。

要在 Lambda 控制台中将您的函数配置为从 Kinesis 读取,请创建 Kinesis 触发器。

创建触发器

  1. 打开 Lambda 控制台 函数页面

  2. 选择函数。

  3. Designer 下方,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 支持 Kinesis 事件源的以下选项。

事件源选项

  • Kinesis 流 – 从中读取记录的 Kinesis 流。

  • 使用者(可选)– 使用流使用者通过专用连接从流中读取。

  • 批处理大小 – 每个批处理中从分片读取的记录的数量(多达 10,000 条)。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的负载限制 (6 MB)。

  • 开始位置 – 仅处理新记录、所有现有记录或在特定日期之后创建的记录。

    • 最新 – 处理已添加到流中的新记录。

    • 时间范围 – 处理流中的所有记录。

    • 位于时间戳 – 处理从特定时间开始的记录。

    在处理任何现有记录后,函数将继续处理新记录。

  • 已启用 – 禁用事件源可停止处理记录。Lambda 将跟踪已处理的最后一条记录,并在重新启用后从停止位置重新开始处理。

之后,要管理事件源配置,请在设计器中选择触发器。

事件源映射 API

要使用 AWS CLI 创建事件源映射,请使用 CreateEventSourceMapping API。以下示例使用 AWS CLI 将名为 my-function 的函数映射到 Kinesis 数据流。数据流由 Amazon 资源名称 (ARN) 指定,批处理大小为 500,从以 Unix 时间表示的时间戳开始。

$ aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action" }

要使用一个使用者,请指定使用者的 ARN 而不是流的 ARN。

Amazon CloudWatch 指标

在您的函数处理完一批记录后,Lambda 将发出 IteratorAge 指标。该指标指示处理完成时批处理中最后一条记录的时间。如果您的函数正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理新记录的时间之间的延迟。

迭代器期限中的上升趋势可以指示您的函数问题。有关更多信息,请参阅对基于 Lambda 的应用程序进行监控和问题排查