教程:将 Amazon Lambda 与 Amazon DynamoDB Streams 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

教程:将 Amazon Lambda 与 Amazon DynamoDB Streams 结合使用

在本教程中,您将创建 Lambda 函数处理来自 Amazon DynamoDB Streams 的事件。

先决条件

本教程假设您对 Lambda 基本操作和 Lambda 控制台有一定了解。如果您还没有了解,请按照 使用控制台创建 Lambda 函数 中的说明创建您的第一个 Lambda 函数。

要完成以下步骤,您需要 Amazon Command Line Interface(Amazon CLI)版本 2。在单独的数据块中列出了命令和预期输出:

aws --version

您应看到以下输出:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

对于长命令,使用转义字符 (\) 将命令拆分为多行。

在 Linux 和 macOS 中,可使用您首选的 shell 和程序包管理器。

注意

在 Windows 中,操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令(例如 zip)。安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。本指南中的示例 CLI 命令使用 Linux 格式。如果您使用的是 Windows CLI,则必须重新格式化包含内联 JSON 文档的命令。

创建执行角色

创建执行角色,向您的函数授予访问 Amazon 资源的权限。

创建执行角色
  1. 在 IAM 控制台中,打开 Roles(角色)页面

  2. 选择创建角色

  3. 创建具有以下属性的角色。

    • Trusted entity(可信任的实体)– Lambda。

    • Permissions(权限)– AWSLambdaDynamoDBExecutionRole

    • Role name(角色名称)– lambda-dynamodb-role

AWSLambdaDynamoDBExecutionRole 具有该函数所需的权限以从 DynamoDB 中读取项目并将日志写入 CloudWatch Logs。

创建函数

创建一个 Lambda 函数来处理 DynamoDB 事件。函数代码会将一些传入的事件数据写入 CloudWatch Logs。

.NET
Amazon SDK for .NET
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 .NET 将 DynamoDB 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); foreach (var record in dynamoEvent.Records) { context.Logger.LogInformation($"Event ID: {record.EventID}"); context.Logger.LogInformation($"Event Name: {record.EventName}"); context.Logger.LogInformation(JsonSerializer.Serialize(record)); } context.Logger.LogInformation("Stream processing complete."); } }
Go
适用于 Go V2 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Go 将 DynamoDB 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/events" "fmt" ) func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) { if len(event.Records) == 0 { return nil, fmt.Errorf("received empty event") } for _, record := range event.Records { LogDynamoDBRecord(record) } message := fmt.Sprintf("Records processed: %d", len(event.Records)) return &message, nil } func main() { lambda.Start(HandleRequest) } func LogDynamoDBRecord(record events.DynamoDBEventRecord){ fmt.Println(record.EventID) fmt.Println(record.EventName) fmt.Printf("%+v\n", record.Change) }
Java
SDK for Java 2.x
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Java 将 DynamoDB 事件与 Lambda 结合使用。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; import com.google.gson.Gson; import com.google.gson.GsonBuilder; public class example implements RequestHandler<DynamodbEvent, Void> { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); @Override public Void handleRequest(DynamodbEvent event, Context context) { System.out.println(GSON.toJson(event)); event.getRecords().forEach(this::logDynamoDBRecord); return null; } private void logDynamoDBRecord(DynamodbStreamRecord record) { System.out.println(record.getEventID()); System.out.println(record.getEventName()); System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb())); } }
JavaScript
适用于 JavaScript 的 SDK (v3)
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 JavaScript 将 DynamoDB 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); }; const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };

使用 TypeScript 将 DynamoDB 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); } const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };
PHP
适用于 PHP 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 PHP 将 DynamoDB 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\DynamoDb\DynamoDbHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends DynamoDbHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleDynamoDb(DynamoDbEvent $event, Context $context): void { $this->logger->info("Processing DynamoDb table items"); $records = $event->getRecords(); foreach ($records as $record) { $eventName = $record->getEventName(); $keys = $record->getKeys(); $old = $record->getOldImage(); $new = $record->getNewImage(); $this->logger->info("Event Name:".$eventName."\n"); $this->logger->info("Keys:". json_encode($keys)."\n"); $this->logger->info("Old Image:". json_encode($old)."\n"); $this->logger->info("New Image:". json_encode($new)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords items"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python(Boto3)
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Python 将 DynamoDB 事件与 Lambda 结合使用。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
Ruby
适用于 Ruby 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Ruby 将 DynamoDB 事件与 Lambda 结合使用。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) return 'received empty event' if event['Records'].empty? event['Records'].each do |record| log_dynamodb_record(record) end "Records processed: #{event['Records'].length}" end def log_dynamodb_record(record) puts record['eventID'] puts record['eventName'] puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}" end
Rust
适用于 Rust 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 将 DynamoDB 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
创建函数
  1. 将示例代码复制到名为 example.js 的文件中。

  2. 创建部署程序包。

    zip function.zip example.js
  3. 使用 create-function 命令创建 Lambda 函数。

    aws lambda create-function --function-name ProcessDynamoDBRecords \ --zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-dynamodb-role

测试 Lambda 函数

在本步骤中,您将使用 invoke Amazon Lambda CLI 命令和以下示例 DynamoDB 事件手动调用 Lambda 函数。将以下内容复制到名为 input.txt 的文件中。

例 input.txt
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ] }

