响应 Amazon EMR 集群实例集调整大小超时事件 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

响应 Amazon EMR 集群实例集调整大小超时事件

概述

Amazon EMR 集群在对实例集集群执行调整大小操作时会发出事件。超时到期后,当 Amazon EMR 停止为实例集预置竞价型或按需容量时,就会发出预置超时事件。用户可以将超时持续时间配置为实例集调整大小规范的一部分。在对相同实例集连续调整大小的情况下,当前调整大小操作的超时到期时,Amazon EMR 会发出 Spot provisioning timeout - continuing resizeOn-Demand provisioning timeout - continuing resize 事件。然后,它开始为队列的下一次调整大小操作预置容量。

响应实例集调整大小超时事件

我们建议您通过以下方式之一来响应预置超时事件:

  • 重访调整大小规范,然后重试调整大小操作。由于容量频繁变化,一旦 Amazon EC2 容量可用,您的集群就会成功调整大小。我们建议客户为要求更严格的任务配置较低的超时持续时间值SLAs。

  • 或者,您可以:

  • 对于配置超时:继续调整事件大小,您还可以等待调整大小操作的处理。Amazon EMR 将继续按顺序处理针对实例触发的调整大小操作,同时遵守配置的调整大小规范。

您还可以设置对该事件的规则或自动响应,如下一节所述。

自动从预置超时事件中恢复

您可以使用 Spot Provisioning timeout 事件代码构建自动化以响应 Amazon EMR 事件。例如,以下 Amazon Lambda 函数关闭了具有使用竞价型实例作为任务节点的实例集的 EMR 集群,然后创建一个新的 EMR 集群,其实例集包含的实例类型比原始请求更加多样化。在此示例中,为任务节点发出的 Spot Provisioning timeout 事件将触发 Lambda 函数的执行。

例响应 Spot Provisioning timeout 事件的示例函数
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")