教程:将 Amazon Lambda 与 Amazon Kinesis 结合使用
在本教程中,您将创建 Lambda 函数来处理 Kinesis 流中的事件。
-
自定义应用程序将记录写入流。
-
Amazon Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。
-
Amazon Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来运行 Lambda 函数。
先决条件
本教程假设您对 Lambda 基本操作和 Lambda 控制台有一定了解。如果您还没有了解,请按照 使用控制台创建 Lambda 函数 中的说明创建您的第一个 Lambda 函数。
要完成以下步骤,您需要命令行终端或 Shell,以便运行命令。在单独的数据块中列出了命令和预期输出:
aws --version
您应看到以下输出:
aws-cli/2.0.57 Python/3.7.4 Darwin/19.6.0 exe/x86_64
对于长命令,使用转义字符 (\
) 将命令拆分为多行。
在 Linux 和 macOS 中,可使用您首选的外壳程序和程序包管理器。在 Windows 10 中,您可以 安装 Windows Subsystem for Linux
创建执行角色
创建执行角色,向您的函数授予访问 Amazon 资源的权限。
创建执行角色
-
在 IAM 控制台中,打开 Roles(角色)页面
。 -
选择 Create role(创建角色)。
-
创建具有以下属性的角色。
-
Trusted entity(可信任的实体)– Amazon Lambda。
-
Permissions(权限)– AWSLambdaKinesisExecutionRole。
-
Role name(角色名称)–
lambda-kinesis-role
。
-
AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。
创建函数
以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。
有关使用其他语言的示例代码,请参阅 示例 函数代码。
例 index.js
console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };
创建函数
-
将示例代码复制到名为
index.js
的文件中。 -
创建部署程序包。
zip function.zip index.js
-
使用
create-function
命令创建 Lambda 函数。aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \ --role arn:aws:iam::
123456789012
:role/lambda-kinesis-role
测试 Lambda 函数
使用 invoke
Amazon Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。
测试 Lambda 函数
-
将以下 JSON 复制到文件中并将其保存为
input.txt
。{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
-
使用
invoke
命令将事件发送到该函数。aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt
如果您使用 Amazon CLI 版本 2,则 cli-binary-format 选项必填。您还可以在 Amazon CLI Config 文件中配置此选项。
响应将保存到
out.txt
中。
创建 Kinesis 流
使用 create-stream
命令创建流。
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
运行下面的 describe-stream
命令以获取流 ARN。
aws kinesis describe-stream --stream-name lambda-stream
您应看到以下输出:
{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }
您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。
在 Amazon Lambda 中添加事件源
运行以下 Amazon CLI add-event-source
命令。
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST
记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings
命令获取事件源映射的列表。
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream
在该响应中,您可以验证状态值是否为 enabled
。可以禁用事件源映射,以临时暂停轮询而不丢失任何记录。
测试设置
要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data
值是一个字符串,CLI 先将其编码为 base64 字符串,然后才发送到 Kinesis。您可以多次运行同一命令来向流中添加多条记录。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."
Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台
清除资源
除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除不再使用的Amazon资源,可防止您的Amazon账户产生不必要的费用。
删除执行角色
-
打开 IAM 控制台的 Roles(角色)页面
。 -
选择您创建的执行角色。
-
选择 Delete role(删除角色)。
-
选择 Yes, delete(是,删除)。
删除 Lambda 函数
-
打开 Lamba 控制台的 Functions(函数)页面
。 -
选择您创建的函数。
-
选择 Actions (操作),然后选择 Delete (删除)。
-
选择 Delete。
删除 Kinesis 流
-
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
选择您创建的流。
-
依次选择 Actions(操作)和 Delete(删除)。
-
在文本框中输入
delete
。 -
选择 Delete。