教程:将 Amazon Lambda 与 Amazon Kinesis 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

教程:将 Amazon Lambda 与 Amazon Kinesis 结合使用

在本教程中,您将创建一个 Lambda 函数来处理来自 Kinesis 流的事件。下图说明应用程序的流程:

  1. 自定义应用程序将记录写入流。

  2. Amazon Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。

  3. Amazon Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来运行 Lambda 函数。

先决条件

本教程假设您对基本 Lambda 操作和 Lambda 控制台有一定了解。如果您尚不了解,请按照开始使用 Lambda中的说明创建您的第一个 Lambda 函数。

要完成以下步骤,您需要命令行终端或 Shell 以运行命令。命令和预期输出在单独的数据块中列出:

aws --version

您应看到以下输出:

aws-cli/2.0.57 Python/3.7.4 Darwin/19.6.0 exe/x86_64

对于长命令,使用转义字符 (\) 将命令拆分到多行中。

在 Linux 和 macOS 中,可使用您首选的外壳程序和程序包管理器。在 Windows 10 中,您可以 安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。

创建执行角色

创建执行角色,向您的函数授予访问 Amazon 资源的权限。

创建执行角色

  1. 打开 IAM 控制台中的“角色”页面

  2. 选择 Create role (创建角色)

  3. 创建具有以下属性的角色。

    • 可信任的实体Amazon Lambda

    • Permissions (权限)AWSLambdaKinesisExecutionRole

    • 角色名称 (角色名称)lambda-kinesis-role

AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。

创建函数

以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

注意

有关使用其他语言的示例代码,请参阅 示例函数代码

例 index.js

console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };

创建函数

  1. 将示例代码复制到名为 index.js 的文件中。

  2. 创建部署程序包。

    zip function.zip index.js
  3. 使用 create-function 命令创建 Lambda 函数。

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \ --role arn:aws-cn:iam::123456789012:role/lambda-kinesis-role

测试 Lambda 函数

使用 invoke Amazon Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。

测试 Lambda 函数

  1. 将以下 JSON 复制到文件中并将其保存为 input.txt

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws-cn:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws-cn:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
  2. 使用 invoke 命令将事件发送到该函数。

    aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt
    注意

    如果您使用的是 Amazon CLI 版本 2,请添加以下命令参数:

    --cli-binary-format raw-in-base64-out

    响应将保存到 out.txt 中。

创建 Kinesis 流

使用 create-stream 命令创建流。

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

运行下面的 describe-stream 命令以获取流 ARN。

aws kinesis describe-stream --stream-name lambda-stream

您应看到以下输出:

{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws-cn:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。

在 Amazon Lambda 中添加事件源

运行以下 Amazon CLI add-event-source 命令。

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws-cn:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings 命令获取事件源映射的列表。

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws-cn:kinesis:us-west-2:123456789012:stream/lambda-stream

在该响应中,您可以验证状态值是否为 enabled。可以禁用事件源映射,以临时暂停轮询而不丢失任何记录。

测试设置

要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data 值是一个字符串,CLI 先将其编码为 base64 字符串,然后才发送到 Kinesis。您可以多次运行同一命令来向流中添加多条记录。

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台中查看这些日志。

清除资源

如果您不想保留为本教程创建的资源,可以立即将其删除。通过删除您不再使用的 Amazon 资源,可防止您的 Amazon 账户产生不必要的费用。

删除执行角色

  1. 打开 IAM 控制台的“角色”页面

  2. 选择您创建的执行角色。

  3. 选择删除角色

  4. 选择 Yes, delete (是,删除)

删除 Lambda 函数

  1. 打开 Lambda 控制台的“函数”页面

  2. 选择您创建的函数。

  3. 依次选择 ActionsDelete

  4. 选择删除

删除 Kinesis 流

  1. 登录 Amazon Web Services Management Console并通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 选择您创建的流。

  3. 依次选择 ActionsDelete

  4. 在文本框中输入 delete

  5. 选择 Delete