使用分布式模式下的 Map 状态编排大规模并行工作负载 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用分布式模式下的 Map 状态编排大规模并行工作负载

借助 Step Functions,您可以编排大规模的并行工作负载来执行任务,例如按需处理半结构化数据。这些并行工作负载允许您同时处理存储在 Amazon S3 中的大规模数据来源。例如,您可以处理包含大量数据的单个 JSON 或 CSV 文件。或者,您也可以处理一大组 Amazon S3 对象。

要在工作流中设置大规模的并行工作负载,请添加一个分布式模式下的 Map 状态。Map 状态 可同时处理数据集中的项目。设置为分布式Map 状态被称为分布式 Map 状态。在分布式模式下,Map 状态允许高并发处理。在分布式模式下,Map 状态在称为子工作流执行 迭代中处理数据集中的项目。您可以指定可并行运行的子工作流执行的数量。每个子工作流执行都有自己的、独立于父工作流的执行历史记录。如果您未指定,Step Functions 将并行运行 1 万个并行子工作流执行。

下图说明了如何在工作流中设置大规模的并行工作负载。


            图解说明编排大规模并行工作负载的概念。

关键术语

分布式模式

Map 状态的一种处理模式。在此模式下,Map 状态的每次迭代都作为子工作流执行来运行,从而实现高并发数。每个子工作流执行都有自己的执行历史记录,独立于父工作流的执行历史。该模式支持从大规模 Amazon S3 数据来源读取输入。

分布式 Map 状态

设置为分布式处理模式的 Map 状态。

Map 工作流

Map 状态运行的一组步骤。

父工作流

包含一个或多个分布式 Map 状态的工作流。

子工作流执行

分布式 Map 状态的一次迭代。子工作流执行有自己的执行历史记录,独立于父工作流的执行历史。

Map Run

当您运行分布式模式下的 Map 状态时,Step Functions 会创建一个 Map Run 资源。Map Run 是指分布式 Map 状态 启动的一组子工作流执行,以及控制这些执行的运行时设置。Step Functions 会为 Map Run 分配一个 Amazon 资源名称 (ARN)。您可以在 Step Functions 控制台中查看 Map Run。您也可以调用 DescribeMapRun API 操作。Map Run 还会向 CloudWatch 发出指标。

有关更多信息,请参阅检查 Map Run

分布式 Map 状态定义示例

当您需要编排满足以下任意条件组合的大规模并行工作负载时,请使用分布式模式的 Map 状态:

  • 数据集的大小超过 256 KB。

  • 该工作流程的执行事件历史记录超过 2.5 万个条目。

  • 您需要一个超过 40 次并行迭代的并发数。

下面的分布式 Map 状态 定义示例将数据集指定为存储在 Amazon S3 存储桶中的 CSV 文件。它还指定了一个 Lambda 函数,用于处理 CSV 文件每一行中的数据。由于此示例使用了一个 CSV 文件,因此它还指定了 CSV 列标题的位置。要查看此示例的完整状态机定义,请参阅教程使用分布式 Map 复制大规模 CSV 数据

{ "Map": { "Type": "Map", "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "Database", "Key": "csv-dataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "LambdaTask", "States": { "LambdaTask": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:processCSVData" }, "End": true } } }, "Label": "Map", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "myOutputBucket", "Prefix": "csvProcessJobs" } } } }

运行分布式 Map 的权限

当您在工作流中包含分布式 Map 状态 时,Step Functions 需要适当的权限才能允许状态机角色为分布式 Map 状态 调用 StartExecution API 操作。

以下 IAM 策略示例授予您的状态机角色运行分布式 Map 状态 所需的最低权限。

注意

确保将 stateMachineName 替换为使用分布式 Map 状态 的状态机的名称。例如,arn:aws:states:us-east-2:123456789012:stateMachine:mystateMachine

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "states:StartExecution" ], "Resource": [ "arn:aws:states:region:accountID:stateMachine:stateMachineName" ] }, { "Effect": "Allow", "Action": [ "states:DescribeExecution", "states:StopExecution" ], "Resource": "arn:aws:states:region:accountID:execution:stateMachineName:*" } ] }

