使用 Lambda 函数处理整批数据 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数处理整批数据

在本教程中,您将使用分布式 Map 状态ItemBatcher 字段来处理 Lambda 函数中的整批项目。每批最多包含三个项目。分布式 Map 状态启动四个子工作流执行,其中每个执行处理三个项目,而一个执行处理单个项目。每个子工作流执行都会调用一个 Lambda 函数,该函数对批次中存在的各个项目进行迭代。

您将创建一个对整数数组执行乘法的状态机。假设输入的整数数组是 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],乘法因子是 7。这些整数与乘法因子 7 相乘后形成的数组将是 [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

第 1 步:创建状态机

在此步骤中,您将创建状态机的工作流原型,该原型将整批数据传递给将在第 2 步中创建的 Lambda 函数。

  • 使用以下定义通过 Step Functions 控制台创建状态机。有关创建状态机的信息,请参阅开始使用分布式 Map 状态教程中的第 1 步:创建工作流原型

    在此状态机中,您可以定义一个分布式 Map 状态,该状态接受 10 个整数的数组作为输入,并以 3 个整数为一批将该数组传递给一个 Lambda 函数。Lambda 函数迭代批次中在的各个项目,并返回名为 multiplied 的输出数组。输出数组包含对输入数组中传递的项目执行乘法的结果。

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

第 2 步:创建 Lambda 函数

在此步骤中,您将创建 Lambda 函数,该函数可处理批次传递的所有项目。

重要

确保您的 Lambda 函数与状态机处于同一 Amazon Web Services 区域。

创建 Lambda 函数
  1. 使用 Lambda 控制台创建一个 Python 3.9 Lambda 函数,命名为 ProcessEntireBatch。有关创建 Lambda 函数的信息,请参阅开始使用分布式 Map 状态教程中的第 4 步:配置 Lambda 函数

  2. 复制以下 Lambda 函数代码,并将其粘贴到 Lambda 函数的代码源部分。

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. 创建 Lambda 函数后,复制显示在页面右上角的函数 ARN。要复制 ARN,请单击 
                  icon to copy the Lambda function's Amazon Resource Name
                。以下是一个 ARN 示例,其中 function-name 是 Lambda 函数的名称(在本例中为 ProcessEntireBatch):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    您需要在第 1 步中创建的状态机中提供函数 ARN。

  4. 选择部署,以部署更改。

第 3 步:运行状态机

运行状态机时,分布式 Map 状态会启动四个子工作流程执行,其中每个执行处理三个项目,而一个执行处理单个项目。

以下示例显示了由其中一个子工作流执行传递给 ProcessEntireBatch 函数的数据。

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

给定此输入,以下示例显示了 Lambda 函数返回的名为 multiplied 的输出数组。

{ "statusCode": 200, "multiplied": [7, 14, 21] }

状态机返回以下输出,其中包含为四个子工作流执行四个数组,名为 multiplied。这些数组包含各个输入项的乘法结果。

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

要将返回的所有数组项合并到一个输出数组中,可以使用 ResultSelector 字段。在分布式 Map 状态中定义此字段,用于查找所有 multiplied 数组,提取这些数组中的所有项目,然后将它们组合成一个输出数组。

要使用 ResultSelector 字段,请更新您的状态机定义,如以下示例所示。

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

更新的状态机返回统一的输出数组,如以下示例所示。

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }