Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
教程:将 Amazon Lambda 与 Amazon DocumentDB 流结合使用
在本教程中,您将创建一个基本的 Lambda 函数,该函数会消耗来自 Amazon DocumentDB(与 MongoDB 兼容)更改流的事件。要完成本教程,您需要经历以下阶段:
-
设置 Amazon DocumentDB 集群,连接到该集群,并在其上激活更改流。
-
创建 Lambda 函数,并将 Amazon DocumentDB 集群配置为函数的事件源。
-
通过将项目插入到 Amazon DocumentDB 数据库中来测试设置。
创建 Amazon DocumentDB 集群
在 Secrets Manager 中创建密钥
在 Amazon DocumentDB 创建您的集群时,请创建一个 Amazon Secrets Manager 密钥来存储数据库凭证。在稍后的步骤中创建 Lambda 事件源映射时,您将提供此密钥。
在 Secrets Manager 中创建密钥
-
打开 Secrets Manager 控制台并选择存储新密钥。
-
对于选择密钥类型,请选择以下选项之一:
-
在基本详细信息下:
-
密钥类型:用于 Amazon DocumentDB 数据库的凭证
-
在凭证下,输入用于创建 Amazon DocumentDB 集群的相同用户名和密码。
-
数据库:选择 Amazon DocumentDB 集群。
-
选择下一步。
-
对于配置密钥,请选择以下选项之一:
-
密钥名称:DocumentDBSecret
-
选择下一步。
-
选择下一步。
-
选择 Store (存储)。
-
刷新控制台以验证 DocumentDBSecret
密钥是否成功存储。
记下密钥 ARN。您将在后面的步骤中用到它。
连接到集群
使用 Amazon CloudShell 连接到 Amazon DocumentDB 集群
-
在 Amazon DocumentDB 管理控制台上的集群下,找到您创建的集群。单击集群旁边的复选框,选择您的集群。
-
选择连接到集群。将出现 CloudShell 运行命令屏幕。
-
在新环境名称字段中,输入唯一的名称,例如“test”,然后选择创建并运行。
-
出现提示时请输入密码。当提示符变成 rs0 [direct: primary] <env-name>>
时,您成功连接到您的 Amazon DocumentDB 集群。
激活更改流
在本教程中,您将跟踪对 Amazon DocumentDB 集群中 docdbdemo
数据库 products
集合的更改。您可以通过激活更改流来完成此操作。
在集群内创建新数据库
-
运行以下命令,创建名为 docdbdemo
的新数据库:
use docdbdemo
-
在终端窗口中,使用以下命令将记录插入到 docdbdemo
中:
db.products.insertOne({"hello":"world"})
您应看到类似如下的输出:
{
acknowledged: true,
insertedId: ObjectId('67f85066ca526410fd531d59')
}
-
接下来,使用以下命令激活 docdbdemo
数据库 products
集合上的更改流:
db.adminCommand({modifyChangeStreams: 1,
database: "docdbdemo",
collection: "products",
enable: true});
应看到类似如下内容的输出:
{ "ok" : 1, "operationTime" : Timestamp(1680126165, 1) }
创建接口 VPC 端点
接下来,创建接口 VPC 端点,以确保 Lambda 和 Secrets Manager(稍后用于存储集群访问凭证)能够连接到默认 VPC。
创建接口 VPC 端点
-
打开 VPC 控制台。在左侧菜单的虚拟私有云下,选择端点。
-
选择创建端点。使用以下配置创建端点:
-
对于名称标签,输入 lambda-default-vpc
。
-
对于服务类别,选择 Amazon 服务。
-
对于服务,在搜索框中输入 lambda
。选择格式为 com.amazonaws.<region>.lambda
的服务。
-
对于 VPC,选择您的 Amazon DocumentDB 集群所在的 VPC。这通常是默认 VPC。
-
对于子网,选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。
-
对于 IP 地址类型,选择 IPv4。
-
对于安全组,选择 Amazon DocumentDB 集群使用的安全组。这通常是 default
安全组。
-
保留所有其他默认设置。
-
选择创建端点。
-
再次选择创建端点。使用以下配置创建端点:
-
对于名称标签,输入 secretsmanager-default-vpc
。
-
对于服务类别,选择 Amazon 服务。
-
对于服务,在搜索框中输入 secretsmanager
。选择格式为 com.amazonaws.<region>.secretsmanager
的服务。
-
对于 VPC,选择您的 Amazon DocumentDB 集群所在的 VPC。这通常是默认 VPC。
-
对于子网,选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。
-
对于 IP 地址类型,选择 IPv4。
-
对于安全组,选择 Amazon DocumentDB 集群使用的安全组。这通常是 default
安全组。
-
保留所有其他默认设置。
-
选择创建端点。
本教程的集群设置部分到此完成。
创建执行角色
在接下来的一组步骤中,您将创建 Lambda 函数。首先,您需要创建执行角色,以向函数授予访问集群的权限。为此,您可以先创建 IAM policy,然后将此策略附加到 IAM 角色。
创建 IAM policy
-
在 IAM 控制台中打开策略页面,然后选择创建策略。
-
选择 JSON 选项卡。在以下策略中,将语句最后一行中的 Secrets Manager 资源 ARN 替换为之前的密钥 ARN,然后将策略复制到编辑器中。
- JSON
-
-
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "LambdaESMNetworkingAccess",
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeVpcs",
"ec2:DeleteNetworkInterface",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"kms:Decrypt"
],
"Resource": "*"
},
{
"Sid": "LambdaDocDBESMAccess",
"Effect": "Allow",
"Action": [
"rds:DescribeDBClusters",
"rds:DescribeDBClusterParameters",
"rds:DescribeDBSubnetGroups"
],
"Resource": "*"
},
{
"Sid": "LambdaDocDBESMGetSecretValueAccess",
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:DocumentDBSecret
"
}
]
}
-
选择下一步:标签,然后选择下一步:审核。
-
对于 Name (名称),请输入 AWSDocumentDBLambdaPolicy
。
-
选择创建策略。
创建 IAM 角色
-
在 IAM 控制台中打开角色页面,然后选择创建角色。
-
对于选择可信实体,请选择以下选项之一:
-
可信实体类型:Amazon 服务
-
服务或使用案例:Lambda
-
选择下一步。
-
对于添加权限,选择刚刚创建的 AWSDocumentDBLambdaPolicy
策略以及 AWSLambdaBasicExecutionRole
,以向函数授予写入 Amazon CloudWatch Logs 的权限。
-
选择下一步。
-
对于 Role name(角色名称),输入 AWSDocumentDBLambdaExecutionRole
。
-
请选择 Create role(创建角色)。
创建 Lambda 函数
本教程使用 Python 3.13 运行时系统,但我们还提供了适用于其他运行时系统的示例代码文件。您可以选择以下框中的选项卡,查看适用于您感兴趣的运行时系统的代码。
代码接收 Amazon DocumentDB 事件输入并对其所包含的消息进行处理。
创建 Lambda 函数
-
打开 Lamba 控制台的 Functions page(函数页面)。
-
选择 Create function(创建函数)。
-
选择从头开始编写。
-
在基本信息中,执行以下操作:
-
对于函数名称,输入 ProcessDocumentDBRecords
。
-
对于运行时,选择 Python 3.13。
-
对于架构,选择 x86_64。
-
在更改默认执行角色选项卡中,执行以下操作:
-
展开选项卡,然后选择使用现有角色。
-
选择您之前创建的 AWSDocumentDBLambdaExecutionRole
。
-
选择创建函数。
要部署函数代码
-
在下框中选择 Python 选项卡并复制代码。
- .NET
-
- 适用于 .NET 的 Amazon SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 .NET 将 Amazon DocumentDB 事件与 Lambda 结合使用。
using Amazon.Lambda.Core;
using System.Text.Json;
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;
//Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace LambdaDocDb;
public class Function
{
/// <summary>
/// Lambda function entry point to process Amazon DocumentDB events.
/// </summary>
/// <param name="event">The Amazon DocumentDB event.</param>
/// <param name="context">The Lambda context object.</param>
/// <returns>A string to indicate successful processing.</returns>
public string FunctionHandler(Event evnt, ILambdaContext context)
{
foreach (var record in evnt.Events)
{
ProcessDocumentDBEvent(record, context);
}
return "OK";
}
private void ProcessDocumentDBEvent(DocumentDBEventRecord record, ILambdaContext context)
{
var eventData = record.Event;
var operationType = eventData.OperationType;
var databaseName = eventData.Ns.Db;
var collectionName = eventData.Ns.Coll;
var fullDocument = JsonSerializer.Serialize(eventData.FullDocument, new JsonSerializerOptions { WriteIndented = true });
context.Logger.LogLine($"Operation type: {operationType}");
context.Logger.LogLine($"Database: {databaseName}");
context.Logger.LogLine($"Collection: {collectionName}");
context.Logger.LogLine($"Full document:\n{fullDocument}");
}
public class Event
{
[JsonPropertyName("eventSourceArn")]
public string EventSourceArn { get; set; }
[JsonPropertyName("events")]
public List<DocumentDBEventRecord> Events { get; set; }
[JsonPropertyName("eventSource")]
public string EventSource { get; set; }
}
public class DocumentDBEventRecord
{
[JsonPropertyName("event")]
public EventData Event { get; set; }
}
public class EventData
{
[JsonPropertyName("_id")]
public IdData Id { get; set; }
[JsonPropertyName("clusterTime")]
public ClusterTime ClusterTime { get; set; }
[JsonPropertyName("documentKey")]
public DocumentKey DocumentKey { get; set; }
[JsonPropertyName("fullDocument")]
public Dictionary<string, object> FullDocument { get; set; }
[JsonPropertyName("ns")]
public Namespace Ns { get; set; }
[JsonPropertyName("operationType")]
public string OperationType { get; set; }
}
public class IdData
{
[JsonPropertyName("_data")]
public string Data { get; set; }
}
public class ClusterTime
{
[JsonPropertyName("$timestamp")]
public Timestamp Timestamp { get; set; }
}
public class Timestamp
{
[JsonPropertyName("t")]
public long T { get; set; }
[JsonPropertyName("i")]
public int I { get; set; }
}
public class DocumentKey
{
[JsonPropertyName("_id")]
public Id Id { get; set; }
}
public class Id
{
[JsonPropertyName("$oid")]
public string Oid { get; set; }
}
public class Namespace
{
[JsonPropertyName("db")]
public string Db { get; set; }
[JsonPropertyName("coll")]
public string Coll { get; set; }
}
}
- Go
-
- 适用于 Go V2 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Go 将 Amazon DocumentDB 事件与 Lambda 结合使用。
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/lambda"
)
type Event struct {
Events []Record `json:"events"`
}
type Record struct {
Event struct {
OperationType string `json:"operationType"`
NS struct {
DB string `json:"db"`
Coll string `json:"coll"`
} `json:"ns"`
FullDocument interface{} `json:"fullDocument"`
} `json:"event"`
}
func main() {
lambda.Start(handler)
}
func handler(ctx context.Context, event Event) (string, error) {
fmt.Println("Loading function")
for _, record := range event.Events {
logDocumentDBEvent(record)
}
return "OK", nil
}
func logDocumentDBEvent(record Record) {
fmt.Printf("Operation type: %s\n", record.Event.OperationType)
fmt.Printf("db: %s\n", record.Event.NS.DB)
fmt.Printf("collection: %s\n", record.Event.NS.Coll)
docBytes, _ := json.MarshalIndent(record.Event.FullDocument, "", " ")
fmt.Printf("Full document: %s\n", string(docBytes))
}
- Java
-
- 适用于 Java 的 SDK 2.x
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Java 将 Amazon DocumentDB 事件与 Lambda 结合使用。
import java.util.List;
import java.util.Map;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
public class Example implements RequestHandler<Map<String, Object>, String> {
@SuppressWarnings("unchecked")
@Override
public String handleRequest(Map<String, Object> event, Context context) {
List<Map<String, Object>> events = (List<Map<String, Object>>) event.get("events");
for (Map<String, Object> record : events) {
Map<String, Object> eventData = (Map<String, Object>) record.get("event");
processEventData(eventData);
}
return "OK";
}
@SuppressWarnings("unchecked")
private void processEventData(Map<String, Object> eventData) {
String operationType = (String) eventData.get("operationType");
System.out.println("operationType: %s".formatted(operationType));
Map<String, Object> ns = (Map<String, Object>) eventData.get("ns");
String db = (String) ns.get("db");
System.out.println("db: %s".formatted(db));
String coll = (String) ns.get("coll");
System.out.println("coll: %s".formatted(coll));
Map<String, Object> fullDocument = (Map<String, Object>) eventData.get("fullDocument");
System.out.println("fullDocument: %s".formatted(fullDocument));
}
}
- JavaScript
-
- 适用于 JavaScript 的 SDK(v3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 JavaScript 将 Amazon DocumentDB 事件与 Lambda 结合使用。
console.log('Loading function');
exports.handler = async (event, context) => {
event.events.forEach(record => {
logDocumentDBEvent(record);
});
return 'OK';
};
const logDocumentDBEvent = (record) => {
console.log('Operation type: ' + record.event.operationType);
console.log('db: ' + record.event.ns.db);
console.log('collection: ' + record.event.ns.coll);
console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2));
};
使用 TypeScript 将 Amazon DocumentDB 事件与 Lambda 结合使用
import { DocumentDBEventRecord, DocumentDBEventSubscriptionContext } from 'aws-lambda';
console.log('Loading function');
export const handler = async (
event: DocumentDBEventSubscriptionContext,
context: any
): Promise<string> => {
event.events.forEach((record: DocumentDBEventRecord) => {
logDocumentDBEvent(record);
});
return 'OK';
};
const logDocumentDBEvent = (record: DocumentDBEventRecord): void => {
console.log('Operation type: ' + record.event.operationType);
console.log('db: ' + record.event.ns.db);
console.log('collection: ' + record.event.ns.coll);
console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2));
};
- PHP
-
- 适用于 PHP 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 PHP 将 Amazon DocumentDB 事件与 Lambda 结合使用。
<?php
require __DIR__.'/vendor/autoload.php';
use Bref\Context\Context;
use Bref\Event\Handler;
class DocumentDBEventHandler implements Handler
{
public function handle($event, Context $context): string
{
$events = $event['events'] ?? [];
foreach ($events as $record) {
$this->logDocumentDBEvent($record['event']);
}
return 'OK';
}
private function logDocumentDBEvent($event): void
{
// Extract information from the event record
$operationType = $event['operationType'] ?? 'Unknown';
$db = $event['ns']['db'] ?? 'Unknown';
$collection = $event['ns']['coll'] ?? 'Unknown';
$fullDocument = $event['fullDocument'] ?? [];
// Log the event details
echo "Operation type: $operationType\n";
echo "Database: $db\n";
echo "Collection: $collection\n";
echo "Full document: " . json_encode($fullDocument, JSON_PRETTY_PRINT) . "\n";
}
}
return new DocumentDBEventHandler();
- Python
-
- 适用于 Python 的 SDK(Boto3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Python 将 Amazon DocumentDB 事件与 Lambda 结合使用。
import json
def lambda_handler(event, context):
for record in event.get('events', []):
log_document_db_event(record)
return 'OK'
def log_document_db_event(record):
event_data = record.get('event', {})
operation_type = event_data.get('operationType', 'Unknown')
db = event_data.get('ns', {}).get('db', 'Unknown')
collection = event_data.get('ns', {}).get('coll', 'Unknown')
full_document = event_data.get('fullDocument', {})
print(f"Operation type: {operation_type}")
print(f"db: {db}")
print(f"collection: {collection}")
print("Full document:", json.dumps(full_document, indent=2))
- Ruby
-
- 适用于 Ruby 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Ruby 将 Amazon DocumentDB 事件与 Lambda 结合使用。
require 'json'
def lambda_handler(event:, context:)
event['events'].each do |record|
log_document_db_event(record)
end
'OK'
end
def log_document_db_event(record)
event_data = record['event'] || {}
operation_type = event_data['operationType'] || 'Unknown'
db = event_data.dig('ns', 'db') || 'Unknown'
collection = event_data.dig('ns', 'coll') || 'Unknown'
full_document = event_data['fullDocument'] || {}
puts "Operation type: #{operation_type}"
puts "db: #{db}"
puts "collection: #{collection}"
puts "Full document: #{JSON.pretty_generate(full_document)}"
end
- Rust
-
- 适用于 Rust 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Rust 将 Amazon DocumentDB 事件与 Lambda 结合使用。
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
event::documentdb::{DocumentDbEvent, DocumentDbInnerEvent},
};
// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"
async fn function_handler(event: LambdaEvent<DocumentDbEvent>) ->Result<(), Error> {
tracing::info!("Event Source ARN: {:?}", event.payload.event_source_arn);
tracing::info!("Event Source: {:?}", event.payload.event_source);
let records = &event.payload.events;
if records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
for record in records{
log_document_db_event(record);
}
tracing::info!("Document db records processed");
// Prepare the response
Ok(())
}
fn log_document_db_event(record: &DocumentDbInnerEvent)-> Result<(), Error>{
tracing::info!("Change Event: {:?}", record.event);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.without_time()
.init();
let func = service_fn(function_handler);
lambda_runtime::run(func).await?;
Ok(())
}
-
在 Lambda 控制台的代码源窗格中,将代码粘贴到代码编辑器中,替换 Lambda 创建的代码。
-
在部署部分,选择部署以更新函数的代码:
创建 Lambda 事件源映射
创建事件源映射,将 Amazon DocumentDB 更改流与 Lambda 函数相关联。创建此事件源映射后,Amazon Lambda 即开始轮询该流。
创建事件源映射
-
在 Lambda 控制台中打开函数页面。
-
选择您之前创建的 ProcessDocumentDBRecords
函数。
-
选择配置选项卡,然后从左侧菜单中选择触发器。
-
选择添加触发器。
-
在触发器配置下,为源选择 Amazon DocumentDB。
-
使用以下配置创建事件源映射:
-
选择添加。创建事件源映射可能需要花费几分钟的时间。
测试函数
等待事件源映射达到已启用状态。这个过程可能需要几分钟。然后,通过插入、更新和删除数据库记录来测试端到端设置。开始前的准备工作:
在 docdbdemo
数据库的 products
集合中插入记录:
db.products.insertOne({"name":"Pencil", "price": 1.00})
通过查看 CloudWatch Logs 来验证函数是否成功处理该事件。您会看到如下日志条目:
使用以下命令更新您刚刚插入的记录:
db.products.updateOne(
{ "name": "Pencil" },
{ $set: { "price": 0.50 }}
)
通过查看 CloudWatch Logs 来验证函数是否成功处理该事件。您会看到如下日志条目:
使用以下命令删除您刚刚更新的记录:
db.products.deleteOne( { "name": "Pencil" } )
通过查看 CloudWatch Logs 来验证函数是否成功处理该事件。您会看到如下日志条目:
故障排除
如果您在函数的 CloudWatch 日志中没有看到任何数据库事件,请检查以下各项:
清除资源
除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除您不再使用的 Amazon 资源,可防止您的 Amazon Web Services 账户 产生不必要的费用。
删除 Lambda 函数
-
打开 Lamba 控制台的 Functions(函数)页面。
-
选择您创建的函数。
-
依次选择操作和删除。
-
在文本输入字段中键入 confirm
,然后选择删除。
删除执行角色
-
打开 IAM 控制台的角色页面。
-
选择您创建的执行角色。
-
选择删除。
-
在文本输入字段中输入角色名称,然后选择删除。
删除 VPC 端点
-
打开 VPC 控制台。在左侧菜单的虚拟私有云下,选择端点。
-
选择您创建的端点。
-
选择 Actions(操作)、Delete VPC Endpoint(删除 VPC 端点)。
-
在文本输入字段中输入 delete
。
-
选择删除。
删除 Amazon DocumentDB 集群
-
打开 Amazon DocumentDB 控制台。
-
选择您为本教程创建的 Amazon DocumentDB 集群,并禁用删除保护。
-
在主集群页面中,再次选择 Amazon DocumentDB 集群。
-
依次选择操作、删除。
-
对于创建最终集群快照,选择否。
-
在文本输入字段中输入 delete
。
-
选择删除。