此外,您需要确保您拥有访问分布式 Map 状态 中使用的 Amazon 资源所需的最低权限,例如 Amazon S3 存储桶。有关信息,请参阅使用分布式 Map 状态的 IAM 策略

分布式 Map 状态字段

要在工作流中使用分布式 Map 状态,请指定其中一个或多个字段。除了公共状态字段外,您还可以指定以下字段。

Type(必填)

设置状态的类型,例如 Map

ItemProcessor(必填)

包含以下 JSON 对象,用于指定 Map 状态处理模式和定义。

  • ProcessorConfig – 一个 JSON 对象,用于指定 Map 状态的配置。此对象包含以下子字段:

    • Mode – 设置为 DISTRIBUTED,以在分布式模式下使用 Map 状态。

      注意

      目前,如果您在快速工作流中使用 Map 状态,则无法将 Mode 设置为 DISTRIBUTED。但是,如果您在标准工作流中使用 Map 状态,则可以将 Mode 设置为 DISTRIBUTED

    • ExecutionType – 将 Map 工作流的执行类型指定为标准快速。如果您为 Mode 子字段指定了 DISTRIBUTED,则必须提供此字段。有关工作流类型的更多信息,请参阅标准和快速工作流

  • StartAt– 指定表示工作流中第一个状态的字符串。该字符串区分大小写,必须与某个状态对象的名称相匹配。此状态首先针对数据集中的每个项目运行。您向 Map 状态提供的任何执行输入都将首先传递给 StartAt 状态。

  • States – 一个 JSON 对象,其中包含逗号分隔的状态集合。在此对象中,您可以定义 Map workflow

ItemReader

指定一个数据集及其位置。Map 状态接收来自指定数据集的输入数据。

在分布式模式下,您可以使用从先前状态传递的 JSON 有效负载,也可以使用大规模的 Amazon S3 数据来源作为数据集。有关更多信息,请参阅ItemReader

ItemsPath(可选)

使用 JsonPath 语法指定一个参考路径,以选择包含状态输入内项目数组的 JSON 节点。

在分布式模式下,只有使用上一步中的 JSON 数组作为状态输入时,才能指定此字段。有关更多信息,请参阅ItemsPath

ItemSelector(可选)

在将单个数据集项目的值传递到每次 Map 状态迭代之前,覆盖这些值。

在此字段中,指定包含键值对集合的有效 JSON 输入。这些对可以是您在状态机定义中定义的静态值,使用路径从状态输入中选择的值,或者从上下文对象中获取的值。有关更多信息,请参阅ItemSelector

ItemBatcher(可选)

指定批量处理数据集项目。然后,每个子工作流执行都会收到一批这些项目作为输入。有关更多信息,请参阅ItemBatcher

MaxConcurrency(可选)

指定可并行运行的子工作流数量。解释器只允许执行指定数量的并行子工作流。如果您未指定并发数或将其设置为零,Step Functions 不会限制并发数,并且会运行 1 万个并行子工作流执行。

注意

虽然您可以为并行子工作流执行指定较高的并发数限制,但我们建议您不要超过下游 Amazon 服务(例如 Amazon Lambda)的容量。

MaxConcurrencyPath(可选)

如果要使用参考路径从状态输入中动态提供最大并发数值,请使用 MaxConcurrencyPath。解决后,参考路径必须选择一个值为非负整数的字段。

注意

一个 Map 状态不能同时包含 MaxConcurrencyMaxConcurrencyPath

ToleratedFailurePercentage(可选)

定义在 Map Run 中允许的失败项目的百分比。如果超过此百分比,Map Run 将自动失败。Step Functions 通过失败或超时项目总数除以项目总数计算失败项目的百分比。您可以指定 0 到 100 之间的值。有关更多信息,请参阅分布式 Map 状态容许的故障阈值

