Amazon DynamoDB
开发人员指南 (API Version 2012-08-10)
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

教程:在 DynamoDB 表中处理新项目

在本教程中,您将创建 AWS Lambda 触发器以处理来自 DynamoDB 表的流。

本教程的场景就是 Woofer 这个简单的社交网络。Woofer 用户使用发送给其他 Woofer 用户的 bark (短文本消息) 进行通信。下图显示了此应用程序的组件和工作流:

  1. 用户将项目写入 DynamoDB 表 (BarkTable)。表中的每个项目代表一个 bark。

  2. 写入的新流记录表示已向 BarkTable 中添加新项目。

  3. 新的流记录触发 AWS Lambda 函数 (publishNewBark)。

  4. 如果流记录指示新项目已添加到 BarkTable,则 Lambda 函数会从流记录读取数据并发布消息到 Amazon Simple Notification Service (Amazon SNS) 中的主题。

  5. Amazon SNS 主题的订阅者收到消息。(在本教程中,唯一的订阅者是一个电子邮件地址。)

在您开始之前

本教程使用 AWS Command Line Interface。如果您尚未配置,请按照 AWS Command Line Interface 用户指南 中的说明安装和配置 AWS CLI。

步骤 1:创建启用了流的 DynamoDB 表

在此步骤中,您将创建 DynamoDB 表 (BarkTable) 以存储来自 Woofer 用户的所有 bark。主键由用户名 (分区键) 和时间戳 (排序键) 组成。这两个属性的类型为字符串。

BarkTable 将启用流。在本教程后面的部分中,您通过将 AWS Lambda 函数与流关联来创建触发器。

  1. 键入以下命令以创建表:

    Copy
    aws dynamodb create-table \ --table-name BarkTable \ --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
  2. 在输出中,查找 LatestStreamArn

    Copy
    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...

    记录 regionaccountID,因为您在本教程接下来的步骤中需要这些信息。

步骤 2:创建 Lambda 执行角色

在此步骤中,您将创建 IAM 角色 (WooferLambdaRole) 并向其分配权限。此角色将由您在步骤 4:创建并测试 Lambda 函数中创建的 Lambda 函数使用。

