Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
教程:将 Amazon Lambda 与 Amazon DynamoDB Streams 结合使用
在本教程中,您将创建 Lambda 函数处理来自 Amazon DynamoDB Streams 的事件。
先决条件
本教程假设您对 Lambda 基本操作和 Lambda 控制台有一定了解。如果您还没有了解,请按照 使用控制台创建 Lambda 函数 中的说明创建您的第一个 Lambda 函数。
要完成以下步骤,您需要 Amazon CLI 版本 2。在单独的数据块中列出了命令和预期输出:
aws --version
您应看到以下输出:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
对于长命令,使用转义字符 (\
) 将命令拆分为多行。
在 Linux 和 macOS 中,可使用您首选的 shell 和程序包管理器。
在 Windows 中,操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令(例如 zip
)。安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。本指南中的示例 CLI 命令使用 Linux 格式。如果您使用的是 Windows CLI,则必须重新格式化包含内联 JSON 文档的命令。
创建执行角色
创建执行角色,向您的函数授予访问 Amazon 资源的权限。
创建执行角色
-
在 IAM 控制台中,打开 Roles(角色)页面。
-
选择创建角色。
-
创建具有以下属性的角色。
-
Trusted entity(可信任的实体)– Lambda。
-
Permissions(权限)– AWSLambdaDynamoDBExecutionRole。
-
Role name(角色名称)– lambda-dynamodb-role
。
AWSLambdaDynamoDBExecutionRole 具有该函数所需的权限以从 DynamoDB 中读取项目并将日志写入 CloudWatch Logs。
创建函数
创建一个 Lambda 函数来处理 DynamoDB 事件。函数代码会将一些传入的事件数据写入 CloudWatch Logs。
- .NET
-
- Amazon SDK for .NET
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 .NET 将 DynamoDB 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda_DDB;
public class Function
{
public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
foreach (var record in dynamoEvent.Records)
{
context.Logger.LogInformation($"Event ID: {record.EventID}");
context.Logger.LogInformation($"Event Name: {record.EventName}");
context.Logger.LogInformation(JsonSerializer.Serialize(record));
}
context.Logger.LogInformation("Stream processing complete.");
}
}
- Go
-
- 适用于 Go V2 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Go 将 DynamoDB 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/events"
"fmt"
)
func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
if len(event.Records) == 0 {
return nil, fmt.Errorf("received empty event")
}
for _, record := range event.Records {
LogDynamoDBRecord(record)
}
message := fmt.Sprintf("Records processed: %d", len(event.Records))
return &message, nil
}
func main() {
lambda.Start(HandleRequest)
}
func LogDynamoDBRecord(record events.DynamoDBEventRecord){
fmt.Println(record.EventID)
fmt.Println(record.EventName)
fmt.Printf("%+v\n", record.Change)
}
- Java
-
- 适用于 Java 的 SDK 2.x
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Java 将 DynamoDB 事件与 Lambda 结合使用。
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public class example implements RequestHandler<DynamodbEvent, Void> {
private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
@Override
public Void handleRequest(DynamodbEvent event, Context context) {
System.out.println(GSON.toJson(event));
event.getRecords().forEach(this::logDynamoDBRecord);
return null;
}
private void logDynamoDBRecord(DynamodbStreamRecord record) {
System.out.println(record.getEventID());
System.out.println(record.getEventName());
System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
}
}
- JavaScript
-
- 适用于 JavaScript 的 SDK(v3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 JavaScript 将 DynamoDB 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(record => {
logDynamoDBRecord(record);
});
};
const logDynamoDBRecord = (record) => {
console.log(record.eventID);
console.log(record.eventName);
console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
使用 TypeScript 将 DynamoDB 事件与 Lambda 结合使用。
export const handler = async (event, context) => {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(record => {
logDynamoDBRecord(record);
});
}
const logDynamoDBRecord = (record) => {
console.log(record.eventID);
console.log(record.eventName);
console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
- PHP
-
- 适用于 PHP 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 PHP 将 DynamoDB 事件与 Lambda 结合使用。
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends DynamoDbHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
{
$this->logger->info("Processing DynamoDb table items");
$records = $event->getRecords();
foreach ($records as $record) {
$eventName = $record->getEventName();
$keys = $record->getKeys();
$old = $record->getOldImage();
$new = $record->getNewImage();
$this->logger->info("Event Name:".$eventName."\n");
$this->logger->info("Keys:". json_encode($keys)."\n");
$this->logger->info("Old Image:". json_encode($old)."\n");
$this->logger->info("New Image:". json_encode($new));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords items");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- 适用于 Python 的 SDK(Boto3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 Python 将 DynamoDB 事件与 Lambda 结合使用。
import json
def lambda_handler(event, context):
print(json.dumps(event, indent=2))
for record in event['Records']:
log_dynamodb_record(record)
def log_dynamodb_record(record):
print(record['eventID'])
print(record['eventName'])
print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
- Ruby
-
- 适用于 Ruby 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 Ruby 将 DynamoDB 事件与 Lambda 结合使用。
def lambda_handler(event:, context:)
return 'received empty event' if event['Records'].empty?
event['Records'].each do |record|
log_dynamodb_record(record)
end
"Records processed: #{event['Records'].length}"
end
def log_dynamodb_record(record)
puts record['eventID']
puts record['eventName']
puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
end
- Rust
-
- 适用于 Rust 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Rust 将 DynamoDB 事件与 Lambda 结合使用。
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
event::dynamodb::{Event, EventRecord},
};
// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"
async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
let records = &event.payload.records;
tracing::info!("event payload: {:?}",records);
if records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
for record in records{
log_dynamo_dbrecord(record);
}
tracing::info!("Dynamo db records processed");
// Prepare the response
Ok(())
}
fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
tracing::info!("EventId: {}", record.event_id);
tracing::info!("EventName: {}", record.event_name);
tracing::info!("DynamoDB Record: {:?}", record.change );
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.without_time()
.init();
let func = service_fn(function_handler);
lambda_runtime::run(func).await?;
Ok(())
}
创建函数
-
将示例代码复制到名为 example.js
的文件中。
-
创建部署程序包。
zip function.zip example.js
-
使用 create-function
命令创建 Lambda 函数。
aws lambda create-function --function-name ProcessDynamoDBRecords \
--zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-dynamodb-role
测试 Lambda 函数
在本步骤中,您将使用 invoke
Amazon Lambda CLI 命令和以下示例 DynamoDB 事件手动调用 Lambda 函数。将以下内容复制到名为 input.txt
的文件中。
例 input.txt
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"2",
"eventName":"MODIFY",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"222",
"SizeBytes":59,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"3",
"eventName":"REMOVE",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"333",
"SizeBytes":38,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
]
}
运行以下 invoke
命令:
aws lambda invoke --function-name ProcessDynamoDBRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
如果使用 cli-binary-format 版本 2,则 Amazon CLI 选项是必需的。要将其设为默认设置,请运行 aws configure set cli-binary-format raw-in-base64-out
。有关更多信息,请参阅版本 2 的 Amazon Command Line Interface 用户指南中的 Amazon CLI 支持的全局命令行选项。
函数在响应正文中返回字符串 message
。
在 outputfile.txt
文件中验证输出。
创建启用流的 DynamoDB 表
创建启用流的 Amazon DynamoDB 表。
创建 DynamoDB 表
-
打开 DynamoDB 控制台。
-
选择 Create Table。
-
使用以下设置创建表。
-
选择创建。
启用流
-
打开 DynamoDB 控制台。
-
选择表。
-
选择 lambda-dynamodb-stream 表。
-
在 Exports and streams(导出和流)下,选择 DynamoDB stream details(DynamoDB 流详细信息)。
-
选择打开。
-
对于视图类型,选择仅键属性。
-
选择开启流。
记下流 ARN。在下一步中将该流与 Lambda 函数关联时,您将需要此类信息。有关启用流的更多信息,请参阅使用 DynamoDB Streams 捕获表活动。
在 Amazon Lambda 中添加事件源
在 Amazon Lambda 中创建事件源映射。此事件源映射将 DynamoDB Streams 与 Lambda 函数关联。创建此事件源映射后,Amazon Lambda 即开始轮询该流。
运行以下 Amazon CLI create-event-source-mapping
命令。命令运行后,记下 UUID。在任何命令中,如删除事件源映射时,您都需要该 UUID 来引用事件源映射。
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
--batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
这会在指定的 DynamoDB Streams 和 Lambda 函数之间创建映射。您可将一个 DynamoDB Streams 关联到多个 Lambda 函数,也可将同一个 Lambda 函数关联到多个流。但是,Lambda 函数将共享其所共享的流的读取吞吐量。
您可以通过运行以下命令获取事件源映射的列表。
aws lambda list-event-source-mappings
该列表返回您创建的所有事件源映射,而对于每个映射,它都显示 LastProcessingResult
等信息。该字段用于在出现任何问题时提供信息性消息。No records processed
(指示 Amazon Lambda 未开始轮询或流中没有任何记录)和 OK
(指示 Amazon Lambda 已成功读取流中的记录并已调用 Lambda 函数)等值表示未出现任何问题。如果出现问题,您将收到一条错误消息。
如果您有大量事件源映射,请使用函数名称参数缩窄结果范围。
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
测试设置
测试端到端体验。当您更新表时,DynamoDB 会将事件记录写入流。当 Amazon Lambda 轮询该流时,它将在流中检测新记录并通过向该函数传递事件来代表您调用 Lambda 函数。
-
在 DynamoDB 控制台中,添加、更新、删除表中的项目。DynamoDB 会将这些操作记录写入流。
-
Amazon Lambda 会轮询该流,当检测到流有更新时,它会通过传递在流中发现的事件数据来调用 Lambda 函数。
-
函数运行并在 Amazon CloudWatch 中创建日志。您可以验证 Amazon CloudWatch 控制台中报告的日志。
清除资源
除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除您不再使用的 Amazon 资源,可防止您的 Amazon Web Services 账户 产生不必要的费用。
删除 Lambda 函数
-
打开 Lamba 控制台的 Functions(函数)页面。
-
选择您创建的函数。
-
依次选择操作和删除。
-
在文本输入字段中键入 confirm
,然后选择删除。
删除执行角色
-
打开 IAM 控制台的角色页面。
-
选择您创建的执行角色。
-
选择删除。
-
在文本输入字段中输入角色名称,然后选择删除。
删除 DynamoDB 表
-
打开 DynamoDB 控制台中 Tables page(表页面)。
-
选择您创建的表。
-
选择删除。
-
在文本框中输入 delete
。
-
选择 删除表。