本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Step Functions 中使用 Lambda 函数处理批量数据
在本教程中,您将使用分布式 Map 状态的 ItemBatcher (地图) 字段来处理 Lambda 函数中的整批项目。每批最多包含三个项目。分布式 Map 状态启动四个子工作流执行,其中每个执行处理三个项目,而一个执行处理单个项目。每个子工作流执行都会调用一个 Lambda 函数,该函数对批次中存在的各个项目进行迭代。
您将创建一个对整数数组执行乘法的状态机。假设输入的整数数组是 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
,乘法因子是 7
。这些整数与乘法因子 7 相乘后形成的数组将是 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
。
第 1 步:创建状态机
在此步骤中,您将创建状态机的工作流原型,该原型将整批数据传递给将在第 2 步中创建的 Lambda 函数。
-
使用以下定义通过 Step Functions 控制台
创建状态机。有关创建状态机的信息,请参阅开始使用分布式 Map 状态教程中的第 1 步:创建工作流原型。 在此状态机中,您可以定义一个分布式 Map 状态,该状态接受 10 个整数的数组作为输入,并以
3
个整数为一批将该数组传递给一个 Lambda 函数。Lambda 函数迭代批次中在的各个项目,并返回名为multiplied
的输出数组。输出数组包含对输入数组中传递的项目执行乘法的结果。重要
{ "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:
functionName
" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }
第 2 步:创建 Lambda 函数
在此步骤中,您将创建 Lambda 函数,该函数可处理批次传递的所有项目。
重要
确保您的 Lambda 函数与状态机处于同一 Amazon Web Services 区域。
创建 Lambda 函数
-
使用 Lambda 控制台
创建一个 Python Lambda 函数,命名为 ProcessEntireBatch
。有关创建 Lambda 函数的信息,请参阅开始使用分布式 Map 状态教程中的第 4 步:配置 Lambda 函数。 -
复制以下 Lambda 函数代码,并将其粘贴到 Lambda 函数的代码源部分。
import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
-
创建 Lambda 函数后,复制显示在页面右上角的函数 ARN。以下是一个 ARN 示例,其中
是 Lambda 函数的名称(在本例中为function-name
ProcessEntireBatch
):arn:aws:lambda:us-east-1:123456789012:function:
function-name
您需要在第 1 步中创建的状态机中提供函数 ARN。
-
选择部署,以部署更改。
第 3 步:运行状态机
运行状态机时,分布式 Map 状态会启动四个子工作流程执行,其中每个执行处理三个项目,而一个执行处理单个项目。
以下示例显示了由其中一个子工作流执行传递给 ProcessEntireBatch 函数的数据。
{
"BatchInput": {
"MyMultiplicationFactor": 7
},
"Items": [1, 2, 3]
}
给定此输入,以下示例显示了 Lambda 函数返回的名为 multiplied
的输出数组。
{
"statusCode": 200,
"multiplied": [7, 14, 21]
}
状态机返回以下输出,其中包含为四个子工作流执行四个数组,名为 multiplied
。这些数组包含各个输入项的乘法结果。
[
{
"statusCode": 200,
"multiplied": [7, 14, 21]
},
{
"statusCode": 200,
"multiplied": [28, 35, 42]
},
{
"statusCode": 200,
"multiplied": [49, 56, 63]
},
{
"statusCode": 200,
"multiplied": [70]
}
]
要将返回的所有数组项合并到一个输出数组中,可以使用 ResultSelector 字段。在分布式 Map 状态中定义此字段,用于查找所有 multiplied
数组,提取这些数组中的所有项目,然后将它们组合成一个输出数组。
要使用 ResultSelector
字段,请更新您的状态机定义,如以下示例所示。
{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }
更新的状态机返回统一的输出数组,如以下示例所示。
{
"multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
}