ItemReader - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

ItemReader

ItemReader 字段是一个 JSON 对象,用于指定数据集及其位置。分布式 Map 状态使用此数据集作为其输入。以下示例显示了如果数据集是存储在 Amazon S3 存储桶中的 CSV 文件时,ItemReader 字段的语法内容。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "myBucket", "Key": "csvDataset/ratings.csv" } }
提示

在 Workflow Studio 中,您可以在项目来源字段中指定数据集及其位置。

ItemReader 字段的内容

根据您的数据集,ItemReader 字段的内容会有所不同。例如,如果您的数据集是从工作流的上一个步骤传递的 JSON 数组,则 ItemReader 字段将被省略。如果您的数据集是 Amazon S3 数据来源,则此字段包含以下子字段。

ReaderConfig

一个 JSON 对象,用于指定以下详细信息:

  • InputType

    指定 Amazon S3 数据来源的类型,例如 CSV 文件、对象、JSON 文件或 Amazon S3 清单列表。在 Workflow Studio 中,您可以从项目来源字段下的 Amazon S3 项目来源下拉列表中选择一种输入类型。

  • CSVHeaderLocation

    注意

    仅当使用 CSV 文件作为数据集时,才必须指定此字段。

    接受以下值之一来指定列标题的位置:

    重要

    目前,Step Functions 支持最大 10 KB 的 CSV 标题。

    • FIRST_ROW – 如果文件的第一行是标题,则使用此选项。

    • GIVEN – 使用此选项在状态机定义中指定标题。例如,如果您的 CSV 文件包含以下内容。

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      提供以下 JSON 数组作为 CSV 标题。

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    提示

    在 Workflow Studio 中,您可以在项目来源字段的其他配置下找到此选项。

  • MaxItems

    限制传递给 Map 状态的数据项数量。例如,假设您提供的一个包含 1000 行的 CSV 文件并指定限制为 100。然后,解释器将Map 状态传递 100 行。Map 状态从标题行之后开始,按顺序处理项。

    默认情况下,Map 状态会迭代指定数据集中的所有项。

    注意

    目前,您可以将上限指定为 1 亿。分布式 Map 状态将停止读取超过此限制的项。

    提示

    在 Workflow Studio 中,您可以在项目来源字段的其他配置下找到此选项。

    或者,您可以在分布式 Map 状态输入中指定现有键值对的参考路径。此路径必须解析为正整数。您可以在 MaxItemsPath 子字段中指定参考路径。

    重要

    您可以指定 MaxItemsMaxItemsPath 子字段,但不能同时指定两者。

Resource

Step Functions 必须根据指定的数据集调用的 Amazon S3 API 操作。

Parameters

一个 JSON 对象,用于指定存储数据集的 Amazon S3 存储桶名称和对象密钥。

重要

确保您的 Amazon S3 存储桶与您的状态机处在同一个 Amazon Web Services 账户和 Amazon Web Services 区域下。

数据集示例

您可以指定下列选项之一作为数据集:

重要

Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集的 IAM 政策信息,请参阅适用于数据集的 IAM 策略

分布式 Map 状态可以接受从工作流的上一个步骤中传递的 JSON 输入。此输入必须是数组,或者必须包含特定节点内的数组。要选择包含数组的节点,可以使用 ItemsPath 字段。

要处理数组中的单个项,分布式 Map 状态会为每个数组项启动子工作流执行。以下选项卡显示了传递给 Map 状态的输入以及子工作流执行的相应输入的示例。

注意

当您的数据集是上一个步骤中的 JSON 数组时,Step Functions 会省略 ItemReader 字段。

Input passed to the Map state

思考以下由三个项组成的 JSON 数组。

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

分布式 Map 状态将启动三个子工作流执行。每次执行都会接收一个数组项作为输入。以下示例显示了子工作流执行接收的输入。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分布式 Map 状态 可以迭代存储在 Amazon S3 存储桶中的对象。当工作流执行到达 Map 状态时,Step Functions 会调用 ListObjectsV2 API 操作,该操作会返回 Amazon S3 对象元数据的数组。在此数组中,每个项目都包含存储在存储桶中的数据,例如 ETagKey

