教程 2:对 DynamoDB 和 Lambda 使用筛选器来处理部分事件。
在本教程中,您将创建 Amazon Lambda 触发器以处理来自 DynamoDB 表的流中的部分事件。
通过 Lambda 事件筛选,您可以使用筛选表达式来控制 Lambda 将哪些事件发送给函数进行处理。每个 DynamoDB 流最多可以配置 5 个不同的筛选器。如果您使用的是批处理时段,则 Lambda 会对每个新事件应用筛选条件,以确定是否将其包括在当前批处理中。
筛选器通过名为 FilterCriteria
的结构来应用。FilterCriteria
的 3 个主要属性为 metadata properties
、data properties
和 filter patterns
。
DynamoDB Streams 事件的示例结构如下所示:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
metadata properties
是事件对象的字段。在 DynamoDB Streams 中,metadata properties
是 dynamodb
或 eventName
这样的字段。
data properties
是事件主体的字段。要根据 data properties
进行筛选,请确保将它们包含在正确的键内的 FilterCriteria
中。对于 DynamoDB 事件源,数据键为 NewImage
或 OldImage
。
最后,筛选条件规则将定义要应用到特定属性的筛选条件表达式。下面是一些示例:
比较运算符 | 示例 | 规则语法(部分) |
---|---|---|
Null |
产品类型为 null |
|
空 |
产品名称为空 |
|
等于 |
州为佛罗里达州 |
|
并且 |
产品的州为佛罗里达州且产品类别为巧克力 |
|
Or |
产品的州为佛罗里达州或加利佛尼亚州 |
|
Not |
产品的州不是佛罗里达州 |
|
存在 |
存在自制产品 |
|
不存在 |
不存在自制产品 |
|
开头 |
COMPANY 以 PK 开头 |
|
您最多可以为一个 Lambda 函数指定 5 个事件筛选模式。请注意,这 5 个事件中的每一个都将作为逻辑 OR 进行求值。因此,如果您配置了名为 Filter_One
和 Filter_Two
的两个筛选条件,则 Lambda 函数将执行 Filter_One
OR Filter_Two
。
注意
在Lambda 事件筛选页面中,有一些用于筛选和比较数值的选项,但不适用于 DynamoDB 筛选事件,因为 DynamoDB 中的数字作为字符串存储。例如 "quantity": { "N": "50"
}
,由于 "N"
属性,我们知道它是一个数字。
组合起来 – Amazon CloudFormation
为了展示事件筛选功能的实际应用,下面提供了一个示例 CloudFormation 模板。此模板将生成一个简单的 DynamoDB 表,带有分区键 PK 和排序键 SK,并启用了 Amazon DynamoDB Streams。它将创建一个 Lambda 函数和一个简单的 Lambda 执行角色,允许将日志写入 Amazon Cloudwatch,并从 Amazon DynamoDB Stream 中读取事件。它还在 DynamoDB Streams 与 Lambda 函数之间添加事件源映射,因此每次在 Amazon DynamoDB Stream 中出现事件时都可以执行该函数。
AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn
部署此 CloudFormation 模板后,您可以插入以下 Amazon DynamoDB 项目:
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
由于此 CloudFormation 模板中内嵌了简单的 Lambda 函数,您将在 Amazon CloudWatch 日志组中看到 Lambda 函数的事件,如下所示:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
筛选示例
-
仅限与给定州匹配的产品
此示例修改了 CloudFormation 模板,使其包含一个筛选条件,用于匹配来自佛罗里达州的所有产品,缩写为“FL”。
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
重新部署堆栈后,可以将以下 DynamoDB 项目添加到表中。请注意,它不会出现在 Lambda 函数日志中,因为本示例中的产品来自加利佛尼亚州。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
-
仅限以 PK 和 SK 中某些值开头的项目
此示例修改 CloudFormation 模板,使其包含以下条件:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
请注意 AND 条件要求条件位于模式内,其中键 PK 和 SK 位于同一个表达式中,以逗号分隔。
或者是以 PK 和 SK 开头的某些值,或者来自特定状态。
此示例修改 CloudFormation 模板,使其包含以下条件:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
请注意,OR 条件是通过在筛选条件部分引入新模式来添加的。
组合起来 – CDK
以下示例 CDK 项目 Formation 模板介绍了事件筛选功能。在使用此 CDK 项目之前,您需要安装先决条件,包括运行准备脚本。
创建 CDK 项目
首先通过在空目录中调用 cdk init
,创建一个新的 Amazon CDK 项目。
mkdir ddb_filters cd ddb_filters cdk init app --language python
cdk init
命令使用项目文件夹的名称来命名项目的各种元素,包括类、子文件夹和文件。文件夹名称中的所有连字符都将转换为下划线。否则,该名称应遵循 Python 标识符的格式。例如,名称不应以数字开头,也不能包含空格。
要使用新项目,请激活其虚拟环境。这允许将项目的依赖项安装在本地项目文件夹中,而不是全局安装。
source .venv/bin/activate python -m pip install -r requirements.txt
注意
您可将其视为用于激活虚拟环境的 Mac/Linux 命令。Python 模板包含一个批处理文件 source.bat
,该文件允许在 Windows 上使用相同的命令。也可以使用传统的 Windows 命令 .venv\Scripts\activate.bat
。如果您使用 Amazon CDK Toolkit v1.70.0 或更早版本来初始化 Amazon CDK 项目,则您的虚拟环境位于 .env
目录中,而不是 .venv
。
基本基础设施
使用首选文本编辑器打开文件 ./ddb_filters/ddb_filters_stack.py
。此文件在您创建 Amazon CDK 项目时自动生成。
接下来,添加函数 _create_ddb_table
和 _set_ddb_trigger_function
。这些函数将在预置模式/按需模式下创建一个 DynamoDB 表,该表带有分区键 PK 和排序键 SK,并且默认启用了 Amazon DynamoDB Streams 以显示新映像和旧映像。
Lambda 函数将存储在文件夹 lambda
下的文件 app.py
中。此文件将稍后创建。它包含一个环境变量 APP_TABLE_NAME
,这将成为此堆栈创建的 Amazon DynamoDB 表的名称。在同一个函数中,我们向 Lambda 函数授予流读取权限。最后,它将订阅 DynamoDB Streams 作为 Lambda 函数的事件源。
在 __init__
方法中文件的末尾,您将调用相应的构造以在堆栈中初始化它们。对于需要额外组件和服务的较大项目,最好在基础堆栈之外定义这些构造。
import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)
现在,我们将创建一个非常简单的 Lambda 函数,它将日志输出到 Amazon CloudWatch 中。为此,请创建一个名为 lambda
的新文件夹。
mkdir lambda touch app.py
使用您常用的文本编辑器,将以下内容添加到 app.py
文件中:
import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)
确保您位于 /ddb_filters/
文件夹中,键入以下命令创建示例应用程序:
cdk deploy
在某个时候,系统会要求您确认是否要部署解决方案。键入 Y
接受更改。
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
部署更改后,打开 Amazon 控制台并向表中添加一个项目。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
现在,CloudWatch 日志应该包含此条目中的所有信息。
筛选示例
-
仅限与给定州匹配的产品
打开文件 ddb_filters/ddb_filters/ddb_filters_stack.py
并进行修改,使其包含与所有等于“FL”的产品相匹配的筛选条件。可以在第 45 行的 event_subscription
下方对其进行修改。
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
-
仅限以 PK 和 SK 中某些值开头的项目
修改 Python 脚本以包含以下条件:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
-
或者是以 PK 和 SK 开头的某些值,或者来自特定状态。
修改 Python 脚本以包含以下条件:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
请注意,向筛选数组添加更多元素时,将会添加 OR 条件。
清除
在工作目录的底部找到筛选器堆栈,然后执行 cdk destroy
。系统将要求您确认删除资源:
cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y