在分布式模式下使用地图状态 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在分布式模式下使用地图状态

重要

目前,该Map州的分布式模式仅在商业区域可用。有关在工作流程中包含Map状态的信息,请参阅映射

Step Functions 为称为分布式模式的Map状态提供高并发模式。在此模式下,该Map州可以接受来自大规模 Amazon S3 数据源的输入。例如,您的输入可以是存储在 Amazon S3 存储桶中的 JSON 或 CSV 文件,也可以是从工作流程中先前步骤传递的 JSON 数组。设置为 “分布式” 的Map状态称为分布式地图状态。在此模式下,该Map状态将每次迭代作为子工作流执行运行,这样可以实现高并行执行多达 10,000 个子工作流的parallel 执行。每个子工作流程的执行都有自己独立于父工作流程的执行历史记录。

当您需要协调满足以下条件任意组合的大规模parallel 工作负载时,在分布式模式下使用该Map状态:

  • 数据集的大小超过 256 KB。

  • 工作流程的执行事件历史记录超过 25,000 个条目。

  • 您需要超过 40 次并行迭代的parallel 迭代。

有关处理大规模parallel 工作流程的更多信息,请参阅在状态机中协调大规模parallel 工作负载

有关使用分布式地图状态的简介,请参阅使用分布式地图复制大规模 CSV 数据教程。

提示

要向您Amazon Web Services 账户部署使用分布式地图状态的工作流程示例,请参阅 The Workshop 模块 14-数据处理中的使用分布式地图进行大规模并行Amazon Step Functions化

本主题中的关键概念

分布式模式

Map状态的处理模式。在此模式下,Map状态的每次迭代都作为子工作流执行运行,从而实现高并发性。每个子工作流程的执行都有自己的执行历史记录,该历史记录与父工作流程的执行历史是分开的。此模式支持从大型 Amazon S3 数据源读取输入。

分布式地图状态

一种Map状态设置为分布式处理模式。

映射工作流程数

Map状态运行的一组步骤。

执行子工作流程数

分布式地图状态的迭代。子工作流程执行有自己的执行历史记录,该历史记录与父工作流程的执行历史是分开的。

地图运行

在分布式模式下运行Map状态时,Step Functions 会创建 Map Run 资源。Map Run 是指分布式地图状态启动的一组子工作流执行以及控制这些执行的运行时设置。Step functictionctionctionctioncARN onctionctionctionss 您可以在 Step Functions 控制台中查看 Map Run。您也可以调用DescribeMapRun API 操作。Map Run 还会向发送指标 CloudWatch。

有关更多信息,请参阅检查 Map Run

分布式地图状态字段

要在工作流中使用 “分布式地图” 状态,请指定其中一个或多个字段。除了常用状态字段外,您还可以指定这些字段

Type (必需)

设置状态的类型,例如Map

ItemProcessor (必需)

包含以下 JSON 对象,用于指定Map状态处理模式和定义。

  • ProcessorConfig— 一个 JSON 对象,用于指定Map状态的配置。对象包含以下子字段:

    • Mode— 设置DISTRIBUTED为在分布式模式下使用Map状态。目前,您无法在 Express 工作流程DISTRIBUTEDMode将设置为。

    • ExecutionType— 将地图工作流的执行类型指定为 “标准” 或 “速”。如果您为Mode子字段指定DISTRIBUTED了此字段,则必须提供此字段。有关工作流程类型的更多信息,请参阅标准工作流程与快速工作流程

  • StartAt— 指定一个字符串,该字符串表示工作流中的第一个状态。此字符串区分大小写,并且必须与其中一个状态对象的名称相匹配。此状态首先针对数据集中的每个项目运行。您向Map状态提供的任何执行输入都将首先传递给该StartAt状态。

  • States— 包含一组以逗号分隔的状态的 JSON 对象。在此对象中,您可以定义Map workflow.

ItemReader(可选)

指定数据集及其位置。该Map州从指定数据集接收其输入数据。

在分布式模式下,您可以使用从先前状态传递的 JSON 负载或大规模 Amazon S3 数据源作为数据集。有关更多信息,请参阅ItemReader