运行以下 invoke 命令:

aws lambda invoke --function-name ProcessDynamoDBRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

如果使用 cli-binary-format 版本 2,则 Amazon CLI 选项是必需的。要将其设为默认设置,请运行 aws configure set cli-binary-format raw-in-base64-out。有关更多信息,请参阅版本 2 的 Amazon Command Line Interface 用户指南中的 Amazon CLI 支持的全局命令行选项

函数在响应正文中返回字符串 message

outputfile.txt 文件中验证输出。

创建启用流的 DynamoDB 表

创建启用流的 Amazon DynamoDB 表。

创建 DynamoDB 表
  1. 打开 DynamoDB 控制台

  2. 选择 Create Table

  3. 使用以下设置创建表。

    • Table name(表名称)- lambda-dynamodb-stream

    • Primary key(主键)– id(字符串)

  4. 选择创建

启用流
  1. 打开 DynamoDB 控制台

  2. 选择

  3. 选择 lambda-dynamodb-stream 表。

  4. Exports and streams(导出和流)下,选择 DynamoDB stream details(DynamoDB 流详细信息)。

  5. 选择打开

  6. 对于视图类型,选择仅键属性

  7. 选择开启流

记下流 ARN。在下一步中将该流与 Lambda 函数关联时,您将需要此类信息。有关启用流的更多信息,请参阅使用 DynamoDB Streams 捕获表活动

在 Amazon Lambda 中添加事件源

在 Amazon Lambda 中创建事件源映射。此事件源映射将 DynamoDB Streams 与 Lambda 函数关联。创建此事件源映射后,Amazon Lambda 即开始轮询该流。

运行以下 Amazon CLI create-event-source-mapping 命令。命令运行后,记下 UUID。在任何命令中,如删除事件源映射时,您都需要该 UUID 来引用事件源映射。

aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \ --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn

这会在指定的 DynamoDB Streams 和 Lambda 函数之间创建映射。您可将一个 DynamoDB Streams 关联到多个 Lambda 函数,也可将同一个 Lambda 函数关联到多个流。但是,Lambda 函数将共享其所共享的流的读取吞吐量。

您可以通过运行以下命令获取事件源映射的列表。

aws lambda list-event-source-mappings

该列表返回您创建的所有事件源映射,而对于每个映射,它都显示 LastProcessingResult 等信息。该字段用于在出现任何问题时提供信息性消息。No records processed(指示 Amazon Lambda 未开始轮询或流中没有任何记录)和 OK(指示 Amazon Lambda 已成功读取流中的记录并已调用 Lambda 函数)等值表示未出现任何问题。如果出现问题,您将收到一条错误消息。

如果您有大量事件源映射,请使用函数名称参数缩窄结果范围。

aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords

测试设置

测试端到端体验。当您更新表时,DynamoDB 会将事件记录写入流。当 Amazon Lambda 轮询该流时,它将在流中检测新记录并通过向该函数传递事件来代表您调用 Lambda 函数。

  1. 在 DynamoDB 控制台中,添加、更新、删除表中的项目。DynamoDB 会将这些操作记录写入流。

  2. Amazon Lambda 会轮询该流,当检测到流有更新时,它会通过传递在流中发现的事件数据来调用 Lambda 函数。

  3. 函数运行并在 Amazon CloudWatch 中创建日志。您可以验证 Amazon CloudWatch 控制台中报告的日志。

清除资源

除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除您不再使用的 Amazon 资源,可防止您的 Amazon Web Services 账户 产生不必要的费用。

删除 Lambda 函数
  1. 打开 Lamba 控制台的 Functions(函数)页面

  2. 选择您创建的函数。

  3. 依次选择操作删除

  4. 在文本输入字段中键入 delete,然后选择删除

删除执行角色
  1. 打开 IAM 控制台的角色页面

  2. 选择您创建的执行角色。

  3. 选择删除

  4. 在文本输入字段中输入角色名称,然后选择删除

删除 DynamoDB 表
  1. 打开 DynamoDB 控制台中 Tables page(表页面)。

  2. 选择您创建的表。

  3. 选择 删除

  4. 在文本框中输入 delete

  5. 选择 删除表