响应 Amazon EMR 集群实例集调整大小超时事件 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

响应 Amazon EMR 集群实例集调整大小超时事件

概述

Amazon EMR 集群在对实例集集群执行调整大小操作时会发出事件。超时到期后,当 Amazon EMR 停止为实例集预置竞价型或按需容量时,就会发出预置超时事件。用户可以将超时持续时间配置为实例集调整大小规范的一部分。在对相同实例集连续调整大小的情况下,当前调整大小操作的超时到期时,Amazon EMR 会发出 Spot provisioning timeout - continuing resizeOn-Demand provisioning timeout - continuing resize 事件。然后,它开始为队列的下一次调整大小操作预置容量。

响应实例集调整大小超时事件

我们建议您通过以下方式之一来响应预置超时事件:

  • 重访调整大小规范,然后重试调整大小操作。由于容量频繁变化,只要 Amazon EC2 容量可用,您的集群就会成功调整大小。我们建议客户为需要更严格 SLA 的任务配置较低的超时持续时间值。

  • 或者,您可以:

  • 对于配置超时 – 继续调整事件大小,您还可以等待调整大小操作的处理。Amazon EMR 将继续按顺序处理针对实例触发的调整大小操作,同时遵守配置的调整大小规范。

您还可以设置对该事件的规则或自动响应,如下一节所述。

自动从预置超时事件中恢复

您可以使用 Spot Provisioning timeout 事件代码构建自动化以响应 Amazon EMR 事件。例如,以下 Amazon Lambda 函数关闭了具有使用竞价型实例作为任务节点的实例集的 EMR 集群,然后创建一个新的 EMR 集群,其实例集包含的实例类型比原始请求更加多样化。在此示例中,为任务节点发出的 Spot Provisioning timeout 事件将触发 Lambda 函数的执行。

例 响应 Spot Provisioning timeout 事件的示例函数
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")