ItemsPath(可选)

使用JsonPath语法指定参考路径,以选择在状态输入中包含一系列项目的 JSON 节点。

在分布式模式下,只有在使用上一步中的 JSON 数组作为状态输入时,才能指定此字段。有关更多信息,请参阅ItemsPath

ItemSelector(可选)

在将单个数据集项目传递到每个Map状态迭代之前,先覆盖这些项目的值。

在此字段中,您指定包含键值对集合的有效 JS 输入。这些对可以是您在状态机定义中定义的静态值,也可以是使用路径从状态输入中选择的值,也可以是从上下文对象访问的值。有关更多信息,请参阅ItemSelector

ItemBatcher(可选)

指定批量处理数据集项目。然后,每个子工作流程执行都会收到一批这些项目作为输入。有关更多信息,请参阅ItemBatcher

MaxConcurrency(可选)

指定可以parallel 运行的子工作流程执行次数。解释器最多只允许指定数量的parallel 子工作流程执行。如果您未指定并发值或将其设置为零,则 Step Functions 不会限制parallel 执行 10,000 个子工作流程。

注意

虽然您可以为parallel 子工作流程执行指定更高的并发限制,但我们建议您不要超过下游Amazon服务的容量,例如Amazon Lambda。

ToleratedFailurePercentage(可选)

定义在地图运行中允许的失败物品的百分比。如果超过此百分比,Map Run 会自动失败。Step Functions 通过失败或超时项目总数除以项目总数的结果计算失败项的百分比。您必须指定一个介于零到 100 之间的值。有关更多信息,请参阅容许失效阈值

ToleratedFailureCount(可选)

定义在 Map Run 中允许的失败物品数量。如果超过此数字,Map Run 将自动失败。有关更多信息,请参阅容许失效阈值

Label(可选)

唯一标识Map状态的字符串。对于每次运行地图,Step Functions 都会向地图运行 ARN 添加标签。以下是带有名为的自定义标签的 Map Run ARN 的示例demoLabel

arn:aws:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b

如果未指定标签,则 Step funcctictionctionctionctionctionctions

注意

