使用分布式 Map 复制大规模 CSV 数据 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用分布式 Map 复制大规模 CSV 数据

本教程将帮助您开始在分布式模式下使用 Map 状态。设置为分布式Map 状态被称为分布式 Map 状态。您可以在工作流中使用分布式 Map 状态来迭代大规模 Amazon S3 数据来源。Map 状态将每次迭代作为子工作流执行来运行,从而实现高并发数。有关分布式模式的更多信息,请参阅分布式模式下的 Map 状态

在本教程中,您将使用分布式地图状态对 Amazon S3 存储桶中的 CSV 文件进行迭代。然后,您将其内容以及子工作流执行的 ARN 返回到另一个 Amazon S3 存储桶中。首先,在 Workflow Studio 中创建一个工作流原型。接下来,将 Map 状态的处理模式设置为“分布式”,将 CSV 文件指定为数据集,然后将其位置提供给该 Map 状态。您还可以为子工作流执行指定工作流类型,分布式 Map 状态快速方式启动。

除了这些设置外,您还可以为本教程中使用的示例工作流指定其他配置,例如并发子工作流执行的最大数量和导出 Map 结果的位置。

先决条件

  • 将 CSV 文件上传到 Amazon S3 存储桶。您必须在 CSV 文件中定义标题行。有关对 CSV 文件的大小限制以及如何指定标题行的信息,请参阅 Amazon S3 存储桶中的 CSV 文件

  • 创建另一个 Amazon S3 存储桶,并在其中创建的一个文件夹,以便将 Map 状态结果导出到该存储桶中。

重要

确保您的 Amazon S3 存储桶 Amazon Web Services 账户 与状态机 Amazon Web Services 区域 相同。

第 1 步:创建工作流原型

在此步骤中,您将使用 Workflow Studio 为工作流创建原型。Workflow Studio 是一款可视化工作流设计器,可在 Step Functions 控制台中使用。您可以分别从操作选项卡中选择所需的状态和 API 操作。您将使用 Workflow Studio 的拖放特征来创建工作流原型。

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 选择模板对话框中,选择空白

  3. 选择选择。这将在设计模式中打开 Workflow Studio。

  4. 选项卡中,将 Map 状态拖放到标有将第一个状态拖至此处的空白状态处。

  5. 配置选项卡下,在状态名称中输入 Process data

  6. 操作选项卡中,将 Amazon Lambda 调用 API 操作拖放到 Process data 状态中。

  7. Amazon Lambda 调用状态重命名为 Process CSV data

第 2 步:配置 Map 状态的必填字段

在此步骤中,您将配置分布式 Map 状态的以下必填字段:

  • ItemReader— 指定数据集及其位置,该Map州可以从中读取输入。

  • ItemProcessor – 指定以下值:

    • ProcessorConfig – 将 ModeExecutionType 分别设置为 DISTRIBUTEDEXPRESS。这将为分布式 Map 状态启动的子工作流执行设置 Map 状态的处理模式和工作流类型。

    • StartAt – Map 工作流中的第一个状态。

    • States – 定义 Map 工作流程,这是在每个子工作流执行中要重复的一组步骤。

  • ResultWriter— 指定 Step Functions 写入分布式地图状态结果的 Amazon S3 位置。

    重要

    确保用于导出 Map Run 结果的 Amazon S3 存储桶 Amazon Web Services 账户 与 Amazon Web Services 区域 您的状态机相同。否则,您的状态机执行将因 States.ResultWriterFailed 错误而失败。

要配置必填句字段,请执行以下操作:
  1. 选择 Process data 状态,然后在配置选项卡中执行以下操作:

    1. 对于处理模式,选择分布式

    2. 对于项目来源,选择 Amazon S3,然后从 S3 项目来源下拉列表中选择 S3 中的 CSV 文件

    3. 执行以下操作来指定 CSV 文件的 Amazon S3 位置:

      1. 对于 S3 对象,请从下拉列表中选择输入存储桶和密钥

      2. 对于存储桶,输入包含 CSV 文件的 Amazon S3 存储桶的名称。例如,sourceBucket

      3. 对于密钥,输入保存 CSV 文件的 Amazon S3 对象的名称。您还必须在此字段中指定 CSV 文件的名称。例如,csvDataset/ratings.csv

    4. 对于 CSV 文件,还必须指定列标题的位置。为此,请选择其他配置,如果 CSV 文件的第一行是标题,对于 CSV 标题位置,请保留第一行的默认选择。否则,请选择给定以在状态机定义中指定标题。有关更多信息,请参阅ReaderConfig

    5. 对于子执行类型,请选择快速

  2. 导出位置中,要将 Map Run 结果导出到特定的 Amazon S3 位置,请选择将 Map 状态的输出导出到 Amazon S3

  3. 执行以下操作:

    1. 对于 S3 存储桶,请从下拉列表中选择输入存储桶名称和前缀

    2. 对于存储桶,输入要将结果导出到的 Amazon S3 存储桶的名称。例如,mapOutputs

    3. 对于前缀,输入要将结果保存到的文件夹名称。例如,resultData