要处理数组中的各个项目,分布式 Map 状态 会启动一个子工作流执行。例如,假设您的 Amazon S3 存储桶包含 100 张图片。然后,调用 ListObjectsV2 API 操作后返回的数组包含 100 个项目。然后,分布式 Map 状态 将启动 100 个子工作流执行,以处理每个数组项。

注意
  • 目前,Step Functions 还会为您使用 Amazon S3 控制台在特定 Amazon S3 存储桶中创建的每个文件夹都提供一个项目。这会导致由分布式 Map 状态 启动额外的子工作流执行。为避免为该文件夹创建额外的子工作流执行,我们建议您使用 Amazon CLI 来创建文件夹。有关更多信息,请参阅《Amazon Command Line Interface 用户指南》中的高级别 Amazon S3 命令

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集的 IAM 政策信息,请参阅适用于数据集的 IAM 策略

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

ItemReader syntax

在此示例中,您已将数据(包括图像、JSON 文件和对象)组织在名为 myBucket 的 Amazon S3 存储桶中名为 processData 的前缀内。

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "myBucket", "Prefix": "processData" } }
Input passed to a child workflow execution

分布式 Map 状态 启动的子工作流执行与 Amazon S3 存储桶中存在的项目数量一样。以下示例显示了子工作流执行接收的输入。

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

分布式 Map 状态 可以接受存储在 Amazon S3 存储桶中的 JSON 文件作为数据集。JSON 文件必须包含一个数组。

当工作流执行到达 Map 状态时,Step Functions 会调用 GetObject API 操作来获取指定的 JSON 文件。然后,Map 状态会迭代数组中的每个项目,并开始对每个项目执行子工作流。例如,如果您的 JSON 文件包含 1000 个数组项,则 Map 状态将启动 1000 个子工作流执行。

注意
  • 用于启动子工作流执行的执行输入不能超过 256 KB。但是,如果您随后应用可选的 ItemSelector 字段来减小项目的大小,Step Functions 支持从 CSV 或 JSON 文件中读取最大 8 MB 的项目。

  • 目前,Step Functions 支持 Amazon S3 清单报告中单个文件的最大大小为 10 GB。但是,如果每个文件都小于 10 GB,Step Functions 能够处理大小可以超过 10 GB。

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集的 IAM 政策信息,请参阅适用于数据集的 IAM 策略

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

在此示例中,假设您有一个名为 factcheck.json 的 JSON 文件。您已将此文件存储在 Amazon S3 存储桶中的名为 jsonDataset 的前缀中。以下是 JSON 数据集的示例:

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "myBucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

分布式 Map 状态 启动的子工作流执行与 JSON 文件中存在的数组项数量一样多。以下示例显示了子工作流执行接收的输入。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分布式 Map 状态 可以接受存储在 Amazon S3 存储桶中的 CSV 文件作为数据集。如果您使用 CSV 文件作为数据集,则需要指定 CSV 列标题。有关如何指定 CSV 标题的信息,请参阅 ItemReader 字段的内容

由于在 CSV 文件中没有创建和维护数据的标准化格式,因此 Step Functions 会根据以下规则解析 CSV 文件:

  • 逗号 (,) 是用于分隔单个字段的分隔符。

  • 换行符是分隔单个记录的分隔符。

  • 字段被视为字符串。对于数据类型转换,使用 ItemSelector 中的 States.StringToJson 内置函数。

  • 不需要双引号 (" ") 将字符串括起来。但是,用双引号括起来的字符串可以包含逗号和换行符,但不能用作分隔符。

  • 通过重复双引号来转义双引号。

  • 如果一行中的字段数少于标题中的字段数,Step Functions 会为缺失的值提供空字符串。

  • 如果一行中的字段数大于标题中的字段数,Step Functions 会跳过多余的字段。

有关 Step Functions 如何解析 CSV 文件的更多信息,请参阅Example of parsing an input CSV file

当工作流执行到达 Map 状态时,Step Functions 会调用 GetObject 操作来获取指定的 CSV 文件。然后,Map 状态会迭代 CSV 文件中的每一行,并启动一个子工作流执行以处理每行中的项目。例如,假设您提供了一个包含 100 行的 CSV 文件作为输入。然后,解释器将每一行传递给 Map 状态。Map 状态从标题行之后开始,按顺序处理项目。

