ResultWriter(Map)
管理状态和转换数据
ResultWriter 字段是一个 JSON 对象,用于为分布式 Map 状态启动的子工作流执行的输出结果提供选项。如果选择导出,可为输出结果指定不同的格式选项,并指定用于存储它们的 Amazon S3 地址。默认情况下,Step Functions 不会导出这些结果。
ResultWriter 字段的内容
ResultWriter 字段包含以下子字段。字段的选择决定了输出的格式以及是否将其导出到 Amazon S3。
ResultWriter-
一个 JSON 对象,用于指定以下详细信息:
-
ResourceStep Functions 为导出执行结果而调用的 Amazon S3 API 操作。
-
Parameters一个 JSON 对象,用于指定存储执行输出的 Amazon S3 存储桶名称和前缀。
-
WriterConfig您可以通过此字段配置以下选项。
-
Transformation-
NONE- 除了返回工作流元数据外,还原封不动地返回子工作流执行的输出。将子工作流执行结果导出到 Amazon S3 且未指定WriterConfig时的默认选项。 -
COMPACT- 返回子工作流执行的输出。未指定ResultWriter时的默认选项。 -
FLATTEN- 返回子工作流执行的输出。如果子工作流执行输出返回一个数组,则在将结果返回到状态输出或将结果写入 Amazon S3 对象之前,此选项会先对数组进行扁平化。注意
如果子工作流执行失败,Step Functions 会原封不动地返回其执行结果。结果等同于将
Transformation设置为NONE。
-
-
OutputType-
JSON- 将结果格式化为 JSON 数组。 -
JSONL- 将结果格式化为 JSON Lines。
-
-
-
必填字段组合
ResultWriter 字段不能为空。必须至少指定其中一个子字段集。
-
WriterConfig- 用于预览格式化后的输出,而不将结果保存到 Amazon S3。 -
Resource和Parameters- 用于将结果保存到 Amazon S3,且无需额外进行格式化。 -
所有三个字段:
WriterConfig、Resource和Parameters- 用于格式化输出并将其保存到 Amazon S3。
示例配置和转换输出
以下主题演示了 ResultWriter 的可能配置设置以及不同转换选项的处理结果示例。
以下示例演示了三个字段(WriterConfig、Resources 和 Parameters)可能组合所采用的配置。
仅限 WriterConfig
此示例配置了状态输出在预览中的呈现方式,输出格式和转换均在 WriterConfig 字段中指定。不存在的 Resource 和 Parameters 字段(原本应提供 Amazon S3 存储桶规范)表示状态输出资源。结果传递到下一个状态。
"ResultWriter": { "WriterConfig": { "Transformation": "FLATTEN", "OutputType": "JSON" } }
仅限 Resources 和 Parameters
此示例将状态输出导出到指定的 Amazon S3 存储桶,不会像不存在的 WriterConfig 字段所指定的那样进行额外的格式化和转换。
"ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" }
所有三个字段:WriterConfig、Resources 和 Parameters
此示例会根据 WriterConfig 字段中的规范,将状态输出格式化。它还会根据 Resource 和 Parameters 字段中的规范将其导出到 Amazon S3 存储桶。
"ResultWriter": { "WriterConfig": { "Transformation": "FLATTEN", "OutputType": "JSON" }, "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" } }
在这些示例中,假设每个子工作流执行都会返回一个输出,该输出是一个对象数组。
[ { "customer_id": "145538", "order_id": "100000" }, { "customer_id": "898037", "order_id": "100001" } ]
这些示例演示了不同 Transformation 值的格式化输出,其中 OutputType 为 JSON。
转换 NONE
这是使用 NONE 转换时的处理结果示例。输出保持不变,并且包含工作流元数据。
[ { "ExecutionArn": "arn:aws:states:region:account-id:execution:orderProcessing/getOrders:da4e9fc7-abab-3b27-9a77-a277e463b709", "Input": ..., "InputDetails": { "Included": true }, "Name": "da4e9fc7-abab-3b27-9a77-a277e463b709", "Output": "[{\"customer_id\":\"145538\",\"order_id\":\"100000\"},{\"customer_id\":\"898037\",\"order_id\":\"100001\"}]", "OutputDetails": { "Included": true }, "RedriveCount": 0, "RedriveStatus": "NOT_REDRIVABLE", "RedriveStatusReason": "Execution is SUCCEEDED and cannot be redriven", "StartDate": "2025-02-04T01:49:50.099Z", "StateMachineArn": "arn:aws:states:region:account-id:stateMachine:orderProcessing/getOrders", "Status": "SUCCEEDED", "StopDate": "2025-02-04T01:49:50.163Z" }, ... { "ExecutionArn": "arn:aws:states:region:account-id:execution:orderProcessing/getOrders:f43a56f7-d21e-3fe9-a40c-9b9b8d0adf5a", "Input": ..., "InputDetails": { "Included": true }, "Name": "f43a56f7-d21e-3fe9-a40c-9b9b8d0adf5a", "Output": "[{\"customer_id\":\"169881\",\"order_id\":\"100005\"},{\"customer_id\":\"797471\",\"order_id\":\"100006\"}]", "OutputDetails": { "Included": true }, "RedriveCount": 0, "RedriveStatus": "NOT_REDRIVABLE", "RedriveStatusReason": "Execution is SUCCEEDED and cannot be redriven", "StartDate": "2025-02-04T01:49:50.135Z", "StateMachineArn": "arn:aws:states:region:account-id:stateMachine:orderProcessing/getOrders", "Status": "SUCCEEDED", "StopDate": "2025-02-04T01:49:50.227Z" } ]
转换 COMPACT
这是使用 COMPACT 转换时的处理结果示例。请注意,它是具有原始数组结构的子工作流执行的组合输出。
[ [ { "customer_id": "145538", "order_id": "100000" }, { "customer_id": "898037", "order_id": "100001" } ], ..., [ { "customer_id": "169881", "order_id": "100005" }, { "customer_id": "797471", "order_id": "100006" } ] ]
转换 FLATTEN
这是使用 FLATTEN 转换时的处理结果示例。请注意,它是展平为一个数组的子工作流执行数组的组合输出。
[ { "customer_id": "145538", "order_id": "100000" }, { "customer_id": "898037", "order_id": "100001" }, ... { "customer_id": "169881", "order_id": "100005" }, { "customer_id": "797471", "order_id": "100006" } ]
导出到 Amazon S3
重要
确保您用于导出 Map Run 结果的 Amazon S3 存储桶与状态机位于同一个 Amazon Web Services 账户和 Amazon Web Services 区域下。否则,您的状态机执行将因 States.ResultWriterFailed 错误而失败。
如果您的输出有效载荷大小超过 256 KiB,将结果导出到 Amazon S3 存储桶会很有帮助。Step Functions 整合了所有子工作流执行数据,例如执行输入和输出、ARN 和执行状态。然后,它将状态相同的执行导出到指定 Amazon S3 位置的相应文件中。
以下示例(使用 JSONPath)显示了用于导出子工作流执行结果的 ResultWriter 字段(带 Parameters)的语法。在此示例中,您将结果存储在名为 amzn-s3-demo-destination-bucket 存储桶中,该存储桶位于名为 csvProcessJobs 的前缀中。
{
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "amzn-s3-demo-destination-bucket",
"Prefix": "csvProcessJobs"
}
}
}
对于 JSONata 状态,Parameters 将替换为 Arguments。
{
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Arguments": {
"Bucket": "amzn-s3-demo-destination-bucket",
"Prefix": "csvProcessJobs"
}
}
}
提示
在 Workflow Studio 中,您可以通过选择将 Map 状态结果导出到 Amazon S3 来导出子工作流执行结果。然后,提供您要将结果导出到其中的 Amazon S3 存储桶的名称和前缀。
Step Functions 需要适当的权限才能访问要导出结果的存储桶和文件夹。有关所需 IAM 策略的信息,请参阅 用于 ResultWriter 的 IAM 策略。
如果要导出子工作流执行结果,分布式 Map 状态执行将按以下格式返回 Map Run ARN 以及有关 Amazon S3 导出位置的数据:
{
"MapRunArn": "arn:aws:states:us-east-2:account-id:mapRun:csvProcess/Map:ad9b5f27-090b-3ac6-9beb-243cd77144a7",
"ResultWriterDetails": {
"Bucket": "amzn-s3-demo-destination-bucket",
"Key": "csvProcessJobs/ad9b5f27-090b-3ac6-9beb-243cd77144a7/manifest.json"
}
}
Step Functions 将具有相同状态的执行结果导出到各自的文件中。例如,如果您的子工作流执行结果为 500 个成功和 200 个失败,则 Step Functions 将在指定的 Amazon S3 位置为成功和失败结果创建两个文件。在此示例中,成功结果文件包含 500 个成功结果,而失败结果文件包含 200 个失败结果。
对于给定的执行尝试,Step Functions 会根据您的执行输出在指定的 Amazon S3 位置创建以下文件:
-
manifest.json– 包含 Map Run 元数据,例如导出位置、Map Run ARN 以及有关结果文件的信息。如果您redriven了 Map Run,
manifest.json文件会包含 Map Run 的所有尝试中所有成功子工作流执行的引用。但是,此文件包含对特定redrive的失败和待执行的引用。 -
SUCCEEDED_n.json– 包含所有成功执行子工作流的合并数据。n 表示文件的索引号。索引号从 0 开始。例如SUCCEEDED_1.json。 -
FAILED_n.json– 包含所有失败、超时和中止的子工作流执行的合并数据。使用此文件从失败的执行中恢复。n 表示文件的索引。索引号从 0 开始。例如FAILED_1.json。 -
PENDING_n.json– 包含由于 Map Run 失败或中止而未启动的所有子工作流执行的合并数据。n 表示文件的索引。索引号从 0 开始。例如PENDING_1.json。
Step Functions 支持最大为 5 GB 的单个结果文件。如果文件大小超过 5 GB,Step Functions 会创建另一个文件来写入超出部分的执行结果,并在文件名后面附加一个索引号。例如,如果 SUCCEEDED_0.json 文件的大小超过 5 GB,Step Functions 会创建一个 SUCCEEDED_1.json 文件来记录超出部分结果。
如果您未指定导出子工作流执行结果,则状态机执行将返回子工作流执行结果数组,如以下示例所示:
[
{
"statusCode": 200,
"inputReceived": {
"show_id": "s1",
"release_year": "2020",
"rating": "PG-13",
"type": "Movie"
}
},
{
"statusCode": 200,
"inputReceived": {
"show_id": "s2",
"release_year": "2021",
"rating": "TV-MA",
"type": "TV Show"
}
},
...
]
注意
如果返回的输出大小超过 256 KiB,则状态机执行失败并返回一个 States.DataLimitExceeded 错误。
用于 ResultWriter 的 IAM 策略
当您使用 Step Functions 控制台创建工作流时,Step Functions 可以根据工作流定义中的资源自动生成 IAM 策略。生成的策略包括允许状态机角色调用分布式 Map 状态的 StartExecution API 操作和访问 Amazon 资源(例如 Amazon S3 存储桶和对象以及 Lambda 函数)所需的最低权限。
我们建议在 IAM 策略中仅包括必需的权限。例如,如果您的工作流包含分布式模式下的 Map 状态,则将策略范围缩小到包含您的数据的特定 Amazon S3 存储桶和文件夹。
重要
如果您在分布式 Map 状态 输入中指定了 Amazon S3 存储桶和对象或前缀,并将参考路径指向现有键值对,请务必更新工作流程的 IAM 策略。将策略范围缩小到运行时该路径解析到的存储桶和对象名称。
下面的 IAM 策略示例授予使用 PutObject API 操作将子工作流执行结果写入 Amazon S3 存储桶中名为 csvJobs 的文件夹所需的最低权限。
-
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-destination-bucket/csvJobs/*" ] } ] }
如果要将子工作流执行结果写入的 Amazon S3 存储桶使用 Amazon Key Management Service (Amazon KMS) 密钥加密,则必须在 IAM 策略中包含必要的 Amazon KMS 权限。有关更多信息,请参阅 Amazon KMS key 加密 Amazon S3 存储桶的 IAM 权限。