教程 2:对 DynamoDB 和 Lambda 使用筛选器来处理部分事件。 - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

教程 2:对 DynamoDB 和 Lambda 使用筛选器来处理部分事件。

在本教程中,您将创建 Amazon Lambda 触发器以处理来自 DynamoDB 表的流中的部分事件。

通过 Lambda 事件筛选,您可以使用筛选表达式来控制 Lambda 将哪些事件发送给函数进行处理。每个 DynamoDB 流最多可以配置 5 个不同的筛选器。如果您使用的是批处理时段,则 Lambda 会对每个新事件应用筛选条件,以确定是否将其包括在当前批处理中。

筛选器通过名为 FilterCriteria 的结构来应用。FilterCriteria 的 3 个主要属性为 metadata propertiesdata propertiesfilter patterns

DynamoDB Streams 事件的示例结构如下所示:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

metadata properties 是事件对象的字段。在 DynamoDB Streams 中,metadata propertiesdynamodbeventName 这样的字段。

data properties 是事件主体的字段。要根据 data properties 进行筛选,请确保将它们包含在正确的键内的 FilterCriteria 中。对于 DynamoDB 事件源,数据键为 NewImageOldImage

最后,筛选条件规则将定义要应用到特定属性的筛选条件表达式。下面是一些示例:

比较运算符 示例 规则语法(部分)

Null

产品类型为 null

{ "product_type": { "S": null } }

产品名称为空

{ "product_name": { "S": [ ""] } }

等于

州为佛罗里达州

{ "state": { "S": ["FL"] } }

并且

产品的州为佛罗里达州且产品类别为巧克力

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Or

产品的州为佛罗里达州或加利佛尼亚州

{ "state": { "S": ["FL","CA"] } }

Not

产品的州不是佛罗里达州

{"state": {"S": [{"anything-but": ["FL"]}]}}

存在

存在自制产品

{"homemade": {"S": [{"exists": true}]}}

不存在

不存在自制产品

{"homemade": {"S": [{"exists": false}]}}

开头

COMPANY 以 PK 开头

{"PK": {"S": [{"prefix": "COMPANY"}]}}

您最多可以为一个 Lambda 函数指定 5 个事件筛选模式。请注意,这 5 个事件中的每一个都将作为逻辑 OR 进行求值。因此,如果您配置了名为 Filter_OneFilter_Two 的两个筛选条件,则 Lambda 函数将执行 Filter_One OR Filter_Two

注意

Lambda 事件筛选页面中,有一些用于筛选和比较数值的选项,但不适用于 DynamoDB 筛选事件,因为 DynamoDB 中的数字作为字符串存储。例如 "quantity": { "N": "50" },由于 "N" 属性,我们知道它是一个数字。

组合起来 – Amazon CloudFormation

为了展示事件筛选功能的实际应用,下面提供了一个示例 CloudFormation 模板。此模板将生成一个简单的 DynamoDB 表,带有分区键 PK 和排序键 SK,并启用了 Amazon DynamoDB Streams。它将创建一个 Lambda 函数和一个简单的 Lambda 执行角色,允许将日志写入 Amazon Cloudwatch,并从 Amazon DynamoDB Stream 中读取事件。它还在 DynamoDB Streams 与 Lambda 函数之间添加事件源映射,因此每次在 Amazon DynamoDB Stream 中出现事件时都可以执行该函数。

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

部署此 CloudFormation 模板后,您可以插入以下 Amazon DynamoDB 项目:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

由于此 CloudFormation 模板中内嵌了简单的 Lambda 函数,您将在 Amazon CloudWatch 日志组中看到 Lambda 函数的事件,如下所示:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

筛选示例

  • 仅限与给定州匹配的产品

此示例修改了 CloudFormation 模板,使其包含一个筛选条件,用于匹配来自佛罗里达州的所有产品,缩写为“FL”。

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

重新部署堆栈后,可以将以下 DynamoDB 项目添加到表中。请注意,它不会出现在 Lambda 函数日志中,因为本示例中的产品来自加利佛尼亚州。

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • 仅限以 PK 和 SK 中某些值开头的项目

