本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Step Functions 中使用分布式 Map 复制大规模 CSV 数据
本教程将帮助您开始在分布式模式下使用 Map
状态。设置为分布式的 Map
状态被称为分布式 Map 状态。您可以在工作流中使用分布式 Map 状态来迭代大规模 Amazon S3 数据来源。Map
状态将每次迭代作为子工作流执行来运行,从而实现高并发数。有关分布式模式的更多信息,请参阅分布式模式下的 Map 状态。
在本教程中,您将使用分布式地图状态对 Amazon S3 存储桶中的 CSV 文件进行迭代。然后,您将其内容以及子工作流执行的 ARN 返回到另一个 Amazon S3 存储桶中。首先,在 Workflow Studio 中创建一个工作流原型。接下来,将 Map 状态的处理模式设置为“分布式”,将 CSV 文件指定为数据集,然后将其位置提供给该 Map
状态。您还可以为子工作流执行指定工作流类型,分布式 Map 状态以快速方式启动。
除了这些设置外,您还可以为本教程中使用的示例工作流指定其他配置,例如并发子工作流执行的最大数量和导出 Map
结果的位置。
先决条件
将 CSV 文件上传到 Amazon S3 存储桶。您必须在 CSV 文件中定义标题行。有关对 CSV 文件的大小限制以及如何指定标题行的信息,请参阅 Amazon S3 存储桶中的 CSV 文件。
创建另一个 Amazon S3 存储桶,并在其中创建的一个文件夹,以便将
Map
状态结果导出到该存储桶中。
重要
确保您的 Amazon S3 存储桶与您的状态机处在同一个 Amazon Web Services 账户和 Amazon Web Services 区域下。
第 1 步:创建工作流原型
在此步骤中,您将使用 Workflow Studio 为工作流创建原型。Workflow Studio 是一款可视化工作流设计器,可在 Step Functions 控制台中使用。您可以分别从流和操作选项卡中选择所需的状态和 API 操作。您将使用 Workflow Studio 的拖放特征来创建工作流原型。
打开 Step Functions 控制台
,然后选择创建状态机。 在 选择模板对话框中,选择空白。
-
选择选择,以便在设计模式下打开工作流程工作室。
从流选项卡中,将 Map 状态拖放到标有将第一个状态拖至此处的空白状态处。
在配置选项卡下,在状态名称中输入
Process data
。从操作选项卡中,将 Amazon Lambda 调用 API 操作拖放到 Process data 状态中。
-
将 Amazon Lambda 调用状态重命名为
Process CSV data
。
第 2 步:配置 Map 状态的必填字段
在此步骤中,您将配置分布式 Map 状态的以下必填字段:
ItemReader – 指定
Map
状态可读取输入的数据集及其位置。ItemProcessor – 指定以下值:
ProcessorConfig
– 将Mode
和ExecutionType
分别设置为DISTRIBUTED
和EXPRESS
。这将为分布式 Map 状态启动的子工作流执行设置Map
状态的处理模式和工作流类型。StartAt
– Map 工作流中的第一个状态。States
– 定义 Map 工作流程,这是在每个子工作流执行中要重复的一组步骤。
ResultWriter – 指定 Step Functions 写入分布式 Map 状态结果的 Amazon S3 位置。
重要
确保您用于导出 Map Run 结果的 Amazon S3 存储桶与状态机位于同一个 Amazon Web Services 账户和 Amazon Web Services 区域下。否则,您的状态机执行将因
States.ResultWriterFailed
错误而失败。
要配置必填句字段,请执行以下操作:
选择 Process data 状态,然后在配置选项卡中执行以下操作:
对于处理模式,选择分布式。
对于项目来源,选择 Amazon S3,然后从 S3 项目来源下拉列表中选择 S3 中的 CSV 文件。
执行以下操作来指定 CSV 文件的 Amazon S3 位置:
对于 S3 对象,请从下拉列表中选择输入存储桶和密钥。
对于存储桶,输入包含 CSV 文件的 Amazon S3 存储桶的名称。例如,
amzn-s3-demo-source-bucket
。对于密钥,输入保存 CSV 文件的 Amazon S3 对象的名称。您还必须在此字段中指定 CSV 文件的名称。例如,
csvDataset/ratings.csv
。
对于 CSV 文件,还必须指定列标题的位置。为此,请选择其他配置,如果 CSV 文件的第一行是标题,对于 CSV 标题位置,请保留第一行的默认选择。否则,请选择给定以在状态机定义中指定标题。有关更多信息,请参阅
ReaderConfig
。对于子执行类型,请选择快速。
在导出位置中,要将 Map Run 结果导出到特定的 Amazon S3 位置,请选择将 Map 状态的输出导出到 Amazon S3。
执行以下操作:
对于 S3 存储桶,请从下拉列表中选择输入存储桶名称和前缀。
对于存储桶,输入要将结果导出到的 Amazon S3 存储桶的名称。例如,
mapOutputs
。对于前缀,输入要将结果保存到的文件夹名称。例如,
resultData
。
第 3 步:配置其他选项
除了分布式 Map 状态 所需的设置外,您还可以指定其他选项。这些选项可以包括子工作流并发执行的最大数量和导出 Map
状态结果的位置。
选择 Process data 状态。然后,在项目来源中,选择其他配置。
执行以下操作:
选择使用 ItemSelector 修改项目,为每个子工作流执行指定自定义 JSON 输入。
输入以下 JSON 输入:
{
"index.$":
"$$.Map.Item.Index"
,"value.$":
"$$.Map.Item.Value"
}有关如何创建自定义输入的信息,请参阅
ItemSelector (地图)
。
在运行时设置中,对于并发限制,指定分布式 Map 状态 可以启动的并发子工作流执行数量。例如,输入
100
。-
在浏览器中打开一个新窗口或选项卡,完成您将在此工作流中使用的 Lambda 函数的配置,如第 4 步:配置 Lambda 函数中所述。
第 4 步:配置 Lambda 函数
重要
确保您的 Lambda 函数与状态机处于同一 Amazon Web Services 区域。
-
打开 Lambda 控制台
,然后选择创建函数。 -
在创建函数页面上,选择从头开始创作。
-
在基本信息部分中,配置您的 Lambda 函数:
-
对于函数名称,请输入
distributedMapLambda
。 -
对于 Runtime (运行时),选择 Node.js。
-
保留所有默认选项,然后选择创建函数。
-
创建 Lambda 函数后,复制页面右上角显示的函数 Amazon 资源名称 (ARN)。您需要在工作流原型中提供此信息。以下是示例 ARN:
arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
-
-
复制以下 Lambda 函数代码,并将其粘贴到 distributedMapLambda 页面的代码源部分。
exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
-
选择部署。函数部署后,选择测试,查看您的 Lambda 函数的输出。
第 5 步:更新工作流原型
在 Step Functions 控制台中,您将更新工作流,添加 Lambda 函数的 ARN。
返回到创建工作流原型的选项卡或窗口。
选择处理 CSV 数据步骤,然后在配置选项卡中执行以下操作:
对于集成类型,请选择已优化。
对于函数名称,输入 Lambda 函数名称。从出现的下拉列表中选择函数,或者选择输入函数名称并提供 Lambda 函数 ARN。
第 6 步:查看自动生成的 Amazon States Language 定义并保存工作流
当您将状态从操作和流选项卡拖放到画布上时,Workflow Studio 会自动实时撰写工作流的 Amazon States Language 定义。您可以根据需要编辑此定义。
-
(可选)在 检查器面板 面板上选择定义,然后查看状态机定义。
提示
您也可以在 Workflow Studio 的代码编辑器中查看 ASL 的定义。在代码编辑器中,还可以编辑工作流的 ASL 定义。
以下示例代码显示了为您的工作流自动生成的 Amazon States Language 定义。
{ "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-source-bucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
-
为状态机指定一个名称。要执行此操作,请选择默认状态机名称 MyStateMachine 旁边的编辑图标。然后,找到状态机配置,在状态机名称框中指定一个名称。
对于本教程,请输入名称
DistributedMapDemo
。 -
(可选)在状态机配置中,指定其他工作流设置,例如状态机类型及其执行角色。
在本教程中,请保留状态机配置中的所有默认选项。
-
在确认角色创建对话框中,选择确认继续。
您也可以选择查看角色设置,返回至状态机配置。
注意
如果您删除了 Step Functions 创建的 IAM 角色,Step Functions 在以后无法重新创建该角色。同样,如果您修改了该角色(例如,通过在 IAM 策略中从主体中删除 Step Functions),Step Functions 在以后也无法还原其原始设置。
第 7 步:运行状态机
执行是状态机的一个实例,您可以在其中运行工作流来执行任务。
-
在 DistributedMapDemo 页面上,选择启动执行。
-
在启动执行对话框中,执行以下操作:
-
(可选)输入自定义执行名称,以便覆盖生成的默认执行名称。
非 ASCII 名称和日志记录
Step Functions 对于状态机、执行、活动和标签接受包含非 ASCII 字符的名称。由于此类字符不适用于 Amazon CloudWatch,因此我们建议您仅使用 ASCII 字符,这样您就可以在 CloudWatch 中跟踪指标。
-
(可选)在输入框中,以 JSON 格式输入输入值以便运行工作流。
-
选择启动执行。
-
Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。
要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅执行详细信息概览。
例如,选择
Map
状态,然后选择 Map Run,打开 Map Run 详细信息页面。在此页面上,您可以查看分布式 Map 状态的所有执行细节及其启动的子工作流执行。有关该页面的信息,请参阅查看 Map Run。 -