标签长度上限为 40 个字符,在状态机定义中必须为唯一字符,并且不能包含以下任何字符:

  • 空格字符

  • 通配符 (? *)

  • 方括号字符 (< > { } [ ])

  • 特殊字符 (: ; , \ | ^ ~ $ # % & ` ")

  • 控制字符(\\u0000-\\u001f\\u007f-\\u009f)。

Step Functions 允许您为包含非 ASCII 字符的状态机、执行、活动和标签创建名称。这些非 ASCII 名称不适用于亚马逊 CloudWatch。为确保您可以跟踪 CloudWatch 指标,请选择仅使用 ASCII 字符的名称。

ResultWriter(可选)

指定将所有子工作流程执行结果写入Step Functions 的 Amazon S3 位置。

Step Functions 整合所有子工作流程执行数据,例如执行输入和输出、ARN 和执行状态。然后,它将具有相同状态的命令导出到指定的 Amazon S3 位置的相应文件中。有关更多信息,请参阅ResultWriter

如果您不导出Map状态结果,它将返回所有子工作流程执行结果的数组。例如:

[1, 2, 3, 4, 5]
ResultPath(可选)

指定在输入中的哪个位置放置迭代的输出。然后,按照OutputPath字段的指定筛选输入(如果存在),然后将其作为状态的输出传递。有关更多信息,请参阅输入和输出处理

ResultSelector(可选)

传递键值对集合,其中,值为静态值对或从结果中选择。有关更多信息,请参阅ResultSelector

Retry(可选)

一个名为 Retriers 的对象数组,用于定义重试策略。如果状态遇到运行时错误,则会使用重试策略。有关更多信息,请参阅使用重试和使用 Catch 的示例

注意

如果您为 “分布式地图” 状态定义了 Retriers,则重试策略将应用于该Map状态启动的所有子工作流程执行。例如,假设您的Map州启动了三个子工作流程执行,其中一个失败。当失败发生时,执行将使用该Retry字段(如果已定义)作为Map状态。重试策略适用于所有子工作流程的执行,而不仅仅是失败的执行。如果一个或多个子工作流程执行失败,Map Run 将失败。

当你重试某个Map状态时,它会创建一个新的 Map Run。

Catch(可选)

一个称为捕获器的对象的数组,用于定义回退状态。Catch如果状态遇到运行时错误,Step Functions 会使用中定义的 Catchers。发生错误时,执行将首先使用中定义的任何重试器Retry。如果未定义重试策略或已用尽重试策略,则执行将使用其 Catchers(如果已定义)。有关更多信息,请参阅回退状态

分布式地图状态示例

要在分布式模式下使用Map状态,必须配置以下必需选项:

  • ItemReader— 指定数据集及其位置,Map状态可以从中读取输入。

  • ItemProcessor— 指定以下值:

    • ProcessorConfigEXPRESS 分别将Mode和设置ExecutionTypeDISTRIBUTED和。这将为 “分布式地图”Map 状态启动的子工作流执行设置状态的处理模式和工作流类型。

    • StartAt— 地图工作流程中的第一个状态。

    • States— 定义 Map 工作流程,这是一组在每个子工作流执行中重复的步骤。

  • ResultWriter— 指定 Step Functions 写入分布式地图状态结果的 Amazon S3 位置。

    重要

    确保用于导出 Map Run 结果的 Amazon S3 存储桶Amazon Web Services 账户与Amazon Web Services 区域您的状态机位于同一存储桶下。否则,您的状态机执行将因States.ResultWriterFailed错误而失败。

您可以选择配置其他字段,如中所述分布式地图状态输入和输出配置

以下是分布式地图状态定义的示例,该定义将数据集指定为存储在 Amazon S3 存储桶中的 CSV 文件。它还指定了一个 Lambda 函数来处理 CSV 文件每一行中的数据。由于此示例使用 CSV 文件,因此它还指定了 CSV 列标题的位置。要查看此示例的完整状态机定义,请参阅使用分布式地图复制大规模 CSV 数据教程。

{ "Map": { "Type": "Map", "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "Database", "Key": "csv-dataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "LambdaTask", "States": { "LambdaTask": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:processCSVData" }, "End": true } } }, "Label": "Map", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "myOutputBucket", "Prefix": "csvProcessJobs" } } } }

分布式地图状态输入和输出配置

您可以使用分布式地图状态的字段配置其输入和输出,例如ItemReaderResultWriter

以下示例说明如何在 “分布式映射” 状态下配置输入和输出。要查看包含这些示例的完整状态机定义,请参阅使用分布式地图复制大规模 CSV 数据教程。

注意

根据您的使用案例,您可能不需要应用所有字段。有关以下字段的更多信息,请参阅映射状态输入和输出字段

ItemReader

使用该ItemReader字段指定数据集的位置,该Map州从中读取其输入数据。以下示例显示如何通过指定存储 CSV 文件的 Amazon S3 存储桶名称和对象密钥来将 CSV 文件用作数据集。

{ "Map": { "Type": "Map", ... "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "Database2022", "Key": "csv-dataset/ratings.csv" } }, ... "End": true } }
MaxItems

使用该MaxItems字段的子ItemReader字段限制该Map州可以从数据集中读取的项目数量。例如,如果您将 CSV 数据集的MaxItems子字段值指定为 90,则该Map州仅读取 CSV 文件的前 90 行,从标题行之后开始。

{ "Map": { "Type": "Map", ... "ItemReader": { "ReaderConfig": { "MaxItems": 90, "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, ... }, ... "End": true } }
ItemsPath

如果您的数据集是从工作流程中先前步骤传递的 JSON 输入,并且它包含一个数组,请使用该ItemsPath字段选择包含该数组的节点。

例如,给定以下 JSON 输入:

{ "facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ] }

使用以下示例所示的ItemsPath字段选择包含数组的 JSON 节点:

"Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", ... } }, ... "ItemsPath": "$.facts", ... "End": true }
ItemSelector

在将单个数据集项目的值传递给Map状态迭代之前,使用该ItemSelector字段覆盖这些项目的值。要覆盖这些值,您可以指定包含键值对集合。这些对可以是您在状态机定义中提供的静态值,也可以是使用路径从状态输入中选择的值,也可以是从上下文对象访问的值。

例如,以下自定义 JSON 输入在传递到Map状态迭代之前替换了原始输入。每个子工作流程执行都会收到以下包含静态值和两个上下文对象数据项的自定义输入。

{ "Map": { "Type": "Map", ... "ItemSelector": { "foo": "bear", "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }, ... "End": true } }
ItemBatcher

默认情况下,该Map状态将数据集中的每个项目作为输入传递给单个子工作流程执行。使用该ItemBatcher字段处理每个子工作流程执行中的一组项目。在此字段中,您可以指定要批处理的最大项目数、以字节为单位的最大批处理大小,或者这两个值。解释器将指定数量的项目添加到 Items 数组中。然后,它将数组作为输入传递给每个子工作流程执行。例如,如果您将要批处理的最大项目数指定为 10,则解释器会将指定数量的项目添加到每个子工作流程执行的输入中的 Items 数组中。

以下示例说明如何通过指定MaxItemsPerBatch字段来批处理 10 个数据集项目:

{ "Map": { "Type": "Map", "ItemBatcher": { "MaxItemsPerBatch": 10 }, ... "End": true } }

以下示例显示了Items数组内的一批项目,Map状态将其作为输入传递给子工作流程执行:

{ "Items": [ { "rating": "3.0", "movieId": "1244", "userId": "2", "timestamp": "1192913551" }, { "rating": "4.5", "movieId": "1296", "userId": "2", "timestamp": "1192913608" }, ... ] }
ResultWriter

使用ResultWriter字段将所有子工作流程执行的结果导出Amazon S3 存储桶。如果您的输出有效负载大小超过 256 KB,将结果导出Amazon S3 存储桶会很有帮助。要导出所有子工作流程执行的结果,请指定要将结果存储到的 Amazon S3 存储桶名称和对象前缀。

{ "Map": { "Type": "Map", ... "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "processedOutput", "Prefix": "test-run" } }, ... "End": true } }

运行分布式地图状态的 IAM 策略

当您在工作流程中包含分布式地图状态时,Step Functions 需要相应的权限才能允许状态机角色调用分布式地图状态StartExecution的 API 操作。

以下 IAM 策略示例向您的状态机角色授予运行分布式地图状态所需的最低权限。

注意

确保stateMachineName替换为使用分布式地图状态的状态机的名称。例如,arn:aws:states:us-east-2:123456789012:stateMachine:mystateMachine

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "states:StartExecution" ], "Resource": [ "arn:aws:states:region:accountID:stateMachine:stateMachineName" ] }, { "Effect": "Allow", "Action": [ "states:DescribeExecution", "states:StopExecution" ], "Resource": "arn:aws:states:region:accountID:execution:stateMachineName/*" } ] }

此外,您需要确保拥有访问分布式地图状态下使用的Amazon资源(例如 Amazon S3 存储桶)所需的最低权限。有关信息,请参阅使用分布式地图状态的 IAM 策略

分布式地图状态执行

在分布式模式下运行Map状态时,Step Functions 会创建 Map Run 资源。地图运行是指分布式地图状态启动的一组子工作流执行。您可以在 Step Functions 控制台中查看 Map Run。您也可以调用DescribeMapRun API 操作。Map Run 还会向发送指标 CloudWatch。

Step Functions 控制台提供了 “地图运行详细信息” 页面,该页面显示了与分布式地图状态执行有关的所有信息。例如,您可以查看分布式地图状态的执行状态、Map Run ARN、由分布式地图状态启动的子工作流执行中处理的项目的状态以及所有子工作流程执行的列表。控制台以仪表板格式显示此信息。

有关在控制台中查看分布式地图状态执行情况的更多信息,请参阅检查 Map Run