本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
ItemBatcher (地图)
管理状态和转换数据
Step Functions 最近添加了变量JSONata,用于管理状态和转换数据。
了解如何使用变量传递数据和使用转换数据JSONata。
该ItemBatcher
字段是一个JSON对象,它指定在单个子工作流程执行中处理一组项目。在处理大型CSV文件或JSON数组或大型 Amazon S3 对象时使用批处理。
以下示例显示了 ItemBatcher
字段的语法。在以下语法中,每个子工作流执行应处理的最大项目数设置为 100。
{
"ItemBatcher": {
"MaxItemsPerBatch": 100
}
}
默认情况下,数据集中的每个项目都作为输入传递给单个子工作流执行。例如,假设您指定一个包含以下数组JSON的文件作为输入:
[
{
"verdict": "true",
"statement_date": "6/11/2008",
"statement_source": "speech"
},
{
"verdict": "false",
"statement_date": "6/7/2022",
"statement_source": "television"
},
{
"verdict": "true",
"statement_date": "5/18/2016",
"statement_source": "news"
},
...
]
对于给定的输入,每个子工作流执行都会接收一个数组项作为其输入。以下示例显示了一个子工作流执行的输入:
{
"verdict": "true",
"statement_date": "6/11/2008",
"statement_source": "speech"
}
为了帮助优化处理任务的性能和成本,请选择能够平衡项目数量和项目处理时间的批次大小。如果使用批处理,Step Functions 会将这些项目添加到一个项目数组中。然后,它将数组作为输入传递给每个子工作流执行。以下示例显示了作为输入传递给子工作流执行的一批两个项目:
{
"Items": [
{
"verdict": "true",
"statement_date": "6/11/2008",
"statement_source": "speech"
},
{
"verdict": "false",
"statement_date": "6/7/2022",
"statement_source": "television"
}
]
}
提示
要了解有关在工作流中使用 ItemBatcher
字段的更多信息,请尝试以下教程和研讨会:
模块 14 中@@ 使用分布式地图进行大规模并行化
——研讨会的数据处理 Amazon Step Functions
用于指定项目批处理的字段
要对项目进行批处理,请指定要批处理的最大项目数、最大批量大小,或者同事指定两者。您至少指定一个值才能批处理项目。
- 每个批次的最大项目数
指定每个子工作流执行处理的最大项目数。解释器将
Items
数组中批处理的项目数限制为该值。如果您同时指定批处理数量和大小,解释器会减少批次中的项目数以避免超过指定的批次大小限制。如果您未指定此值但提供了最大批处理大小的值,Step Functions 会在不超过以字节为单位的最大批处理大小的情况下,在每个子工作流执行过程中处理尽可能多的项目。
例如,假设您使用包含 1130 个节点的输入JSON文件运行执行。如果您为每批指定的最大项目数为 100,Step Functions 将创建 12 个批次。其中,11 个批次各包含 100 个项目,而第十二批包含其余 30 个项目。
或者,您也可以将每个批次的最大项目指定为分布式 Map 状态输入中现有键值对的参考路径。此路径必须解析为正整数。
例如,给定以下输入:
{
"maxBatchItems"
:500
}您可以使用参考路径(JSONPath仅限)指定要批处理的最大项目数,如下所示:
{ ... "Map": { "Type": "Map", "MaxConcurrency": 2000, "ItemBatcher": {
"MaxItemsPerBatchPath"
:"$.maxBatchItems"
} ... ... } }对于JSONata基于状态的状态,您还可以提供计算结果为正整数的JSONata表达式。
重要
您可以指定
MaxItemsPerBatch
或MaxItemsPerBatchPath (JSONPath only)
子字段,但不能同时指定两者。- 每批最大 KiB
以字节为单位指定批次的最大大小,最多 256 KiB。如果您同时指定了最大批次数量和大小,Step Functions 会减少批次中的项目数量以避免超过指定的批量大小限制。
或者,您也可以将最大批量大小指定为分布式地图状态输入中现有键值对的参考路径。此路径必须解析为正整数。
注意
如果您使用批处理但未指定最大批处理大小,则解释器会在每个子工作流程执行中处理尽可能多的项目,最多可处理 256 KiB。
例如,给定以下输入:
{
"batchSize"
:131072
}您可以使用参考路径指定最大批次大小,如下所示:
{ ... "Map": { "Type": "Map", "MaxConcurrency": 2000, "ItemBatcher": {
"MaxInputBytesPerBatchPath"
:"$.batchSize"
} ... ... } }对于JSONata基于状态的状态,您还可以提供计算结果为正整数的JSONata表达式。
重要
您可以指定
MaxInputBytesPerBatch
或MaxInputBytesPerBatchPath
(JSONPath唯一)子字段,但不能同时指定两者。- 批量输入
或者,您也可以指定要包含在传递给每个子工作流程执行的每个批次中的固定JSON输入。Step Functions 将此输入与每个子工作流执行的输入合并。例如,给定以下关于项目数组的事实检查日期的固定输入:
"ItemBatcher": {
"BatchInput": {
"factCheck"
:"December 2022"
} }每个子工作流执行都会收到以下内容作为输入:
{ "BatchInput": { "factCheck": "December 2022" }, "Items": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, ... ] }
对于JSONata基于状态的状态,您可以直接向对象或数组提供JSONata表达式 BatchInput,或者在JSON对象或数组中使用JSONata表达式。