教程:使用 DynamoDB Streams 和 Lambda 处理新项目 - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

教程:使用 DynamoDB Streams 和 Lambda 处理新项目

在本教程中,您将创建 Amazon Lambda 触发器以处理来自 DynamoDB 表的流。

本教程的场景就是 Woofer 这个简单的社交网络。Woofer 用户使用发送给其他 Woofer 用户的 bark(短文本消息)进行通信。下图显示了此应用程序的组件和工作流。

  1. 用户将项目写入 DynamoDB 表 (BarkTable)。表中的每个项目代表一个 bark。

  2. 写入新的流记录,体现添加到 BarkTable 中的新项目。

  3. 新的流记录触发 Amazon Lambda 函数 (publishNewBark)。

  4. 如果流记录指示新项目已添加到 BarkTable,则 Lambda 函数会从流记录读取数据并将消息发布到 Amazon Simple Notification Service (Amazon SNS) 中的主题。

  5. Amazon SNS 主题的订阅者收到消息。(在本教程中,唯一的订阅者是一个电子邮件地址。)

开始前的准备工作

本教程使用 Amazon Command Line Interface Amazon CLI。如果您尚未配置,请按照 Amazon Command Line Interface 用户指南中的说明安装和配置 Amazon CLI。

步骤 1:创建启用了流的 DynamoDB 表

在此步骤中,您将创建 DynamoDB 表 (BarkTable) 以存储来自 Woofer 用户的所有 bark。主键由 Username(分区键)和 Timestamp(排序键)组成。这两个属性的类型为字符串。

BarkTable 启用了流。在本教程后面的部分中,您通过将 Amazon Lambda 函数与流关联来创建触发器。

  1. 输入以下命令以创建表。

    aws dynamodb create-table \ --table-name BarkTable \ --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
  2. 在输出中,查找 LatestStreamArn

    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...

    记录 regionaccountID,因为您在本教程接下来的步骤中需要这些信息。

步骤 2:创建 Lambda 执行角色

在此步骤中,您将创建 Amazon Identity and Access Management (IAM) 角色 (WooferLambdaRole) 并向其分配权限。此角色将由您在步骤 4:创建并测试 Lambda 函数中创建的 Lambda 函数使用。

