AWS Lambda
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

教程:将 AWS Lambda 与 Amazon Kinesis 结合使用

在本教程中,您将创建一个 Lambda 函数来处理来自 Kinesis 流的事件。下图说明应用程序的流程:

  1. 自定义应用程序将记录写入流。

  2. AWS Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。

  3. AWS Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来执行 Lambda 函数。

先决条件

This tutorial assumes that you have some knowledge of basic Lambda operations and the Lambda console. If you haven't already, follow the instructions in 开始使用 AWS Lambda to create your first Lambda function.

To follow the procedures in this guide, you will need a command line terminal or shell to run commands. Commands are shown in listings preceded by a prompt symbol ($) and the name of the current directory, when appropriate:

~/lambda-project$ this is a command this is output

For long commands, an escape character (\) is used to split a command over multiple lines.

On Linux and macOS, use your preferred shell and package manager. On Windows 10, you can install the Windows Subsystem for Linux to get a Windows-integrated version of Ubuntu and Bash.

创建执行角色

创建执行角色,向您的函数授予访问 AWS 资源的权限。

创建执行角色

  1. 打开 IAM 控制台中的“角色”页面

  2. 选择 Create role (创建角色)

  3. 创建具有以下属性的角色。

    • 可信任的实体AWS Lambda

    • Permissions (权限)AWSLambdaKinesisExecutionRole

    • 角色名称 (角色名称)lambda-kinesis-role

AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。

创建函数

以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

注意

有关使用其他语言的示例代码,请参阅 示例函数代码

例 index.js

console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };

创建函数

  1. 将示例代码复制到名为 index.js 的文件中。

  2. 创建部署程序包。

    $ zip function.zip index.js
  3. 使用 create-function 命令创建 Lambda 函数。

    $ aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs8.10 \ --role arn:aws:iam::123456789012:role/lambda-kinesis-role

测试 Lambda 函数。

使用 invoke AWS Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。

测试 Lambda 函数

  1. 将以下 JSON 复制到文件中并将其保存为 input.txt

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
  2. 使用 invoke 命令将事件发送到该函数。

    $ aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt

    响应将保存到 out.txt 中。

创建 Kinesis 流

使用 create-stream 命令创建流。

$ aws kinesis create-stream --stream-name lambda-stream --shard-count 1

运行下面的 describe-stream 命令以获取流 ARN。

$ aws kinesis describe-stream --stream-name lambda-stream { "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。

在 AWS Lambda 中添加事件源。

运行以下 AWS CLI add-event-source 命令。

$ aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings 命令获取事件源映射的列表。

$ aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream

在该响应中,您可以验证状态值是否为 enabled。可以禁用事件源映射,以暂停临时丢失任何记录的轮询。

测试设置

要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data 值是 "Hello, this is a test." 字符串的 base64 编码值。您可以多次运行同一命令来向流中添加多条记录。

$ aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="

Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台中查看这些日志。