第 3 步:配置其他选项

除了分布式 Map 状态 所需的设置外,您还可以指定其他选项。这些选项可以包括子工作流并发执行的最大数量和导出 Map 状态结果的位置。

  1. 选择 Process data 状态。然后,在项目来源中,选择其他配置

  2. 执行以下操作:

    1. 选择 “修改项目”, ItemSelector为每个子工作流程执行指定自定义 JSON 输入。

    2. 输入以下 JSON 输入:

      { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }

      有关如何创建自定义输入的信息,请参阅 ItemSelector

  3. 运行时设置中,对于并发限制,指定分布式 Map 状态 可以启动的并发子工作流执行数量。例如,输入 100

  4. 在浏览器中打开一个新窗口或选项卡,完成您将在此工作流中使用的 Lambda 函数的配置,如第 4 步:配置 Lambda 函数中所述。

第 4 步:配置 Lambda 函数

重要

确保您的 Lambda 函数与状态机处于 Amazon Web Services 区域 同一位置。

  1. 打开 Lambda 控制台,然后选择创建函数

  2. 创建函数页面上,选择从头开始创作

  3. 基本信息部分中,配置您的 Lambda 函数:

    1. 对于 Function name(函数名称),请输入 distributedMapLambda

    2. 对于运行时系统,选择 Node.js 16.x

    3. 保留所有默认选项,然后选择创建函数

    4. 创建 Lambda 函数后,复制页面右上角显示的函数 Amazon 资源名称 (ARN)。您需要在工作流原型中提供此信息。要复制 ARN,请单击 
                                    copy Amazon Resource Name
                                。以下是示例 ARN:

      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
  4. 复制以下 Lambda 函数的代码,然后将其粘贴到页面的distributedMapLambda代码源部分。

    exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
  5. 选择部署。函数部署后,选择测试,查看您的 Lambda 函数的输出。

第 5 步:更新工作流原型

在 Step Functions 控制台中,您将更新工作流,添加 Lambda 函数的 ARN。

  1. 返回到创建工作流原型的选项卡或窗口。

  2. 选择处理 CSV 数据步骤,然后在配置选项卡中执行以下操作:

    1. 对于集成类型,请选择已优化

    2. 对于函数名称,输入 Lambda 函数名称。从出现的下拉列表中选择函数,或者选择输入函数名称并提供 Lambda 函数 ARN。

第 6 步:查看自动生成的 Amazon States Language 定义并保存工作流

当您将状态从操作选项卡拖放到画布上时,Workflow Studio 会自动实时撰写工作流的 Amazon States Language 定义。您可以根据需要编辑此定义。

  1. (可选)在 Inspector 面板上选择定义,然后查看状态机定义。

    提示

    您也可以在 Workflow Studio 的代码编辑器中查看 ASL 的定义。在代码编辑器中,还可以编辑工作流的 ASL 定义。

    以下示例代码显示了为您的工作流自动生成的 Amazon States Language 定义。

    { "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "sourceBucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
  2. 为状态机指定一个名称。为此,请选择默认状态机名称旁边的编辑图标MyStateMachine。然后,找到状态机配置,在状态机名称框中指定一个名称。

    对于本教程,请输入名称 DistributedMapDemo

  3. (可选)在状态机配置中,指定其他工作流设置,例如状态机类型及其执行角色。

    在本教程中,请保留状态机配置中的所有默认选项。

  4. 确认角色创建对话框中,选择确认继续。

    您也可以选择查看角色设置,返回至状态机配置

    注意

    如果您删除了 Step Functions 创建的 IAM 角色,Step Functions 在以后无法重新创建该角色。同样,如果您修改了该角色(例如,通过在 IAM 策略中从主体中删除 Step Functions),Step Functions 在以后也无法还原其原始设置。

第 7 步:运行状态机

执行是状态机的一个实例,您可以在其中运行工作流来执行任务。

  1. DistributedMapDemo页面上,选择开始执行

  2. 启动执行对话框中,执行以下操作:

    1. (可选)要识别您的执行,您可以在名称框中为其指定一个名称。默认情况下,Step Functions 会自动生成一个唯一的执行名称。

      注意

      Step Functions 允许您为状态机、执行、活动、速率控制和包含非 ASCII 字符的标签创建名称。这些非 ASCII 名称不适用于亚马逊。 CloudWatch为确保您可以跟踪 CloudWatch 指标,请选择仅使用 ASCII 字符的名称。

    2. (可选)在输入框中,以 JSON 格式输入输入值以便运行工作流。

    3. 选择启动执行

    4. Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。

      要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅“执行详细信息”页面 – 界面概述

    例如,选择 Map 状态,然后选择 Map Run,打开 Map Run 详细信息页面。在此页面上,您可以查看分布式 Map 状态的所有执行细节及其启动的子工作流执行。有关该页面的信息,请参阅检查 Map Run