您还将为角色创建策略。策略包含 Lambda 函数在运行时需要的所有权限。

  1. 使用以下内容创建名为 trust-relationship.json 的文件。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
  2. 输入以下命令来创建 WooferLambdaRole

    aws iam create-role --role-name WooferLambdaRole \ --path "/service-role/" \ --assume-role-policy-document file://trust-relationship.json
  3. 使用以下内容创建名为 role-policy.json 的文件。(将 regionaccountID 替换为您的 Amazon 区域和帐户 ID。)

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:region:accountID:function:publishNewBark*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:accountID:*" }, { "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/*" }, { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "*" ] } ] }

    策略有四个语句,允许 WooferLambdaRole 执行以下操作:

    • 运行 Lambda 函数 (publishNewBark)。您将在本教程的后面部分中创建函数。

    • 访问 Amazon CloudWatch Logs Lambda 函数在运行时将诊断信息写入 CloudWatch Logs。

    • BarkTable 的 DynamoDB 流读取数据。

    • 向 Amazon SNS 发布消息。

  4. 输入以下命令以将策略附加到 WooferLambdaRole

    aws iam put-role-policy --role-name WooferLambdaRole \ --policy-name WooferLambdaRolePolicy \ --policy-document file://role-policy.json

第 3 步:创建 Amazon SNS 主题

在此步骤中,您将创建 Amazon SNS 主题 (wooferTopic) 并使用电子邮件地址订阅该主题。您的 Lambda 函数使用此主题发布来自 Woofer 用户的新 bark。

  1. 输入以下命令以创建新 Amazon SNS 主题。

    aws sns create-topic --name wooferTopic
  2. 输入以下命令以使用电子邮件地址订阅 wooferTopic。(使用您的 Amazon 区域和账户 ID 替换 regionaccountID,并使用有效的电子邮件地址替换 example@example.com。)

    aws sns subscribe \ --topic-arn arn:aws:sns:region:accountID:wooferTopic \ --protocol email \ --notification-endpoint example@example.com
  3. Amazon SNS 将向您的电子邮件地址发送确认邮件。选择该邮件中的确认订阅链接以完成订阅过程。

步骤 4:创建并测试 Lambda 函数

在此步骤中,您将创建 Amazon Lambda 函数 (publishNewBark) 以处理来自 BarkTable 的流记录。

publishNewBark 函数仅处理与 BarkTable 中的新项目对应的流事件。该函数从此类事件读取数据,然后调用 Amazon SNS 以发布该事件。

  1. 使用以下内容创建名为 publishNewBark.js 的文件。将 regionaccountID 替换为您的 Amazon 区域和帐户 ID。

    'use strict'; var AWS = require("aws-sdk"); var sns = new AWS.SNS(); exports.handler = (event, context, callback) => { event.Records.forEach((record) => { console.log('Stream record: ', JSON.stringify(record, null, 2)); if (record.eventName == 'INSERT') { var who = JSON.stringify(record.dynamodb.NewImage.Username.S); var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S); var what = JSON.stringify(record.dynamodb.NewImage.Message.S); var params = { Subject: 'A new bark from ' + who, Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what, TopicArn: 'arn:aws:sns:region:accountID:wooferTopic' }; sns.publish(params, function(err, data) { if (err) { console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2)); } else { console.log("Results from sending message: ", JSON.stringify(data, null, 2)); } }); } }); callback(null, `Successfully processed ${event.Records.length} records.`); };
  2. 创建包含 publishNewBark.js 的 zip 文件。如果您有 zip 命令行实用程序,则可以输入以下命令来完成此操作。

    zip publishNewBark.zip publishNewBark.js
  3. 当您创建 Lambda 函数时,为 WooferLambdaRole 指定您在 步骤 2:创建 Lambda 执行角色 中创建的 Amazon Resource Name (ARN)。输入以下命令检索此 ARN。

    aws iam get-role --role-name WooferLambdaRole

    在输出中,查找 WooferLambdaRole 的 ARN。

    ... "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole" ...

    输入以下命令以创建 Lambda 函数。将 roleARN 替换为 WooferLambdaRole 的 ARN。

    aws lambda create-function \ --region region \ --function-name publishNewBark \ --zip-file fileb://publishNewBark.zip \ --role roleARN \ --handler publishNewBark.handler \ --timeout 5 \ --runtime nodejs10.x
  4. 现在测试 publishNewBark,验证它可以正常使用。为此,您将提供类似于来自 DynamoDB Streams 的真实记录的输入。

    使用以下内容创建名为 payload.json 的文件。

    { "Records": [ { "eventID": "7de3041dd709b024af6f29e4fa13d34c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "region", "dynamodb": { "ApproximateCreationDateTime": 1479499740, "Keys": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Username": { "S": "John Doe" } }, "NewImage": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Message": { "S": "This is a bark from the Woofer social network" }, "Username": { "S": "John Doe" } }, "SequenceNumber": "13021600000000001596893679", "SizeBytes": 112, "StreamViewType": "NEW_IMAGE" }, "eventSourceARN": "arn:aws:dynamodb:region:123456789012:table/BarkTable/stream/2016-11-16T20:42:48.104" } ] }

    输入以下命令以测试 publishNewBark 函数。

    aws lambda invoke --function-name publishNewBark --payload file://payload.json output.txt

    如果测试成功,您将看到以下输出。

    { "StatusCode": 200 }

    此外,output.txt 文件将包含以下文本。

    "Successfully processed 1 records."

    您还会在数分钟内收到一封新电子邮件。

    注意

    Amazon Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误,可以使用这些诊断信息排除故障:

    1. 通过以下网址打开 CloudWatch 控制台:https://console.aws.amazon.com/cloudwatch/

    2. 在导航窗格中,选择日志

    3. 选择下列日志组:/aws/lambda/publishNewBark

    4. 选择最新日志流以查看函数输出 (以及错误)。

步骤 5:创建并测试触发器

步骤 4:创建并测试 Lambda 函数 中,您测试了 Lambda 函数以确保它正确运行。在此步骤中,关联 Lambda 函数 (publishNewBark) 与事件源(BarkTable 流),创建触发器

  1. 在创建触发器时,您需要为 BarkTable 流指定 ARN。输入以下命令检索此 ARN。

    aws dynamodb describe-table --table-name BarkTable

    在输出中,查找 LatestStreamArn

    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...
  2. 输入以下命令以创建触发器。使用实际流 ARN 替换 streamARN

    aws lambda create-event-source-mapping \ --region region \ --function-name publishNewBark \ --event-source streamARN \ --batch-size 1 \ --starting-position TRIM_HORIZON
  3. 测试触发器。键入以下命令以将项目添加到 BarkTable

    aws dynamodb put-item \ --table-name BarkTable \ --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}

    您应在数分钟内收到一封新电子邮件。

  4. 打开 DynamoDB 控制台并再将几个项目添加到 BarkTable。您必须为 UsernameTimestamp 属性指定值。(您还应为 Message 指定值,虽然该值并非必需。) 对于添加到 BarkTable 中的每个项目,您应收到一封新电子邮件。

    Lambda 函数仅处理您添加到 BarkTable 的新项目。如果您在表中更新或删除项目,函数不执行任何操作。

注意

Amazon Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误,可以使用这些诊断信息排除故障。

  1. 通过以下网址打开 CloudWatch 控制台:https://console.aws.amazon.com/cloudwatch/

  2. 在导航窗格中,选择日志

  3. 选择下列日志组:/aws/lambda/publishNewBark

  4. 选择最新日志流以查看函数输出 (以及错误)。