ToleratedFailurePercentagePath(可选)

如果要使用参考路径从状态输入中动态提供容许的故障百分比值,请使用 ToleratedFailurePercentagePath。解决后,参考路径必须选择一个值介于 0 到 100 之间的字段。

ToleratedFailureCount(可选)

定义 Map Run 中允许的失败项目数量。如果超过此数量,Map Run 将自动失败。有关更多信息,请参阅分布式 Map 状态容许的故障阈值

ToleratedFailureCountPath(可选)

如果要使用参考路径从状态输入中动态提供容许的故障计数值,请使用 ToleratedFailureCountPath。解决后,参考路径必须选择一个值为非负整数的字段。

Label(可选)

唯一标识 Map 状态的字符串。对于每个 Map Run,Step Functions 都会将标签添加到 Map Run ARN。以下是带有名为 demoLabel 的自定义标签的 Map Run ARN 的示例:

arn:aws:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b

如果您未指定标签,则 Step Functions 会自动生成一个唯一标签。

注意

标签长度不能超过 40 个字符,在状态机定义中必须唯一,并且不能包含任何以下字符:

  • 空格字符

  • 通配符 (? *)

  • 方括号字符 (< > { } [ ])

  • 特殊字符 (: ; , \ | ^ ~ $ # % & ` ")

  • 控制字符(\\u0000 - \\u001f\\u007f - \\u009f

Step Functions 允许您为状态机、执行、活动、速率控制和包含非 ASCII 字符的标签创建名称。这些非 ASCII 名称不适用于 Amazon CloudWatch。为确保您可以跟踪 CloudWatch 指标,请选择一个仅使用 ASCII 字符的名称。

ResultWriter(可选)

指定 Step Functions 写入所有子工作流执行结果的 Amazon S3 位置。

Step Functions 整合了所有子工作流执行数据,例如执行输入和输出、ARN 和执行状态。然后,它将状态相同的执行导出到指定 Amazon S3 位置的相应文件中。有关更多信息,请参阅ResultWriter

如果您不导出 Map 状态结果,它将返回一个包含所有子工作流执行结果的数组。例如:

[1, 2, 3, 4, 5]
ResultPath(可选)

指定输入中放置迭代输出的位置。接下来,输入将按照 OutputPath 字段(如果存在)指定的内容进行筛选,然后再用作状态输出传递。有关更多信息,请参阅输入和输出处理

ResultSelector(可选)

传递一个键值对集合,其中,值为静态值或从结果中选择的值。有关更多信息,请参阅ResultSelector

提示

如果您在状态机中使用的 Parallel 或 Map 状态返回由数组组成的数组,您可以使用 ResultSelector 字段将他们转换为一个平面数组。有关更多信息,请参阅展平由数组组成的数组

Retry(可选)

一个称为重试器的对象数组,用于定义重试策略。如果状态遇到运行时错误,则执行将使用重试策略。有关更多信息,请参阅使用 Retry 和使用 Catch 的状态机示例

注意

如果您为分布式 Map 状态 定义了重试器,则重试策略将应用于 Map 状态启动的所有子工作流执行。例如,假设 Map 状态启动了三个子工作流执行,其中一个失败了。失败发生时,将执行将使用 Retry 字段(如果已定义),用于 Map 状态。重试策略应用于所有子工作流执行,而不仅仅是失败的执行。如果一个或多个子工作流执行失败,则 Map Run 将失败。

当您重试某个 Map 状态时,它会创建一个新的 Map Run。

Catch(可选)

一个称为捕获器的对象数组,用于定义回退状态。如果状态遇到运行时错误,Step Functions 将使用 Catch 中定义的捕获器。发生错误时,执行会首先使用 Retry 中定义的任何重试器。如果重试策略未定义或已用尽,则执行将使用其捕获器(如果已定义)。有关更多信息,请参阅回退状态

后续步骤

要继续了解有关分布式 Map 状态的更多信息,请参阅以下资源: