AWS Lambda
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

教程:将 AWS Lambda 与 Amazon Kinesis 结合使用

在本教程中,您将创建一个 Lambda 函数来处理来自 Kinesis 流的事件。下图说明应用程序的流程:

  1. 自定义应用程序将记录写入流。

  2. AWS Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。

  3. AWS Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来执行 Lambda 函数。

先决条件

本教程假设您对基本 Lambda 操作和 Lambda 控制台有一定了解。如果尚不了解,请按照开始使用 AWS Lambda中的说明创建您的第一个 Lambda 函数。

为了遵循本指南中的步骤,您需要命令行终端或外壳,以便运行命令。命令显示在列表中,以提示符 ($) 和当前目录名称(如果有)开头:

~/lambda-project$ this is a command this is output

对于长命令,使用转义字符 (\) 将命令拆分到多行中。

在 Linux 和 macOS 中,可使用您首选的外壳程序和程序包管理器。在 Windows 10 中,您可以 安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。

创建执行角色

创建执行角色,向您的函数授予访问 AWS 资源的权限。

创建执行角色

  1. 打开 IAM 控制台中的“角色”页面

  2. 选择 Create role (创建角色)

  3. 创建具有以下属性的角色。

    • 可信任的实体AWS Lambda

    • Permissions (权限)AWSLambdaKinesisExecutionRole

    • 角色名称 (角色名称)lambda-kinesis-role

AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。

创建函数

以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

注意

有关使用其他语言的示例代码,请参阅 示例函数代码

例 index.js

console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };

创建函数

  1. 将示例代码复制到名为 index.js 的文件中。

  2. 创建部署程序包。

    $ zip function.zip index.js
  3. 使用 create-function 命令创建 Lambda 函数。

    $ aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs8.10 \ --role arn:aws:iam::123456789012:role/lambda-kinesis-role

测试 Lambda 函数。

使用 invoke AWS Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。

测试 Lambda 函数

  1. 将以下 JSON 复制到文件中并将其保存为 input.txt

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
  2. 使用 invoke 命令将事件发送到该函数。

    $ aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt

    响应将保存到 out.txt 中。

创建 Kinesis 流

使用 create-stream 命令创建流。

$ aws kinesis create-stream --stream-name lambda-stream --shard-count 1

运行下面的 describe-stream 命令以获取流 ARN。

$ aws kinesis describe-stream --stream-name lambda-stream { "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。

在 AWS Lambda 中添加事件源。

运行以下 AWS CLI add-event-source 命令。

$ aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings 命令获取事件源映射的列表。

$ aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream

在该响应中,您可以验证状态值是否为 enabled。可以禁用事件源映射,以暂停临时丢失任何记录的轮询。

测试设置

要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data 值是一个字符串,CLI 先将其编码为 base64 字符串,然后才发送到 Kinesis。您可以多次运行同一命令来向流中添加多条记录。

$ aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台中查看这些日志。