本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon CloudFormation 自定义资源为实时终端节点创建监控计划
如果您使用的是实时终端节点,则可以使用 Amazon CloudFormation 自定义资源来创建监控计划。自定义资源位于 Python 中。要部署它,请参阅 Python Lambda 部署。
自定义资源
首先向 Amazon CloudFormation 模板添加自定义资源。这指向您下一步将创建的 Amazon Lambda 函数。
此资源使您可以自定义监控计划的参数。您可以通过修改以下示例资源中的 Amazon CloudFormation 资源和 Lambda 函数来添加或删除更多参数。
{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "MonitoringSchedule": { "Type": "Custom::MonitoringSchedule", "Version": "1.0", "Properties": { "ServiceToken": "arn:aws:lambda:us-west-2:111111111111:function:lambda-name", "ScheduleName": "YourScheduleName", "EndpointName": "YourEndpointName", "BaselineConstraintsUri": "s3://your-baseline-constraints/constraints.json", "BaselineStatisticsUri": "s3://your-baseline-stats/statistics.json", "PostAnalyticsProcessorSourceUri": "s3://your-post-processor/postprocessor.py", "RecordPreprocessorSourceUri": "s3://your-preprocessor/preprocessor.py", "InputLocalPath": "/opt/ml/processing/endpointdata", "OutputLocalPath": "/opt/ml/processing/localpath", "OutputS3URI": "s3://your-output-uri", "ImageURI": "111111111111.dkr.ecr.us-west-2.amazonaws.com/your-image", "ScheduleExpression": "cron(0 * ? * * *)", "PassRoleArn": "arn:aws:iam::111111111111:role/AmazonSageMaker-ExecutionRole" } } } }
Lambda 自定义资源代码
此 Amazon CloudFormation 自定义资源使用自定义资源助手pip install crhelper
pip 安装该库。
此 Lambda 函数由 Amazon CloudFormation 在创建和删除堆栈期间调用。此 Lambda 函数负责创建和删除监控计划,并使用上一部分中描述的自定义资源中定义的参数。
import boto3 import botocore import logging from crhelper import CfnResource from botocore.exceptions import ClientError logger = logging.getLogger(__name__) sm = boto3.client('sagemaker') # cfnhelper makes it easier to implement a CloudFormation custom resource helper = CfnResource() # CFN Handlers def handler(event, context): helper(event, context) @helper.create def create_handler(event, context): """ Called when CloudFormation custom resource sends the create event """ create_monitoring_schedule(event) @helper.delete def delete_handler(event, context): """ Called when CloudFormation custom resource sends the delete event """ schedule_name = get_schedule_name(event) delete_monitoring_schedule(schedule_name) @helper.poll_create def poll_create(event, context): """ Return true if the resource has been created and false otherwise so CloudFormation polls again. """ schedule_name = get_schedule_name(event) logger.info('Polling for creation of schedule: %s', schedule_name) return is_schedule_ready(schedule_name) @helper.update def noop(): """ Not currently implemented but crhelper will throw an error if it isn't added """ pass # Helper Functions def get_schedule_name(event): return event['ResourceProperties']['ScheduleName'] def create_monitoring_schedule(event): schedule_name = get_schedule_name(event) monitoring_schedule_config = create_monitoring_schedule_config(event) logger.info('Creating monitoring schedule with name: %s', schedule_name) sm.create_monitoring_schedule( MonitoringScheduleName=schedule_name, MonitoringScheduleConfig=monitoring_schedule_config) def is_schedule_ready(schedule_name): is_ready = False schedule = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name) status = schedule['MonitoringScheduleStatus'] if status == 'Scheduled': logger.info('Monitoring schedule (%s) is ready', schedule_name) is_ready = True elif status == 'Pending': logger.info('Monitoring schedule (%s) still creating, waiting and polling again...', schedule_name) else: raise Exception('Monitoring schedule ({}) has unexpected status: {}'.format(schedule_name, status)) return is_ready def create_monitoring_schedule_config(event): props = event['ResourceProperties'] return { "ScheduleConfig": { "ScheduleExpression": props["ScheduleExpression"], }, "MonitoringJobDefinition": { "BaselineConfig": { "ConstraintsResource": { "S3Uri": props['BaselineConstraintsUri'], }, "StatisticsResource": { "S3Uri": props['BaselineStatisticsUri'], } }, "MonitoringInputs": [ { "EndpointInput": { "EndpointName": props["EndpointName"], "LocalPath": props["InputLocalPath"], } } ], "MonitoringOutputConfig": { "MonitoringOutputs": [ { "S3Output": { "S3Uri": props["OutputS3URI"], "LocalPath": props["OutputLocalPath"], } } ], }, "MonitoringResources": { "ClusterConfig": { "InstanceCount": 1, "InstanceType": "ml.t3.medium", "VolumeSizeInGB": 50, } }, "MonitoringAppSpecification": { "ImageUri": props["ImageURI"], "RecordPreprocessorSourceUri": props['PostAnalyticsProcessorSourceUri'], "PostAnalyticsProcessorSourceUri": props['PostAnalyticsProcessorSourceUri'], }, "StoppingCondition": { "MaxRuntimeInSeconds": 300 }, "RoleArn": props["PassRoleArn"], } } def delete_monitoring_schedule(schedule_name): logger.info('Deleting schedule: %s', schedule_name) try: sm.delete_monitoring_schedule(MonitoringScheduleName=schedule_name) except ClientError as e: if e.response['Error']['Code'] == 'ResourceNotFound': logger.info('Resource not found, nothing to delete') else: logger.error('Unexpected error while trying to delete monitoring schedule') raise e