教程:将 Amazon Lambda 与 Amazon Kinesis 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

教程:将 Amazon Lambda 与 Amazon Kinesis 结合使用

在本教程中,您将创建 Lambda 函数来处理 Kinesis 流中的事件。

  1. 自定义应用程序将记录写入流。

  2. Amazon Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。

  3. Amazon Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来运行 Lambda 函数。

先决条件

本教程假设您对 Lambda 基本操作和 Lambda 控制台有一定了解。如果您还没有了解,请按照 使用控制台创建 Lambda 函数 中的说明创建您的第一个 Lambda 函数。

要完成以下步骤,您需要命令行终端或 Shell,以便运行命令。在单独的数据块中列出了命令和预期输出:

aws --version

您应看到以下输出:

aws-cli/2.0.57 Python/3.7.4 Darwin/19.6.0 exe/x86_64

对于长命令,使用转义字符 (\) 将命令拆分为多行。

在 Linux 和 macOS 中,可使用您首选的外壳程序和程序包管理器。在 Windows 10 中,您可以 安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。

创建执行角色

创建执行角色,向您的函数授予访问 Amazon 资源的权限。

创建执行角色

  1. 在 IAM 控制台中,打开 Roles(角色)页面

  2. 选择 Create role(创建角色)。

  3. 创建具有以下属性的角色。

    • Trusted entity(可信任的实体)– Amazon Lambda

    • Permissions(权限)– AWSLambdaKinesisExecutionRole

    • Role name(角色名称)– lambda-kinesis-role

AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。

创建函数

以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

注意

有关使用其他语言的示例代码,请参阅 示例 函数代码

例 index.js

console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };

创建函数

  1. 将示例代码复制到名为 index.js 的文件中。

  2. 创建部署程序包。

    zip function.zip index.js
  3. 使用 create-function 命令创建 Lambda 函数。

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \ --role arn:aws:iam::123456789012:role/lambda-kinesis-role

测试 Lambda 函数

使用 invoke Amazon Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。

测试 Lambda 函数

  1. 将以下 JSON 复制到文件中并将其保存为 input.txt

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
  2. 使用 invoke 命令将事件发送到该函数。

    aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt

    如果您使用 Amazon CLI 版本 2,则 cli-binary-format 选项必填。您还可以在 Amazon CLI Config 文件中配置此选项。

    响应将保存到 out.txt 中。

创建 Kinesis 流

使用 create-stream 命令创建流。

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

运行下面的 describe-stream 命令以获取流 ARN。

aws kinesis describe-stream --stream-name lambda-stream

您应看到以下输出:

{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。

在 Amazon Lambda 中添加事件源

运行以下 Amazon CLI add-event-source 命令。

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings 命令获取事件源映射的列表。

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream

在该响应中,您可以验证状态值是否为 enabled。可以禁用事件源映射,以临时暂停轮询而不丢失任何记录。

测试设置

要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data 值是一个字符串,CLI 先将其编码为 base64 字符串,然后才发送到 Kinesis。您可以多次运行同一命令来向流中添加多条记录。

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台中查看这些日志。

清除资源

除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除不再使用的Amazon资源,可防止您的Amazon账户产生不必要的费用。

删除执行角色

  1. 打开 IAM 控制台的 Roles(角色)页面

  2. 选择您创建的执行角色。

  3. 选择 Delete role(删除角色)。

  4. 选择 Yes, delete(是,删除)。

删除 Lambda 函数

  1. 打开 Lamba 控制台的 Functions(函数)页面

  2. 选择您创建的函数。

  3. 选择 Actions (操作),然后选择 Delete (删除)

  4. 选择 Delete

删除 Kinesis 流

  1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

  2. 选择您创建的流。

  3. 依次选择 Actions(操作)和 Delete(删除)。

  4. 在文本框中输入 delete

  5. 选择 Delete