使用 Step Functions 创建和管理 Amazon EMR 集群 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Step Functions 创建和管理 Amazon EMR 集群

了解如何使用提供的 Amazon EMR 服务集成 API 将 Amazon Step Functions 与 Amazon EMR 集成。服务集成 API 与相应的 Amazon EMR API 相似,但在传递的字段和返回的响应方面有所不同。

要了解如何在 Step Functions 中与 Amazon 服务集成,请参阅集成 服务在 Step Functions 中将参数传递给服务 API

经优化的 Amazon EMR 集成的主要功能
  • 优化的 Amazon EMR 服务集成有一组自定义的 API,用于封装底层的 Amazon EMR API,如下所述。因此,它与 Amazon EMR Amazon SDK 服务集成相比有很大不同。

  • 支持运行作业 (.sync) 集成模式。

如果执行停止,Step Functions 不会自动终止 Amazon EMR 集群。如果您的状态机在 Amazon EMR 集群终止之前停止,则您的集群可能会无限期地继续运行,并且可能会产生额外费用。为避免这种情况,请确保您创建的任何 Amazon EMR 集群都已正确终止。有关更多信息,请参阅:

注意

emr-5.28.0 起,您可以在创建集群时指定参数 StepConcurrencyLevel,以允许在单个集群上并行运行多个步骤。您可以使用 Step Functions MapParallel 状态将工作并行提交到集群。

Amazon EMR 服务集成的可用性取决于 Amazon EMR API 的可用性。请查看 Amazon EMR 文档,了解特殊区域的限制。

注意

为了与 Amazon EMR 集成,Step Functions 在前 10 分钟具有硬编码的 60 秒作业轮询频率,10 分钟后为 300 秒作业轮询频率。

经优化的 Amazon EMR API

下表描述了每个 Amazon EMR 服务集成 API 与相应 Amazon EMR API 之间的差异。

Amazon EMR 服务集成 API 相应的 EMR API 差异
createCluster

创建并开始运行集群(作业流程)。

Amazon EMR 与一种独特类型的 IAM 角色(称为服务相关角色)直接关联。要使 createClustercreateCluster.sync 起作用,您必须配置必要的权限以创建与服务关联的角色 AWSServiceRoleForEMRCleanup。有关此问题的更多信息,包括您可以添加到 IAM 权限策略的语句,请参阅使用 Amazon EMR 的服务关联角色

runJobFlow createCluster 使用与 runJobFlow 相同的请求语法,但以下情况除外:
  • 必须填写 Instances.KeepJobFlowAliveWhenNoSteps 字段,且该字段必须具有 Boolean 值 TRUE

  • 不允许填写字段 Steps

  • 应提供 Instances.InstanceFleets[index].Name 字段的值,并且如果使用可选的 modifyInstanceFleetByName 连接器 API,该字段必须是唯一的。

  • 应提供 Instances.InstanceGroups[index].Name 字段的值,并且如果使用可选的 modifyInstanceGroupByName API,该字段必须是唯一的。

响应如下:
{ "ClusterId": "string" }
Amazon EMR 使用以下信息:
{ "JobFlowId": "string" }
createCluster.sync

创建并开始运行集群(作业流程)。

runJobFlow createCluster 相同,但等待集群达到 WAITING 状态。
setClusterTerminationProtection

锁定集群(作业流程),以使集群中的 EC2 实例无法通过用户干预、API 调用或作业流程错误终止。

setTerminationProtection 请求使用:
{ "ClusterId": "string" }
Amazon EMR 使用以下信息:
{ "JobFlowIds": ["string"] }
terminateCluster

关闭集群(作业流程)。

terminateJobFlows 请求使用:
{ "ClusterId": "string" }
Amazon EMR 使用以下信息:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

关闭集群(作业流程)。

terminateJobFlows terminateCluster 相同,但等待集群终止。
addStep

向正在运行的集群添加新步骤。

另外,使用此 API 时,还能指定 ExecutionRoleArn 参数。

addJobFlowSteps

请求使用密钥 "ClusterId"。Amazon EMR 使用 "JobFlowId"。请求使用单一步骤。
{ "Step": <"StepConfig object"> }
Amazon EMR 使用以下信息:
{ "Steps": [<StepConfig objects>] }
响应如下:
{ "StepId": "string" }
Amazon EMR 返回以下内容:
{ "StepIds": [<strings>] }
addStep.sync

向正在运行的集群添加新步骤。

另外,使用此 API 时,还能指定 ExecutionRoleArn 参数。

addJobFlowSteps

addStep 相同,但等待步骤完成。
cancelStep

取消正在运行的集群中的一个待处理步骤。

cancelSteps 请求使用:
{ "StepId": "string" }
Amazon EMR 使用以下信息:
{ "StepIds": [<strings>] }
响应如下:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR 使用以下信息:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

使用指定的 InstanceFleetName 修改实例队列的目标按需容量和目标 Spot 容量。

modifyInstanceFleet 请求与 modifyInstanceFleet 相同,但以下情况除外:
  • 不允许填写字段 Instance.InstanceFleetId

  • 在运行时,通过调用 InstanceFleetId 并解析结果,服务集成会自动确定 ListInstanceFleets

modifyInstanceGroupByName

修改实例组的节点数和配置设置。

modifyInstanceGroups 请求如下:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR 使用以下列表:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig 对象中,不允许填写 InstanceGroupId 字段。

已添加一个新字段 InstanceGroupName。在运行时,通过调用 InstanceGroupId 并解析结果,服务集成会自动确定 ListInstanceGroups

工作流程示例

以下内容包含一个创建集群的 Task 状态。

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Arguments": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-account-id-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

以下内容包括启用终止保护的 Task 状态。

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Arguments": { "ClusterId": "{% $ClusterId %}", "TerminationProtected": true }, "End": true }

以下内容包括向集群提交步骤的 Task 状态。

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Arguments": { "ClusterId": "{% $ClusterId %}", "ExecutionRoleArn": "arn:aws:iam::account-id:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://region.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://region.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

以下内容包括取消步骤的 Task 状态。

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Arguments": { "ClusterId": "{% $ClusterId %}", "StepId": "{% $AddStepsResult.StepId %}" }, "End": true }

以下内容包括终止集群的 Task 状态。

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Arguments": { "ClusterId": "{% $ClusterId %}", }, "End": true }

以下内容包括为实例组向上或向下扩展集群的 Task 状态。

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Arguments": { "ClusterId": "j-account-id3", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

以下内容包括为实例队列向上或向下扩展集群的 Task 状态。

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Arguments": { "ClusterId": "j-account-id3", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

用于调用 Amazon EMR 的 IAM 策略

以下示例模板展示了 Amazon Step Functions 如何根据状态机定义中的资源生成 IAM 策略。有关更多信息,请参阅Step Functions 如何为集成服务生成 IAM 策略探索 Step Functions 中的服务集成模式

addStep

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/clusterId" ] } ] }

动态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

动态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::123456789012:role/myRoleName" ] } ] }

setClusterTerminationProtection

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

动态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

动态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

动态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

静态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

动态资源

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }