DynamoDB Streams 和生存时间 - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

DynamoDB Streams 和生存时间

您可以通过在表中启用 Amazon DynamoDB Streams 并处理已过期项目的流记录来备份或者处理按存活时间 (TTL) 删除的项目。

流记录包含用户身份字段Records[<index>].userIdentity

在过期后被存活时间过程删除的项目包含以下字段:

  • Records[<index>].userIdentity.type

    "Service"

  • Records[<index>].userIdentity.principalId

    "dynamodb.amazonaws.com"

注意

在全局表中使用 TTL 时,执行 TTL 的区域将设置该userIdentity字段。复制删除操作时,不会在其他区域设置此字段。

以下 JSON 显示单个流记录的相关部分。

"Records": [ { ... "userIdentity": { "type": "Service", "principalId": "dynamodb.amazonaws.com" } ... } ]

使用 DynamoDB Streams 和 Lambda 归档已删除 TTL 的项目

结合使用 DynamoDB 存活时间 (TTL)DynamoDB StreamsAmazon Lambda 可以帮助简化数据归档、降低 DynamoDB 存储成本并降低代码复杂性。使用 Lambda 作为流使用者提供了许多优势,最明显的是与 Kinesis Client Library (KCL) 等其他使用者相比,降低了成本。当通过 Lambda 来使用事件时,对 DynamoDB 流的 GetRecords API 调用不向您收费,并且 Lambda 可以通过识别流事件中的 JSON 模式来提供事件筛选。借助事件模式内容筛选,您可以定义多达五个不同的筛选条件来控制将哪些事件发送到 Lambda 进行处理。这有助于减少对 Lambda 函数的调用、简化代码并降低总体成本。

尽管 DynamoDB Streams 包含所有数据修改,例如 CreateModifyRemove 操作,但这可能导致不必要地调用归档 Lambda 函数。例如,假设一个每小时有 200 万项数据修改的表流入流中,但其中不到 5% 的数据修改是将在 TTL 流程中过期而需要归档的项目删除。使用 Lambda 事件源筛选条件,Lambda 函数每小时只调用 100,000 次。事件筛选的结果是,您只需为所需的调用付费。在没有事件筛选的情况下,您需要为获得的 200 万次调用付费。

事件筛选应用于 Lambda 事件源映射,它是一个从选定事件(DynamoDB 流)读取并调用 Lambda 函数的资源。在下图中,您可以看到 Lambda 函数如何通过流和事件筛选条件使用已删除存活时间的项目。

DynamoDB 存活时间事件筛选条件模式

将以下 JSON 添加到源映射筛选标准仅允许对已删除 TTL 的项目调用 Lambda 函数:

{ "Filters": [ { "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } } } ] }

创建 Amazon Lambda 事件源映射

使用以下代码段创建筛选的事件源映射,您可以将其连接到表的 DynamoDB 流。每个代码块都包括事件筛选条件模式。

Amazon CLI
aws lambda create-event-source-mapping \ --event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \ --batch-size 10 \ --enabled \ --function-name test_func \ --starting-position LATEST \ --filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
Java
LambdaClient client = LambdaClient.builder() .region(Region.EU_WEST_1) .build(); Filter userIdentity = Filter.builder() .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}") .build(); FilterCriteria filterCriteria = FilterCriteria.builder() .filters(userIdentity) .build(); CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder() .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000") .batchSize(10) .enabled(Boolean.TRUE) .functionName("test_func") .startingPosition("LATEST") .filterCriteria(filterCriteria) .build(); try{ CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest); System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn()); }catch (ServiceException e){ System.out.println(e.getMessage()); }
Node
const client = new LambdaClient({ region: "eu-west-1" }); const input = { EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000", BatchSize: 10, Enabled: true, FunctionName: "test_func", StartingPosition: "LATEST", FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] } } const command = new CreateEventSourceMappingCommand(input); try { const results = await client.send(command); console.log(results); } catch (err) { console.error(err); }
Python
session = boto3.session.Session(region_name = 'eu-west-1') client = session.client('lambda') try: response = client.create_event_source_mapping( EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000', BatchSize=10, Enabled=True, FunctionName='test_func', StartingPosition='LATEST', FilterCriteria={ 'Filters': [ { 'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }, ] } ) print(response) except Exception as e: print(e)
JSON
{ "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }