Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
使用 Lambda 处理 Amazon Kinesis Data Streams 记录
要使用 Lambda 处理 Amazon Kinesis Data Streams 记录,可创建 Lambda 事件源映射。您可以将 Lambda 函数映射到标准迭代器或增强型扇出功能使用者。有关更多信息,请参阅轮询和批处理流。
创建 Kinesis 事件源映射
要使用来自数据流的记录调用 Lambda 函数,请创建一个事件源映射。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。处理来自多个流的项目时,每个批处理将只包含来自单个分片或流的记录。
您可以配置事件源映射来处理来自不同 Amazon Web Services 账户中的流的记录。要了解更多信息,请参阅创建跨账户事件源映射。
在创建事件源映射之前,您需要向您的 Lambda 函数授予读取 Kinesis 数据流中数据的权限。Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源:
Amazon 托管式策略 AWSLambdaKinesisExecutionRole 包含这些权限。按照以下过程所述将此托管式策略添加到您的函数。
-
您不需要 kinesis:ListStreams 权限来创建和管理 Kinesis 的事件源映射。但是,如果您在控制台中创建事件源映射但没有此权限,则无法从下拉列表中选择 Kinesis 流,并且控制台将显示错误。要创建事件源映射,您需要手动输入流的 Amazon 资源名称(ARN)。
-
Lambda 在重试失败的调用时会进行 kinesis:GetRecords 和 kinesis:GetShardIterator API 调用。
- Amazon Web Services 管理控制台
-
为您的函数添加 Kinesis 权限
-
打开 Lambda 控制台的“函数”页面,然后选择函数。
-
在配置选项卡中,选择权限。
-
在执行角色窗格的角色名称下,选择指向函数的执行角色的链接。此链接将在 IAM 控制台中打开该角色的页面。
-
在权限策略窗格中,选择添加权限,然后选择附加策略。
-
在搜索字段中输入 AWSLambdaKinesisExecutionRole。
-
选中该策略名称旁边的复选框,然后选择添加权限。
- Amazon CLI
-
- Amazon SAM
-
为您的函数添加 Kinesis 权限
-
在函数定义中添加 Policies 属性,如以下示例所示:
Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./my-function/
Handler: index.handler
Runtime: nodejs22.x
Policies:
- AWSLambdaKinesisExecutionRole
配置所需的权限后,创建事件源映射。
- Amazon Web Services 管理控制台
-
创建 Kinesis 事件源映射
-
打开 Lambda 控制台的“函数”页面,然后选择函数。
-
在函数概述窗格中,选择添加触发器。
-
在触发器配置下,对于源,请选择 Kinesis 。
-
选择要为其创建事件源映射的 Kinesis 流,也可以选择流的使用者。
-
(可选)编辑事件源映射的批处理大小、起始位置和批处理窗口。
-
选择添加。
在控制台中创建事件源映射时,您的 IAM 角色必须拥有 kinesis:ListStreams 和 kinesis:ListStreamConsumers 权限。
- Amazon CLI
-
创建 Kinesis 事件源映射
-
运行以下 CLI 命令以创建 Kinesis 事件源映射。根据您的应用场景选择自己的批量大小和起始位置。
aws lambda create-event-source-mapping \
--function-name MyFunction \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--starting-position LATEST \
--batch-size 100
要指定批处理时间窗,请添加 --maximum-batching-window-in-seconds 选项。有关使用此参数和其他参数的更多信息,请参阅《Amazon CLI Command Reference》中的 create-event-source-mapping。
- Amazon SAM
-
创建 Kinesis 事件源映射
-
在函数定义中添加 KinesisEvent 属性,如以下示例所示:
Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./my-function/
Handler: index.handler
Runtime: nodejs22.x
Policies:
- AWSLambdaKinesisExecutionRole
Events:
KinesisEvent:
Type: Kinesis
Properties:
Stream: !GetAtt MyKinesisStream.Arn
StartingPosition: LATEST
BatchSize: 100
MyKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
要了解有关在 Amazon SAM 中创建 Kinesis 数据流事件源映射的更多信息,请参阅《Amazon Serverless Application Model Developer Guide》中的 Kinesis。
轮询和流的起始位置
请注意,事件源映射创建和更新期间的流轮询最终是一致的。
此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP。
创建跨账户事件源映射
Amazon Kinesis Data Streams 支持基于资源的策略。因此,您可以在一个 Amazon Web Services 账户中使用 Lambda 函数来处理另一个账户的流中摄入的数据。
要使用其他 Amazon Web Services 账户中的 Kinesis 流为您的 Lambda 函数创建事件源映射,您必须使用基于资源的策略配置该流,以向您的 Lambda 函数授予读取相关项目的权限。要了解如何配置流以允许跨账户存取,请参阅《Amazon Kinesis Streams 开发人员指南》中的 Sharing access with cross-account Amazon Lambda functions。
使用基于资源的策略配置流以向您的 Lambda 函数授予所需的权限后,请使用上一节中描述的任何方法创建事件源映射。
如果您选择使用 Lambda 控制台创建事件源映射,请将流的 ARN 直接粘贴到输入字段中。如果您想指定流的使用者,粘贴使用者的 ARN 会自动填充流字段。