注意
  • 用于启动子工作流执行的执行输入不能超过 256 KB。但是,如果您随后应用可选的 ItemSelector 字段来减小项目的大小,Step Functions 支持从 CSV 或 JSON 文件中读取最大 8 MB 的项目。

  • 目前,Step Functions 支持 Amazon S3 清单报告中单个文件的最大大小为 10 GB。但是,如果每个文件都小于 10 GB,Step Functions 能够处理大小可以超过 10 GB。

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集的 IAM 政策信息,请参阅适用于数据集的 IAM 策略

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

ItemReader syntax

例如,假设您有一个名为 ratings.csv 的 CSV 文件。然后,您已将此文件存储在 Amazon S3 存储桶中名为 csvDataset 的前缀中。

{ "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "myBucket", "Key": "csvDataset/ratings.csv" } } }
Input to a child workflow execution

分布式 Map 状态 启动的子工作流执行与 CSV 文件中存在的行数一样多,不包括标题行(如果文件有)。以下示例显示了子工作流执行接收的输入。

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

分布式 Map 状态 可以接受存储在 Amazon S3 存储桶中的 Amazon S3 清单文件作为数据集。

当工作流执行到达 Map 状态时,Step Functions 会调用 GetObject API 操作来获取指定的 Amazon S3 清单文件。然后,Map 状态会迭代清单中的对象,以返回 Amazon S3 清单对象元数据数组。

注意
  • 目前,Step Functions 支持 Amazon S3 清单报告中单个文件的最大大小为 10 GB。但是,如果每个文件都小于 10 GB,Step Functions 能够处理大小可以超过 10 GB。

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集的 IAM 政策信息,请参阅适用于数据集的 IAM 策略

以下是 CSV 格式的清单文件示例:此文件包含名为 csvDatasetimageDataset 的对象,其中存储在名为 sourceBucket 的 Amazon S3 存储桶中。

"sourceBucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "sourceBucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "sourceBucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "sourceBucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
重要

目前,Step Functions 不支持将用户定义的 Amazon S3 清单报告作为数据集。您还必须确保 Amazon S3 清单报告的输出格式为 CSV。有关 Amazon S3 清单及其设置方法的更多信息,请参阅《Amazon S3 用户指南》中的 Amazon S3 清单

以下清单列表文件示例显示了清单对象元数据的 CSV 标题。

{ "sourceBucket" : "sourceBucket", "destinationBucket" : "arn:aws:s3:::inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "source-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

ItemReader syntax
{ "ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "destinationBucket", "Key": "destination-prefix/source-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json" } } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "sourceBucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

根据您在配置 Amazon S3 清单报告时选择的字段,您的 manifest.json 文件内容可能与所示示例有所不同。

适用于数据集的 IAM 策略

当您使用 Step Functions 控制台创建工作流时,Step Functions 可以根据工作流定义中的资源自动生成 IAM 策略。这些策略包括允许状态机角色调用分布式 Map 状态StartExecution API 操作所需的最低权限。这些策略还包括 Step Functions 访问 Amazon 资源(例如 Amazon S3 存储桶和对象以及 Lambda 函数)所需的最低权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。例如,如果您的工作流包含分布式模式下的 Map 状态,则将策略范围缩小到包含您的数据集的特定 Amazon S3 存储桶和文件夹。

重要

如果您在分布式 Map 状态 输入中指定了 Amazon S3 存储桶和对象或前缀,并将参考路径指向现有键值对,请务必更新工作流程的 IAM 策略。将策略范围缩小到运行时该路径解析到的存储桶和对象名称。

以下 IAM 策略示例授予使用 ListObjectsV2GetObject API 操作访问 Amazon S3 数据集所需的最低权限。

例 Amazon S3 对象作为数据集的 IAM 策略

以下示例显示了一个 IAM 策略,该策略可授予访问名为 myBucket 的 Amazon S3 存储桶的 processImages 中组织的对象的最低权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::myBucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } } } ] }
例 将 CSV 文件作为数据集的 IAM 政策

以下示例显示一个 IAM 策略,该策略授予可授予访问名为 ratings.csv 的 CSV 文件的最低权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::myBucket/csvDataset/ratings.csv" ] } ] }
例 Amazon S3 清单作为数据集的 IAM 策略

以下示例显示了一个 IAM 策略,可授予访问 Amazon S3 清单报告的最低权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::destination-prefix/source-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::destination-prefix/source-bucket/config-ID/data/*" ] } ] }