使用分Step Functions 致电 Amazon EMR - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用分Step Functions 致电 Amazon EMR

Step Functions 可以控制某些Amazon服务直接来自Amazon 状态语言。有关使用的更多信息Amazon Step Functions及其集成,请参阅以下内容:

优化的亚马逊 EMR 集成与亚马逊 EMR 有何不同AmazonSDK 集成

优化的亚马逊 EMR 服务集成有一组自定义的 API,其中包含底层亚马逊 EMR API,如下所述。因此,它与亚马逊 EMR 有很大不同AmazonSDK 服务集成。此外,运行作业 (.sync)支持集成模式。

集成Amazon Step Functions使用亚马逊 EMR,您可以使用提供的亚马逊 EMR 服务集成 API。服务集成 API 与相应的 Amazon EMR API 类似,但在传递的字段和返回的响应中存在一些差异。

如果停止执行,Step Functions 不会自动终止 Amazon EMR 集群。如果您的状态机在 Amazon EMR 集群终止之前停止,则您的集群可能会无限期地继续运行,并可能产生额外费用。为避免这种情况,请确保正确终止您创建的任何 Amazon EMR 集群。有关更多信息,请参阅:

注意

截至目前emr-5.28.0,您可以指定参数StepConcurrencyLevel创建集群时,允许在单个集群上parallel 运行多个步骤。您可以使用 Step FunctionsMapParallel各州将与集群parallel 提交工作。

亚马逊 EMR 服务集成的可用性取决于亚马逊 EMR API 的可用性。请查看Amazon EMR特殊区域限制的文档。

注意

为了与 Amazon EMR 集成,Step Functions 的前 10 分钟和 300 秒的硬编码作业轮询频率为 60 秒。

下表描述了每个服务集成 API 与其对应的 Amazon EMR API 之间的区别。

亚马逊 EMR 服务集成 API 和相应的亚马逊 EMR API
Amazon EMR Service 集成 相应的 EMR API 区别
CreateCluster

创建并开始运行集群(作业流程)。

Amazon EMR 与一种独特类型的 IAM 角色,即服务相关角色。要使 createClustercreateCluster.sync 起作用,您必须配置必要的权限以创建与服务关联的角色 AWSServiceRoleForEMRCleanup。有关这方面的更多信息,包括可以添加到 IAM 权限策略中的声明,请参阅在 Amazon EMR 中使用服务相关角色.

runJobFlow createCluster使用与之相同的请求语法runJobFlow,但以下情况除外:
  • 必须填写 Instances.KeepJobFlowAliveWhenNoSteps 字段,且该字段必须具有 Boolean 值 TRUE

  • 不允许填写字段 Steps

  • 应提供 Instances.InstanceFleets[index].Name 字段的值,并且如果使用可选的 modifyInstanceFleetByName 连接器 API,该字段必须是唯一的。

  • 应提供 Instances.InstanceGroups[index].Name 字段的值,并且如果使用可选的 modifyInstanceGroupByName API,该字段必须是唯一的。

响应如下:
{ "ClusterId": "string" }
亚马逊 EMR 使用这个:
{ "JobFlowId": "string" }
createCluster.sync

创建并开始运行集群(作业流程)。

runJobFlow createCluster 相同,但等待集群达到 WAITING 状态。
setClusterTermination保护

锁定集群(作业流程),以使集群中的 EC2 实例无法通过用户干预、API 调用或作业流程错误终止。

setTerminationProtection 请求使用:
{ "ClusterId": "string" }
亚马逊 EMR 使用这个:
{ "JobFlowIds": ["string"] }
terminateCluster

关闭集群(作业流程)。

terminateJobFlows 请求使用:
{ "ClusterId": "string" }
亚马逊 EMR 使用这个:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

关闭集群(作业流程)。

terminateJobFlows terminateCluster 相同,但等待集群终止。
addStep

向正在运行的集群添加新步骤。

另外,您还可以指定ExecutionRoleArn使用此 API 时的参数。

addJobFlow步骤

请求使用密钥"ClusterId". Amazon EMR 使用"JobFlowId". 请求使用单一步骤。
{ "Step": <"StepConfig object"> }
亚马逊 EMR 使用这个:
{ "Steps": [<StepConfig objects>] }
响应如下:
{ "StepId": "string" }
亚马逊 EMR 退回了这个:
{ "StepIds": [<strings>] }
addStep.sync

向正在运行的集群添加新步骤。

另外,您还可以指定ExecutionRoleArn使用此 API 时的参数。

addJobFlow步骤

addStep 相同,但等待步骤完成。
cancelStep

取消正在运行的集群中的一个待处理步骤。

cancelSteps 请求使用:
{ "StepId": "string" }
亚马逊 EMR 使用这个:
{ "StepIds": [<strings>] }
响应如下:
{ "CancelStepsInfo": <CancelStepsInfo object> }
亚马逊 EMR 使用这个:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

使用指定的 InstanceFleetName 修改实例队列的目标按需容量和目标 Spot 容量。

modifyInstanceFleet 请求与 modifyInstanceFleet 相同,但以下情况除外:
  • 不允许填写字段 Instance.InstanceFleetId

  • 在运行时,通过调用 ListInstanceFleets 并解析结果,服务集成会自动确定 InstanceFleetId

modifyInstanceGroupByName

修改实例组的节点数和配置设置。

modifyInstanceGroups 请求如下:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
亚马逊 EMR 使用列表:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig 对象中,不允许填写 InstanceGroupId 字段。

已添加一个新字段 InstanceGroupName。在运行时,通过调用 ListInstanceGroups 并解析结果,服务集成会自动确定 InstanceGroupId

以下内容包含一个创建集群的 Task 状态。

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

以下内容包括启用终止保护的 Task 状态。

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

以下内容包括向集群提交步骤的 Task 状态。

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<mybucket>/MyHiveQueryResults/" ] } } }, "End": true }

以下内容包括取消步骤的 Task 状态。

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

以下内容包括终止集群的 Task 状态。

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

以下内容包括为实例组向上或向下扩展集群的 Task 状态。

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

以下内容包括为实例队列向上或向下扩展集群的 Task 状态。

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

有关在将 Step Functions 与其他功能一起使用时如何配置 IAM 的信息Amazon服务,请参阅集成服务的 IAM 政策.