此示例修改 CloudFormation 模板,使其包含以下条件:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

请注意 AND 条件要求条件位于模式内,其中键 PK 和 SK 位于同一个表达式中,以逗号分隔。

或者是以 PK 和 SK 开头的某些值,或者来自特定状态。

此示例修改 CloudFormation 模板,使其包含以下条件:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

请注意,OR 条件是通过在筛选条件部分引入新模式来添加的。

组合起来 – CDK

以下示例 CDK 项目 Formation 模板介绍了事件筛选功能。在使用此 CDK 项目之前,您需要安装先决条件,包括运行准备脚本

创建 CDK 项目

首先通过在空目录中调用 cdk init,创建一个新的 Amazon CDK 项目。

mkdir ddb_filters cd ddb_filters cdk init app --language python

cdk init 命令使用项目文件夹的名称来命名项目的各种元素,包括类、子文件夹和文件。文件夹名称中的所有连字符都将转换为下划线。否则,该名称应遵循 Python 标识符的格式。例如,名称不应以数字开头,也不能包含空格。

要使用新项目,请激活其虚拟环境。这允许将项目的依赖项安装在本地项目文件夹中,而不是全局安装。

source .venv/bin/activate python -m pip install -r requirements.txt
注意

您可将其视为用于激活虚拟环境的 Mac/Linux 命令。Python 模板包含一个批处理文件 source.bat,该文件允许在 Windows 上使用相同的命令。也可以使用传统的 Windows 命令 .venv\Scripts\activate.bat。如果您使用 Amazon CDK Toolkit v1.70.0 或更早版本来初始化 Amazon CDK 项目,则您的虚拟环境位于 .env 目录中,而不是 .venv

基本基础设施

使用首选文本编辑器打开文件 ./ddb_filters/ddb_filters_stack.py。此文件在您创建 Amazon CDK 项目时自动生成。

接下来,添加函数 _create_ddb_table_set_ddb_trigger_function。这些函数将在预置模式/按需模式下创建一个 DynamoDB 表,该表带有分区键 PK 和排序键 SK,并且默认启用了 Amazon DynamoDB Streams 以显示新映像和旧映像。

Lambda 函数将存储在文件夹 lambda 下的文件 app.py 中。此文件将稍后创建。它包含一个环境变量 APP_TABLE_NAME,这将成为此堆栈创建的 Amazon DynamoDB 表的名称。在同一个函数中,我们向 Lambda 函数授予流读取权限。最后,它将订阅 DynamoDB Streams 作为 Lambda 函数的事件源。

__init__ 方法中文件的末尾,您将调用相应的构造以在堆栈中初始化它们。对于需要额外组件和服务的较大项目,最好在基础堆栈之外定义这些构造。

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

现在,我们将创建一个非常简单的 Lambda 函数,它将日志输出到 Amazon CloudWatch 中。为此,请创建一个名为 lambda 的新文件夹。

mkdir lambda touch app.py

使用您常用的文本编辑器,将以下内容添加到 app.py 文件中:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

确保您位于 /ddb_filters/ 文件夹中,键入以下命令创建示例应用程序:

cdk deploy

在某个时候,系统会要求您确认是否要部署解决方案。键入 Y 接受更改。

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

部署更改后,打开 Amazon 控制台并向表中添加一个项目。

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

现在,CloudWatch 日志应该包含此条目中的所有信息。

筛选示例

  • 仅限与给定州匹配的产品

打开文件 ddb_filters/ddb_filters/ddb_filters_stack.py 并进行修改,使其包含与所有等于“FL”的产品相匹配的筛选条件。可以在第 45 行的 event_subscription 下方对其进行修改。

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • 仅限以 PK 和 SK 中某些值开头的项目

修改 Python 脚本以包含以下条件:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • 或者是以 PK 和 SK 开头的某些值,或者来自特定状态。

修改 Python 脚本以包含以下条件:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

请注意,向筛选数组添加更多元素时,将会添加 OR 条件。

清除

在工作目录的底部找到筛选器堆栈,然后执行 cdk destroy。系统将要求您确认删除资源:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y