本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用Amazon CloudFormation自定义资源为实时端点创建监控计划
如果您使用的是实时端点,则可以使用 Amazon CloudFormation 自定义资源来创建监控计划。该自定义资源使用 Python 编写。要部署它,请参阅 Python Lambda 部署。
自定义资源
首先,将自定义资源添加到您的 Amazon CloudFormation 模板。该资源指向您将在下一步中创建的 Amazon Lambda 函数。
此资源使您可以自定义监控计划的参数。您可以通过修改以下示例资源中的Amazon CloudFormation资源和 Lambda 函数来添加或删除更多参数。
{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "MonitoringSchedule": { "Type": "Custom::MonitoringSchedule", "Version": "1.0", "Properties": { "ServiceToken": "arn:aws:lambda:us-west-2:111111111111:function:lambda-name", "ScheduleName": "YourScheduleName", "EndpointName": "YourEndpointName", "BaselineConstraintsUri": "s3://your-baseline-constraints/constraints.json", "BaselineStatisticsUri": "s3://your-baseline-stats/statistics.json", "PostAnalyticsProcessorSourceUri": "s3://your-post-processor/postprocessor.py", "RecordPreprocessorSourceUri": "s3://your-preprocessor/preprocessor.py", "InputLocalPath": "/opt/ml/processing/endpointdata", "OutputLocalPath": "/opt/ml/processing/localpath", "OutputS3URI": "s3://your-output-uri", "ImageURI": "111111111111.dkr.ecr.us-west-2.amazonaws.com/your-image", "ScheduleExpression": "cron(0 * ? * * *)", "PassRoleArn": "arn:aws:iam::111111111111:role/AmazonSageMaker-ExecutionRole" } } } }
Lambda 自定义资源代码
此 Amazon CloudFormation 自定义资源使用自定义资源助手(Custom Resource Helper,crhelper)库。
该库可以通过 pip 安装:pip install crhelper
在创建和删除堆栈的过程中,Amazon CloudFormation 将调用此 Lambda 函数。此 Lambda 函数负责创建和删除监控计划,并使用上一部分中描述的自定义资源中定义的参数。
import boto3
import botocore
import logging
from crhelper import CfnResource
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
sm = boto3.client('sagemaker')

# cfnhelper makes it easier to implement a CloudFormation custom resource
helper = CfnResource()


# CFN Handlers

def handler(event, context):
    """Lambda entry point: pass the CloudFormation event to crhelper for dispatch."""
    helper(event, context)


@helper.create
def create_handler(event, context):
    """
    Called when CloudFormation custom resource sends the create event
    """
    create_monitoring_schedule(event)


@helper.delete
def delete_handler(event, context):
    """
    Called when CloudFormation custom resource sends the delete event
    """
    schedule_name = get_schedule_name(event)
    delete_monitoring_schedule(schedule_name)


@helper.poll_create
def poll_create(event, context):
    """
    Return true if the resource has been created and false otherwise so
    CloudFormation polls again.
    """
    schedule_name = get_schedule_name(event)
    logger.info('Polling for creation of schedule: %s', schedule_name)
    return is_schedule_ready(schedule_name)


@helper.update
def noop(event=None, context=None):
    """
    Not currently implemented but crhelper will throw an error if it isn't added.

    The handler must accept ``(event, context)`` like the other registered
    handlers; a zero-argument function would raise ``TypeError`` as soon as a
    stack update is dispatched to it. The parameters are defaulted so that
    calling ``noop()`` directly keeps working. Property changes made through a
    stack update are ignored.
    """
    pass


# Helper Functions

def get_schedule_name(event):
    """Return the ``ScheduleName`` property of the custom resource event."""
    return event['ResourceProperties']['ScheduleName']


def create_monitoring_schedule(event):
    """Create the monitoring schedule described by the event's resource properties."""
    schedule_name = get_schedule_name(event)
    monitoring_schedule_config = create_monitoring_schedule_config(event)

    logger.info('Creating monitoring schedule with name: %s', schedule_name)

    sm.create_monitoring_schedule(
        MonitoringScheduleName=schedule_name,
        MonitoringScheduleConfig=monitoring_schedule_config)


def is_schedule_ready(schedule_name):
    """
    Report whether the monitoring schedule has finished creating.

    Returns True when the schedule status is 'Scheduled' and False while it
    is 'Pending'. Raises Exception for any other status.
    """
    schedule = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
    status = schedule['MonitoringScheduleStatus']

    if status == 'Scheduled':
        logger.info('Monitoring schedule (%s) is ready', schedule_name)
        return True

    if status == 'Pending':
        logger.info('Monitoring schedule (%s) still creating, waiting and polling again...',
                    schedule_name)
        return False

    raise Exception('Monitoring schedule ({}) has unexpected status: {}'.format(schedule_name, status))


def create_monitoring_schedule_config(event):
    """
    Build the ``MonitoringScheduleConfig`` argument from the resource properties.

    ``RecordPreprocessorSourceUri`` and ``PostAnalyticsProcessorSourceUri`` are
    each read from their own property and are only included when the template
    supplies a non-empty value. All other properties used below are required
    and raise ``KeyError`` when missing.
    """
    props = event['ResourceProperties']

    app_specification = {
        "ImageUri": props["ImageURI"],
    }
    # Each script URI comes from the property of the same name; previously the
    # post-analytics script URI was also used for the record preprocessor.
    for script_key in ("RecordPreprocessorSourceUri", "PostAnalyticsProcessorSourceUri"):
        script_uri = props.get(script_key)
        if script_uri:
            app_specification[script_key] = script_uri

    return {
        "ScheduleConfig": {
            "ScheduleExpression": props["ScheduleExpression"],
        },
        "MonitoringJobDefinition": {
            "BaselineConfig": {
                "ConstraintsResource": {
                    "S3Uri": props['BaselineConstraintsUri'],
                },
                "StatisticsResource": {
                    "S3Uri": props['BaselineStatisticsUri'],
                }
            },
            "MonitoringInputs": [
                {
                    "EndpointInput": {
                        "EndpointName": props["EndpointName"],
                        "LocalPath": props["InputLocalPath"],
                    }
                }
            ],
            "MonitoringOutputConfig": {
                "MonitoringOutputs": [
                    {
                        "S3Output": {
                            "S3Uri": props["OutputS3URI"],
                            "LocalPath": props["OutputLocalPath"],
                        }
                    }
                ],
            },
            "MonitoringResources": {
                "ClusterConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.t3.medium",
                    "VolumeSizeInGB": 50,
                }
            },
            "MonitoringAppSpecification": app_specification,
            "StoppingCondition": {
                "MaxRuntimeInSeconds": 300
            },
            "RoleArn": props["PassRoleArn"],
        }
    }


def delete_monitoring_schedule(schedule_name):
    """
    Delete the monitoring schedule, treating an already-missing schedule as success.

    Any other ClientError is logged and re-raised.
    """
    logger.info('Deleting schedule: %s', schedule_name)
    try:
        sm.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFound':
            logger.info('Resource not found, nothing to delete')
        else:
            logger.error('Unexpected error while trying to delete monitoring schedule')
            # Bare raise keeps the original traceback intact.
            raise