您还将为角色创建策略。策略将包含 Lambda 函数在运行时需要的所有权限。

  1. 使用以下内容创建名为 trust-relationship.json 的文件:

    Copy
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
  2. 键入以下命令来创建 WooferLambdaRole

    Copy
    aws iam create-role --role-name WooferLambdaRole \ --path "/service-role/" \ --assume-role-policy-document file://trust-relationship.json
  3. 使用以下内容创建名为 role-policy.json 的文件。(使用您的 AWS 区域和账户 ID 替换 regionaccountID。)

    Copy
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:region:accountID:function:publishNewBark*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:accountID:*" }, { "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/*" }, { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "*" ] } ] }

    策略有四个语句,允许 WooferLambdaRole 执行以下操作:

    • 执行 Lambda 函数 (publishNewBark)。您将在本教程的后面部分中创建函数。

    • 访问 CloudWatch Logs。Lambda 函数在运行时将诊断信息写入到 CloudWatch Logs。

    • BarkTable 的 DynamoDB 流读取数据。

    • 发布消息到 Amazon SNS。

  4. 键入以下命令以将策略附加到 WooferLambdaRole

    Copy
    aws iam put-role-policy --role-name WooferLambdaRole \ --policy-name WooferLambdaRolePolicy \ --policy-document file://role-policy.json

步骤 3:创建 Amazon SNS 主题

在此步骤中,您将创建 Amazon SNS 主题 (wooferTopic) 并使用电子邮件地址订阅该主题。您的 Lambda 函数将使用此主题发布来自 Woofer 用户的新 bark。

  1. 键入以下命令以创建新 Amazon SNS 主题:

    Copy
    aws sns create-topic --name wooferTopic
  2. 键入以下命令以使用电子邮件地址订阅 wooferTopic。(使用您的 AWS 区域和账户 ID 替换 regionaccountID,并使用有效的电子邮件地址替换 example@example.com。)

    Copy
    aws sns subscribe \ --topic-arn arn:aws:sns:region:accountID:wooferTopic \ --protocol email \ --notification-endpoint example@example.com
  3. Amazon SNS 将向您的电子邮件地址发送确认邮件。单击该邮件中的 Confirm subscription 链接以完成订阅过程。

步骤 4:创建并测试 Lambda 函数

在此步骤中,您将创建 AWS Lambda 函数 (publishNewBark) 以处理来自 BarkTable 的流记录。

publishNewBark 函数仅处理与 BarkTable 中的新项目对应的流事件。该函数从此类事件读取数据,然后调用 Amazon SNS 以发布该事件。

  1. 使用以下内容创建名为 publishNewBark.js 的文件:(使用您的 AWS 区域和账户 ID 替换 regionaccountID。)

    Copy
    'use strict'; var AWS = require("aws-sdk"); var sns = new AWS.SNS(); exports.handler = (event, context, callback) => { event.Records.forEach((record) => { console.log('Stream record: ', JSON.stringify(record, null, 2)); if (record.eventName == 'INSERT') { var who = JSON.stringify(record.dynamodb.NewImage.Username.S); var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S); var what = JSON.stringify(record.dynamodb.NewImage.Message.S); var params = { Subject: 'A new bark from ' + who, Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what, TopicArn: 'arn:aws:sns:region:accountID:wooferTopic' }; sns.publish(params, function(err, data) { if (err) { console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2)); } else { console.log("Results from sending message: ", JSON.stringify(data, null, 2)); } }); } }); callback(null, `Successfully processed ${event.Records.length} records.`); };
  2. 创建包含 publishNewBark.js 的 zip 文件。如果您有 zip 命令行实用程序,则可以键入以下命令来完成此操作:

    Copy
    zip publishNewBark.zip publishNewBark.js
  3. 在创建 Lambda 函数时,为 WooferLambdaRole 指定您在步骤 2:创建 Lambda 执行角色中创建的 ARN。键入以下命令检索此 ARN:

    Copy
    aws iam get-role --role-name WooferLambdaRole

    在输出中,查找 WooferLambdaRole 的 ARN:

    Copy
    ... "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole" ...

    现在键入以下命令来创建 Lambda 函数。(使用 WooferLambdaRole 的 ARN 替换 roleARN。)

    Copy
    aws lambda create-function \ --region us-east-1 \ --function-name publishNewBark \ --zip-file fileb://publishNewBark.zip \ --role roleARN \ --handler publishNewBark.handler \ --timeout 5 \ --runtime nodejs4.3
  4. 现在您将测试 publishNewBark 以确保它正常工作。为此,您将提供类似于来自 DynamoDB 流 的真实记录的输入。

    使用以下内容创建名为 payload.json 的文件。

    Copy
    { "Records": [ { "eventID": "7de3041dd709b024af6f29e4fa13d34c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-west-2", "dynamodb": { "ApproximateCreationDateTime": 1479499740, "Keys": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Username": { "S": "John Doe" } }, "NewImage": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Message": { "S": "This is a bark from the Woofer social network" }, "Username": { "S": "John Doe" } }, "SequenceNumber": "13021600000000001596893679", "SizeBytes": 112, "StreamViewType": "NEW_IMAGE" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/BarkTable/stream/2016-11-16T20:42:48.104" } ] }

    键入以下命令以测试 publishNewBark 函数:

    Copy
    aws lambda invoke --function-name publishNewBark --payload file://payload.json output.txt

    如果测试成功,您将看到以下输出:

    Copy
    { "StatusCode": 200 }

    此外,output.txt 文件将包含此文本:

    Copy
    "Successfully processed 1 records."

    您还会在数分钟内收到一封新电子邮件。

    注意

    AWS Lambda 将诊断信息写入到 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误,可以使用这些诊断信息排除故障:

    1. 通过以下网址打开 CloudWatch 控制台:https://console.amazonaws.cn/cloudwatch/

    2. 在导航窗格中,选择 Logs

    3. 选择下列日志组:/aws/lambda/publishNewBark

    4. 选择最新日志流以查看函数输出 (以及错误)。

步骤 5:创建并测试触发器

步骤 4:创建并测试 Lambda 函数 中,您测试了 Lambda 函数以确保它正确运行。在此步骤中,您通过将 Lambda 函数 (publishNewBark) 与事件源 (BarkTable 流) 关联来创建触发器

  1. 在创建触发器时,您需要为 BarkTable 流指定 ARN。键入以下命令检索此 ARN:

    Copy
    aws dynamodb describe-table --table-name BarkTable

    在输出中,查找 LatestStreamArn

    Copy
    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...
  2. 键入以下命令以创建触发器。(使用实际流 ARN 替换 streamARN。)

    Copy
    aws lambda create-event-source-mapping \ --region us-east-1 \ --function-name publishNewBark \ --event-source streamARN \ --batch-size 1 \ --starting-position TRIM_HORIZON
  3. 现在您将测试触发器。键入以下命令以将项目添加到 BarkTable

    Copy
    aws dynamodb put-item \ --table-name BarkTable \ --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}

    您应在数分钟内收到一封新电子邮件。

  4. 转到 DynamoDB 控制台并再添加几个项目到 BarkTable。您必须为 UsernameTimestamp 属性指定值。(您还应为 Message 指定值,虽然该值并非必需。)对于添加到 BarkTable 中的每个项目,您应收到一封新电子邮件。

    Lambda 函数仅处理您添加到 BarkTable 的新项目。如果您在表中更新或删除项目,函数不执行任何操作。

注意

AWS Lambda 将诊断信息写入到 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误,可以使用这些诊断信息排除故障。

  1. 通过以下网址打开 CloudWatch 控制台:https://console.amazonaws.cn/cloudwatch/

  2. 在导航窗格中,选择 Logs

  3. 选择下列日志组:/aws/lambda/publishNewBark

  4. 选择最新日志流以查看函数输出 (以及错误)。