使用 Lambda 函数处理单个数据项 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数处理单个数据项

在本教程中,您将使用分布式 Map 状态ItemBatcher 字段,使用 Lambda 函数迭代批次中的单个项目。分布式 Map 状态将启动四个子工作流执行。每个子工作流都运行一个内联 Map 状态内联 Map 状态每次迭代都会调用一个 Lambda 函数,并将批次中的单个项目传递给该函数。然后,Lambda 函数处理该项目并返回结果。

您将创建一个对整数数组执行乘法的状态机。假设输入的整数数组是 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],乘法因子是 7。这些整数与乘法因子 7 相乘后形成的数组将是 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

第 1 步:创建状态机

在此步骤中,您将创建状态机的工作流原型,该原型会将一批项目中的单个项目传递给您将在第 2 步中创建的 Lambda 函数的每次调用。

  • 使用以下定义通过 Step Functions 控制台创建状态机。有关创建状态机的信息,请参阅开始使用分布式 Map 状态教程中的第 1 步:创建工作流原型

    在此状态机中,您将定义一个分布式 Map 状态,该状态接受 10 个整数的数组作为输入,并将这些数组项分批传递给子工作流执行。每个子工作流程执行都会接收一批三个项目作为输入,并运行一个内联 Map 状态内联 Map 状态的每次迭代都会调用 Lambda 函数,并将批次中的一个项目传递给该函数。然后,此函数将该项与系数 7 相乘并返回结果。

    每个子工作流执行的输出都是一个 JSON 数组,其中包含每个传递项的乘法结果。

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

第 2 步:创建 Lambda 函数

在此步骤中,您将创建 Lambda 函数,用于处理批次传递的每个项目。

重要

确保您的 Lambda 函数与状态机处于同一 Amazon Web Services 区域。

创建 Lambda 函数
  1. 使用 Lambda 控制台创建一个 Python 3.9 Lambda 函数,命名为 ProcessSingleItem。有关创建 Lambda 函数的信息,请参阅开始使用分布式 Map 状态教程中的第 4 步:配置 Lambda 函数

  2. 复制以下 Lambda 函数代码,并将其粘贴到 Lambda 函数的代码源部分。

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. 创建 Lambda 函数后,复制显示在页面右上角的函数 ARN。要复制 ARN,请单击 
                  icon to copy the Lambda function's Amazon Resource Name
                。以下是一个 ARN 示例,其中 function-name 是 Lambda 函数的名称(在本例中为 ProcessSingleItem):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    您需要在第 1 步中创建的状态机中提供函数 ARN。

  4. 选择部署,以部署更改。

第 3 步:运行状态机

运行状态机时,分布式 Map 状态会启动四个子工作流程执行,其中每个执行处理三个项目,而一个执行处理单个项目。

以下示例显示了在子工作流执行中传递给其中一个 ProcessSingleItem 函数调用的数据。

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

给定此输入,以下示例显示了 Lambda 函数返回的输出。

{ "statusCode": 200, "multiplied": 7 }

下面的示例显示了其中一个子工作流执行的输出 JSON 数组。

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

状态机返回以下输出,其中包含四个子工作流执行的四个数组。这些数组包含各个输入项的乘法结果。

最后,状态机输出是一个名为 multiplied 的数组,组合了为四个子工作流执行返回的所有乘法结果。

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

要将子工作流执行返回的所有乘法结果合并到单个输出数组中,可以使用 ResultSelector 字段。在分布式 Map 状态 中定义此字段以查找所有结果,提取单个结果,然后将它们组合成一个名为 multiplied 的输出数组。

要使用 ResultSelector 字段,请更新您的状态机定义,如以下示例所示。

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

更新的状态机返回统一的输出数组,如以下示例所示。

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }