本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Step Functions 中使用 Lambda 函数处理各个项目
在本教程中,您将使用分布式 Map 状态的 ItemBatcher (地图) 字段,使用 Lambda 函数迭代批次中的单个项目。分布式 Map 状态将启动四个子工作流执行。每个子工作流都运行一个内联 Map 状态。内联 Map 状态每次迭代都会调用一个 Lambda 函数,并将批次中的单个项目传递给该函数。然后,Lambda 函数处理该项目并返回结果。
您将创建一个对整数数组执行乘法的状态机。假设输入的整数数组是 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
,乘法因子是 7
。这些整数与乘法因子 7 相乘后形成的数组将是 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
。
第 1 步:创建状态机
在此步骤中,您将创建状态机的工作流原型,该原型会将一批项目中的单个项目传递给您将在第 2 步中创建的 Lambda 函数的每次调用。
-
使用以下定义通过 Step Functions 控制台
创建状态机。有关创建状态机的信息,请参阅开始使用分布式 Map 状态教程中的第 1 步:创建工作流原型。 在此状态机中,您将定义一个分布式 Map 状态,该状态接受 10 个整数的数组作为输入,并将这些数组项分批传递给子工作流执行。每个子工作流程执行都会接收一批三个项目作为输入,并运行一个内联 Map 状态。内联 Map 状态的每次迭代都会调用 Lambda 函数,并将批次中的一个项目传递给该函数。然后,此函数将该项与系数
7
相乘并返回结果。每个子工作流程执行的输出都是一个 JSON 数组,其中包含每个所传递项目的乘法结果。
重要
{ "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:
functionName
" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }
第 2 步:创建 Lambda 函数
在此步骤中,您将创建 Lambda 函数,用于处理批次传递的每个项目。
重要
确保您的 Lambda 函数与状态机处于同一 Amazon Web Services 区域。
创建 Lambda 函数
-
使用 Lambda 控制台
创建一个 Python Lambda 函数,命名为 ProcessSingleItem
。有关创建 Lambda 函数的信息,请参阅开始使用分布式 Map 状态教程中的第 4 步:配置 Lambda 函数。 -
复制以下 Lambda 函数代码,并将其粘贴到 Lambda 函数的代码源部分。
import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
-
创建 Lambda 函数后,复制显示在页面右上角的函数 ARN。以下是一个 ARN 示例,其中
是 Lambda 函数的名称(在本例中为function-name
ProcessSingleItem
):arn:aws:lambda:us-east-1:123456789012:function:
function-name
您需要在第 1 步中创建的状态机中提供函数 ARN。
-
选择部署,以部署更改。
第 3 步:运行状态机
运行状态机时,分布式 Map 状态会启动四个子工作流程执行,其中每个执行处理三个项目,而一个执行处理单个项目。
以下示例显示了在子工作流执行中传递给其中一个 ProcessSingleItem 函数调用的数据。
{
"MyMultiplicationFactor": 7,
"MyItem": 1
}
给定此输入,以下示例显示了 Lambda 函数返回的输出。
{
"statusCode": 200,
"multiplied": 7
}
下面的示例显示了其中一个子工作流执行的输出 JSON 数组。
[
{
"statusCode": 200,
"multiplied": 7
},
{
"statusCode": 200,
"multiplied": 14
},
{
"statusCode": 200,
"multiplied": 21
}
]
状态机返回以下输出,其中包含四个子工作流执行的四个数组。这些数组包含各个输入项的乘法结果。
最后,状态机输出是一个名为 multiplied
的数组,组合了为四个子工作流执行返回的所有乘法结果。
[
[
{
"statusCode": 200,
"multiplied": 7
},
{
"statusCode": 200,
"multiplied": 14
},
{
"statusCode": 200,
"multiplied": 21
}
],
[
{
"statusCode": 200,
"multiplied": 28
},
{
"statusCode": 200,
"multiplied": 35
},
{
"statusCode": 200,
"multiplied": 42
}
],
[
{
"statusCode": 200,
"multiplied": 49
},
{
"statusCode": 200,
"multiplied": 56
},
{
"statusCode": 200,
"multiplied": 63
}
],
[
{
"statusCode": 200,
"multiplied": 70
}
]
]
要将子工作流执行返回的所有乘法结果合并到单个输出数组中,可以使用 ResultSelector 字段。在分布式 Map 状态 中定义此字段以查找所有结果,提取单个结果,然后将它们组合成一个名为 multiplied
的输出数组。
要使用 ResultSelector
字段,请更新您的状态机定义,如以下示例所示。
{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }
更新的状态机返回统一的输出数组,如以下示例所示。
{
"multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
}