管道步骤 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

管道步骤

SageMaker 管道由步骤组成。这些步骤使用属性定义管道采取的操作以及各步骤之间的关系。

步骤类型

以下内容描述了每种步骤类型的要求,并提供了步骤的实施示例。这些不是功能实施,因为它们不提供所需的资源和输入。有关实施这些步骤的教程,请参阅创建和管理 SageMaker 管道

注意

您还可以使用@step装饰器将本地机器学习代码转换为 Pipelin SageMaker es 步骤,从而使用本地机器学习代码创建步骤。有关更多信息,请参阅 @step decorator

Amazon SageMaker 模型构建管道支持以下步骤类型:

@step decorator

您可以使用@step装饰器从本地机器学习代码创建步骤。测试代码后,您可以通过使用装饰器对其进行注释来将该函数转换为 SageMaker 流水线步骤。@step SageMaker 当您将@step装饰函数的输出作为步骤传递给管道时,Pipelines 会创建并运行管道。您还可以创建一个多步骤 DAG 管道,其中包括一个或多个@step经过装饰的函数以及传统的 SageMaker 管道步骤。有关如何使用@step装饰器创建步骤的更多详细信息,请参阅L 带有 @step 装饰器的 ift-and-shift Python 代码

处理步骤

使用处理步骤创建用于数据处理的处理作业。有关处理作业的更多信息,请参阅处理数据和评估模型

处理步骤需要处理器、定义处理代码的 Python 脚本、用于处理的输出以及作业参数。以下示例说明如何创建 ProcessingStep 定义。

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )

传递运行时参数

以下示例说明如何将运行时参数从 PySpark 处理器传递给ProcessingStep

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )

有关处理步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 ProcessingStep文档。有关深入的示例,请参阅 “使用 Amazon Pipelin SageMaker es 编排任务以训练和评估模型” 示例笔记本中的 “定义功能工程的处理步骤”。

训练步骤

您可以使用训练步骤创建训练作业来训练模型。有关训练作业的更多信息,请参阅使用 Amazon 训练模型 SageMaker

训练步骤需要估算器以及训练和验证数据输入。以下示例说明如何创建 TrainingStep 定义。有关训练步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 TrainingStep文档。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )

优化步骤

您可以使用优化步骤来创建超参数优化作业,也称为超参数优化 (HPO)。一个超参数优化作业运行多个训练作业,每个作业会生成一个模型版本。有关超参数优化的更多信息,请参阅使用执行自动模型调整 SageMaker

调优作业与管道的 SageMaker 实验相关联,而训练作业是作为试验创建的。有关更多信息,请参阅 实验集成

调整步骤需要HyperparameterTuner和训练输入。您可以通过指定 HyperparameterTunerwarm_start_config 参数来再训练以前的优化作业。有关超参数优化和温启动的更多信息,请参阅运行热启动超参数调优作业

你可以使用 sagemaker.workflow.step s 的 get_top_model_s3_uri 方法。 TuningStepclass,用于从性能最好的模型版本之一中获取模型工件。有关显示如何在 SageMaker 管道中使用调整步骤的笔记本,请参阅 sagemaker-pipelines-tuning-step.ipynb。

重要

Amaz SageMaker on Python SDK v2.48.0 和 Amazon Studio Classic v3.8.0 中引入了调整步骤。 SageMaker 在使用调整步骤之前,必须更新 Studio Classic,否则管道 DAG 将不会显示。要更新 Studio 经典版,请参阅关闭并更新 SageMaker Studio 经典版

以下示例说明如何创建 TuningStep 定义。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://my-bucket/my-data")) )

获取最佳模型版本

以下示例说明如何使用 get_top_model_s3_uri 方法从优化作业中获取最佳模型版本。最多只能根据排名前50位的版本进行排名HyperParameterTuningJobObjectivetop_k 参数是版本的索引,其中 top_k=0 是性能最好的版本,top_k=49 是性能最差的版本。

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

有关调整步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 TuningStep文档。

AutoML 步骤

使用 AutoML API 创建 AutoML 作业以自动训练模型。有关 AutoML 任务的更多信息,请参阅使用 Ama SageMaker zon Autopilot 自动开发模型

注意

目前,AutoML 步骤仅支持组合训练模式

以下示例说明如何使用 AutoMLStep 创建定义。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="<role>", target_attribute_name="my_target_attribute_name", mode="ENSEMBLING", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data", target_attribute_name="my_target_attribute_name", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data", target_attribute_name="my_target_attribute_name", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )

获取最佳模型版本

AutoML 步骤会自动训练多个候选模型。您可以使用 get_best_auto_ml_model 方法和 IAM role 从 AutoML 作业中获取具有最佳目标指标的模型,以访问模型构件,如下所示。

best_model = step_automl.get_best_auto_ml_model(role=<role>)

有关更多信息,请参阅 Pyth SageMaker on 软件开发工具包中的 AutoML 步骤。

模型步骤

使用ModelStep创建或注册 SageMaker 模型。有关ModelStep要求的更多信息,请参阅 sagemaker.workflow. model_step。 ModelStep文档。

创建模型

您可以使用ModelStep来创建 SageMaker 模型。A ModelStep 需要模型工件和有关创建模型时需要使用的 SageMaker 实例类型的信息。有关 SageMaker 模型的更多信息,请参阅使用 Amazon 训练模型 SageMaker

以下示例说明如何创建 ModelStep 定义。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )

注册模型

您可以使用在 Amazon SageMaker 模型注册表中注册sagemaker.model.Model或。ModelStep sagemaker.pipeline.PipelineModelPipelineModel 表示推理管道,是一个由处理推理请求的容器线性序列组成的模型。有关如何注册模型的更多信息,请参阅使用模型注册表注册和部署模型

以下示例说明如何创建一个注册 PipelineModelModelStep

import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )

CreateModel 步骤

重要

我们建议使用从 P 模型步骤 ython 软件开发工具包的 2.90.0 版本开始创建模型。 SageMaker CreateModelStep将继续在以前版本的 SageMaker Python SDK 中运行,但不再受支持。

您可以使用CreateModel步骤来创建 SageMaker 模型。有关 SageMaker 模型的更多信息,请参阅使用 Amazon 训练模型 SageMaker

创建模型步骤需要模型工件和有关创建模型所需的 SageMaker 实例类型的信息。以下示例说明如何创建 CreateModel 步骤定义。有关CreateModel步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 CreateModelStep文档。

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

RegisterModel 步骤

重要

我们建议使用从 P 模型步骤 ython SDK 版本 2.90.0 起注册模型。 SageMaker RegisterModel将继续在以前版本的 SageMaker Python SDK 中运行,但不再受支持。

你可以使用RegisterModel步骤注册 sagemaker.model.model 或 sagemaker.pipeline。 PipelineModel使用 Amazon SageMaker 模型注册表。PipelineModel 表示推理管道,是一个由处理推理请求的容器线性序列组成的模型。

有关如何注册模型的更多信息,请参阅使用模型注册表注册和部署模型。有关RegisterModel步骤要求的更多信息,请参阅 s agemaker.workflow.step_collections。 RegisterModel文档。

以下示例说明如何创建注册 PipelineModelRegisterModel 步骤。

import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )

如果未提供 model,则注册模型步骤需要使用估算器,如以下示例所示。

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

转换步骤

您可以使用转换步骤进行批量转换,在整个数据集上运行推理。有关批量转换的更多信息,请参阅通过推理管道运行批量转换

转换步骤需要转换器和用于运行批量转换的数据。以下示例说明如何创建 Transform 步骤定义。有关Transform步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 TransformStep文档。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://my-bucket/my-data"), )

条件步骤

您可以使用条件步骤来评估步骤属性的条件,以评估管道中下一步应该采取的操作。

一个条件步骤需要一个条件列表、一个条件评估为 true 时要运行的步骤列表和一个条件为 false 时要运行的步骤列表。以下示例说明如何创建 ConditionStep 定义。

限制

  • SageMaker 管道不支持使用嵌套条件步骤。不能将一个条件步骤作为另一个条件步骤的输入进行传递。

  • 一个条件步骤不能在两个分支中使用相同的步骤。如果需要在两个分支中使用相同的步骤功能,请复制该步骤并为其指定不同的名称。

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

有关ConditionStep要求的更多信息,请参阅 sagemaker.workflow.con dition_step。 ConditionStepAPI 参考。有关支持条件的更多信息,请参阅 Python SDK 文档中的 Amaz SageMaker on SageMaker 模型构建管道-条件

回调步骤

您可以使用Callback步骤将其他流程和 Amazon 服务整合到工作流程中,这些流程和服务不是由 Amazon SageMaker 模型构建管道直接提供的。当 Callback 步骤运行时,会发生以下过程:

  • SageMaker Pipelines 向客户指定的亚马逊简单队列服务 (Amazon SQS) Simple SQUEE 队列发送消息。该消息包含 SageMaker Pipelines 生成的令牌和客户提供的输入参数列表。发送消息后,Pi SageMaker pelines 会等待客户的回复。

  • 客户从 Amazon SQS 队列中检索消息并启动他们的自定义流程。

  • 流程完成后,客户调用以下 API 之一,并提交 SageMaker Pipelines 生成的令牌:

  • API 调用会导致 Pipelin SageMaker es 要么继续管道进程,要么使流程失败。

有关Callback步骤要求的更多信息,请参阅 sagemaker.workflow.cal lback_step。 CallbackStep文档。有关完整的解决方案,请参阅使用回调步骤扩展 SageMaker 管道以包含自定义步骤

重要

Callback亚马逊 SageMaker Python SDK v2.45.0 和 Amazon SageMaker Studio Classic v3.6.2 中引入了步骤。在使用Callback步骤之前必须更新 Studio Classic,否则将不会显示管道 DAG。要更新 Studio 经典版,请参阅关闭并更新 SageMaker Studio 经典版

以下示例演示了上述过程的实施。

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
注意

不得嵌套 CallbackStep 的输出参数。例如,如果您使用嵌套字典作为输出参数,则该字典将被视为单个字符串(例如 {"output1": "{\"nested_output1\":\"my-output\"}"})。如果您提供嵌套值,则当您尝试引用特定输出参数时,将引发不可重试的客户端错误。

停止行为

Callback 步骤运行时,管道进程不会停止。

当您使用正在运行的Callback步骤调用StopPipelineExecution管道进程时,Pipelin SageMaker es 会向指定的 SQS 队列发送额外的 Amazon SQS 消息。SQS 消息正文包含一个状态字段,该字段设置为 Stopping。以下是 SQS 消息正文示例。

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

您应向 Amazon SQS 消息使用者添加逻辑,以便在收到消息后采取任何必要的操作(例如资源清理),然后调用 SendPipelineExecutionStepSuccessSendPipelineExecutionStepFailure

只有当 Pipelin SageMaker es 收到其中一个调用时,它才会停止管道进程。

Lambda 步骤

您可以使用 Lambda 步骤来运行函数。 Amazon Lambda 您可以运行现有的 Lambda 函数, SageMaker 也可以创建并运行新的 Lambda 函数。有关显示如何在 SageMaker 管道中使用 Lambda 步骤的笔记本,请参阅 sagemaker-pipelines-lambda-step .ipynb。

重要

亚马逊 Pyth SageMaker on SDK v2.51.0 和亚马 SageMaker 逊 Studio Classic v3.9.1 中引入了 Lambda 步骤。在使用 Lambda 步骤之前,您必须更新 Studio Classic,否则将不会显示管道 DAG。要更新 Studio 经典版,请参阅关闭并更新 SageMaker Studio 经典版

SageMaker 提供 sagemaker.lambda_Helper.Lambda 类来创建、更新、调用和删除 Lambda 函数。 Lambda具有以下签名。

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

sagemaker. workflow.lambda_step。 LambdaStep类的lambda_func参数类型为Lambda。要调用现有 Lambda 函数,唯一的要求是将函数的 Amazon 资源名称 (ARN) 提供给 function_arn。如果不提供 function_arn 的值,则必须指定 handler 和以下内容之一:

  • zipped_code_dir - 压缩后的 Lambda 函数的路径

    s3_bucket - 要上传 zipped_code_dir 的 Amazon S3 存储桶

  • script - Lambda 函数脚本文件的路径

以下示例说明如何创建调用现有 Lambda 函数的 Lambda 步骤定义。

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

以下示例说明如何创建 Lambda 步骤定义,该定义使用 Lambda 函数脚本创建和调用 Lambda 函数。

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

输入和输出

如果您的 Lambda 函数有输入或输出,则还必须在 Lambda 步骤中定义这些输入或输出。

注意

不得嵌套输入和输出参数。例如,如果您使用嵌套字典作为输出参数,则该字典将被视为单个字符串(例如 {"output1": "{\"nested_output1\":\"my-output\"}"})。如果您提供嵌套值并尝试稍后再引用该值,则会引发不可重试的客户端错误。

定义 Lambda 步骤时,inputs 必须是键值对的字典。inputs 字典的每个值都必须是基元类型(字符串、整数或浮点数)。不支持嵌套对象。如果 inputs 值未定义,则默认为 None

outputs 值必须是键列表。这些键是指 Lambda 函数输出中定义的字典。比如 inputs,这些键必须是基元类型,并且不支持嵌套对象。

超时和停止行为

Lambda 类有一个 timeout 参数,用于指定 Lambda 函数可以运行的最长时间。默认值为 120 秒,最大值为 10 分钟。如果在达到超时条件时 Lambda 函数正在运行,则 Lambda 步骤将失败;但 Lambda 函数会继续运行。

当 Lambda 步骤正在运行时,无法停止管道进程,因为无法停止 Lambda 步骤调用的 Lambda 函数。如果您在 Lambda 函数正在运行时尝试停止进程,则管道将等待 Lambda 函数完成或直到达到超时(以先发生者为准),然后停止。如果 Lambda 函数完成,则管道进程状态为 Stopped。如果达到超时,则管道进程状态为 Failed

ClarifyCheck 步骤

您可以使用 ClarifyCheck 步骤对照先前的基准进行基准偏差检查,以进行偏差分析和实现模型可解释性。然后,您可以使用 model.register() 方法生成和注册基准,并使用 step_args 将该方法的输出传递给 模型步骤。Amazon SageMaker Model Monitor 可以将这些用于偏差检查的基准用于您的模型终端节点,因此您无需单独提出基准建议。ClarifyCheck 步骤还可以从模型注册表中提取偏差检查基准。该ClarifyCheck步骤利用了 Amazon Cl SageMaker arify 预建容器,该容器提供了一系列模型监控功能,包括约束建议和根据给定基准进行约束验证。有关更多信息,请参阅 Clar ify 容 SageMaker 器入门

配置 ClarifyCheck 步骤

您可以将 ClarifyCheck 步骤配置为每次在管道中使用时仅执行以下一种类型的检查。

  • 数据偏差检查

  • 模型偏差检查

  • 模型可解释性检查

您可以通过将 clarify_check_config 参数设置为以下检查类型值之一来实现此目的:

  • DataBiasCheckConfig

  • ModelBiasCheckConfig

  • ModelExplainabilityCheckConfig

ClarifyCheck步骤启动一个运行Clarify预 SageMaker建容器的处理作业,并且需要为支票和处理作业进行专门的配置ClarifyCheckConfigCheckJobConfig是这些配置的辅助函数,它们与 Clarify 处理作业用于检查模型偏差、数据偏差或模型可解释性的计算方式一致。 SageMaker 有关更多信息,请参阅 运行 Cl SageMaker arify 处理作业以实现偏见分析和可解释性

控制偏差检查的步骤行为

ClarifyCheck 步骤需要以下两个布尔标志来控制其行为:

  • skip_check:此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为 False,则配置的检查类型的先前基准必须可用。

  • register_new_baseline:此参数表示是否可以通过步骤属性 BaselineUsedForDriftCheckConstraints 访问新计算的基准。如果将其设置为 False,则配置的检查类型的先前基准也必须可用。这可以通过 BaselineUsedForDriftCheckConstraints 属性访问。

有关更多信息,请参阅 基准计算、偏差检测和生命周期 ClarifyCheck 以及 Amazon SageMaker 模型构建管道中的 QualityCheck 步骤

使用基准

您可以选择指定 model_package_group_name 来查找现有基准,然后 ClarifyCheck 步骤会在模型包组中最新批准的模型包上提取 DriftCheckBaselines。或者,您可以通过 supplied_baseline_constraints 参数提供先前的基准。如果同时指定 model_package_group_namesupplied_baseline_constraints,则 ClarifyCheck 步骤将使用 supplied_baseline_constraints 参数指定的基准。

有关使用ClarifyCheck步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 ClarifyCheckStep在适用于 Python 的亚马逊 SageMaker SageMaker SDK 中。有关展示如何使用 Pipelin SageMaker es 中ClarifyCheck步骤的 Amazon SageMaker Studio Classic 笔记本,请参阅 sagemaker-pipeline-model-monitor-clarify-steps.ipynb

例 创建 ClarifyCheck 步骤以进行数据偏差检查
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )

QualityCheck 步骤

您可以使用 QualityCheck 步骤,针对管道中的数据质量或模型质量,对照先前的基准提出基准建议和进行偏差检查。然后,您可以使用 model.register() 方法生成和注册基准,并使用 step_args 将该方法的输出传递给 模型步骤。Model Monitor 可以将这些偏差检查基准用于模型端点,这样您就无需单独提出基准建议。QualityCheck 步骤还可以从模型注册表中提取偏差检查基准。该QualityCheck步骤利用了 Amazon SageMaker Model Monitor 的预建容器,该容器具有一系列模型监控功能,包括约束建议、统计数据生成和根据基准进行约束验证。有关更多信息,请参阅 Amazon SageMaker 模型监控器预建容器

配置 QualityCheck 步骤

您可以将 QualityCheck 步骤配置为每次在管道中使用时仅执行以下一种类型的检查。

  • 数据质量检查

  • 模型质量检查

您可以通过将 quality_check_config 参数设置为以下检查类型值之一来实现此目的:

  • DataQualityCheckConfig

  • ModelQualityCheckConfig

QualityCheck 步骤会启动一个处理作业,该作业运行 Model Monitor 预构建容器,并且需要专门的用于检查和处理作业的配置。QualityCheckConfigCheckJobConfig 是这些配置的帮助程序函数,它们与 Model Monitor 为模型质量或数据质量监控创建基准的方式一致。有关 Model Monitor 基准建议的更多信息,请参阅创建基准创建模型质量基准

控制偏差检查的步骤行为

QualityCheck 步骤需要以下两个布尔标志来控制其行为:

  • skip_check:此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为 False,则配置的检查类型的先前基准必须可用。

  • register_new_baseline:此参数表示是否可以通过步骤属性 BaselineUsedForDriftCheckConstraintsBaselineUsedForDriftCheckStatistics 访问新计算的基准。如果将其设置为 False,则配置的检查类型的先前基准也必须可用。它们可以通过 BaselineUsedForDriftCheckConstraintsBaselineUsedForDriftCheckStatistics 属性进行访问。

有关更多信息,请参阅基准计算、偏差检测和生命周期 ClarifyCheck 以及 Amazon SageMaker 模型构建管道中的 QualityCheck 步骤

使用基准

您可以直接通过 supplied_baseline_statisticssupplied_baseline_constraints 参数指定先前的基准,也可以简单地指定 model_package_group_name,然后 QualityCheck 步骤会在模型包组中最新批准的模型包上提取 DriftCheckBaselines。当您指定 model_package_group_namesupplied_baseline_constraintssupplied_baseline_statistics 时,QualityCheck 步骤将使用您正在运行的 QualityCheck 步骤检查类型上的 supplied_baseline_constraintssupplied_baseline_statistics 所指定的基准。

有关使用QualityCheck步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 QualityCheckStep在适用于 Python 的亚马逊 SageMaker SageMaker SDK 中。有关展示如何使用 Pipelin SageMaker es 中QualityCheck步骤的 Amazon SageMaker Studio Classic 笔记本,请参阅 sagemaker-pipeline-model-monitor-clarify-steps.ipynb

例 创建 QualityCheck 步骤以进行数据质量检查
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )

EMR 步骤

您可以使用亚马逊 SageMaker 模型构建管道 EMR 步骤在正在运行的 Amazon EMR 集群上处理 Amazon EMR 步骤,或者让管道为您创建和管理 Amazon EMR 集群。有关 Amazon EMR 的更多信息,请参阅 Amazon EMR 入门

EMR 步骤要求 EMRStepConfig 包括 Amazon EMR 集群要使用的 JAR 文件的位置以及要传递的所有参数。如果您想在正在运行的 EMR 集群上运行该步骤,则还需要提供 Amazon EMR 集群 ID;如果您想让 EMR 步骤在它为您创建、管理和终止的集群上运行,则还需要提供集群配置。以下几节包括演示这两种方法的示例笔记本的示例和链接。

注意
  • EMR 步骤要求传递给管道的角色具有其他权限。您应将 Amazon 托管策略:AmazonSageMakerPipelinesIntegrations 附加到管道角色,或者确保角色包含该策略中的权限。

  • EMR Serverless 和 Amazon EMR on EKS 上均不支持 EMR 步骤。

  • 如果您在正在运行的集群上处理 EMR 步骤,则只能使用处于以下状态之一的集群:STARTINGBOOTSTRAPPINGRUNNINGWAITING

  • 如果您在正在运行的集群上处理 EMR 步骤,则在 EMR 集群上最多可以有 256 个处于 PENDING 状态的 EMR 步骤。提交的 EMR 步骤超过此限制会导致管道执行失败。您可以考虑使用管道步骤的重试策略

  • 您可以指定集群 ID 或集群配置,但不能同时指定两者。

  • EMR 步骤依靠 Amazon EventBridge 来监控 EMR 步骤或集群状态的变化。如果您在正在运行的集群上处理 Amazon EMR 作业,则 EMR 步骤将使用 SageMakerPipelineExecutionEMRStepStatusUpdateRule 规则来监控 EMR 步骤的状态。如果您在 EMR 步骤为您创建的集群上处理作业,则该步骤将使用 SageMakerPipelineExecutionEMRClusterStatusRule 规则来监控集群状态的更改。如果您在 Amazon 账户中看到其中任何一条 EventBridge 规则,请不要将其删除,否则您的 EMR 步骤可能无法完成。

在正在运行的 Amazon EMR 集群上启动新作业

如果您想在正在运行的 Amazon EMR 集群上启动新作业,则可以将集群 ID 作为字符串传递给 EMRStepcluster_id 参数。以下示例演示了此过程。

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )

有关指导您完成完整示例的示例笔记本,请参阅运行 EMR 集群的 Pipelin SageMaker es EMR 步骤

在新的 Amazon EMR 集群上启动新作业

如果您想在为您EMRStep创建的新集群上启动新作业,请将您的集群配置作为字典提供,其结构与RunJobFlow请求相同。但是,请勿在集群配置中包含以下字段:

  • [Name]

  • [Steps]

  • [AutoTerminationPolicy]

  • [Instances][KeepJobFlowAliveWhenNoSteps]

  • [Instances][TerminationProtected]

所有其他 RunJobFlow 参数均可在您的集群配置中使用。有关请求语法的详细信息,请参阅RunJobFlow

以下示例将集群配置传递给 EMR 步骤定义,这会提示该步骤在新的 EMR 集群上启动新作业。此示例中的 EMR 集群配置包括主节点和核心 EMR 集群节点的规范。有关 Amazon EMR 节点类型的更多信息,请参阅了解节点类型:主节点、核心节点和任务节点

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role", "ServiceRole": "service-role" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )

有关指导您完成完整示例的示例笔记本,请参阅使用集群生命周期管理的 Pipelin SageMaker es EMR 步骤

Notebook Job 步骤

使用NotebookJobStep以非交互方式运行您的 SageMaker Notebook Job 作为工作流步骤。有关 SageMaker 笔记本作业的更多信息,请参阅SageMaker 笔记本职位

A 至少NotebookJobStep需要输入笔记本、图像 URI 和内核名称。有关 Notebook Job 步骤要求以及可以设置为自定义步骤的其他参数的更多信息,请参阅 sagemaker.workflow.steps。 NotebookJobStep

以下示例使用最少的参数来定义NotebookJobStep

from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=input_notebook, image_uri=image_uri, kernel_name=kernel_name )

您的工作NotebookJobStep流步骤被视为 SageMaker 笔记本作业,因此,如果您在tags参数中包含特定标签,则可以在 Studio Classic UI 笔记本作业仪表板中跟踪执行状态。有关要包含的标签的更多详细信息,请参阅在 Studio 用户界面控制面板中查看你的笔记本作业

此外,如果您使用 SageMaker Python SDK 安排笔记本作业,则只能指定某些图像来运行笔记本作业。有关更多信息,请参阅 SageMakerPython SDK 笔记本作业的图像限制

失败步骤

当未达到所需条件或状态时,您可以使用停止 Amazon SageMaker 模型构建管道的执行,并将该管道的执行标记为失败。FailStepFailStep 还允许您输入自定义错误消息,指明管道执行失败的原因。

注意

FailStep 和其他管道步骤同时执行时,在所有并发步骤完成之前,管道不会终止。

使用 FailStep 时的限制

  • 不能将 FailStep 添加到其他步骤的 DependsOn 列表中。有关更多信息,请参阅 步骤之间的自定义依赖关系

  • 其他步骤无法引用 FailStep。它始终 是管道执行的最后一步。

  • 无法重试以 FailStep 结束的管道执行。

可以创建静态文本字符串形式的 FailStep ErrorMessage。或者,也可以使用管道参数联接操作)或其他步骤属性来创建更详细的错误消息。

以下示例代码片段使用了一个 FailStep(带有 ErrorMessage,并配置了管道参数)和一项 Join 操作。

from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )

步骤属性

properties 属性用于在管道中的步骤之间添加数据依赖关系。然后,Pipelines 使用这些数据依赖关系根据 SageMaker 管道定义构建 DAG。这些属性可以作为占位符值引用,并在运行时解析。

Pi properties pelin SageMaker es 步骤的属性与相应 SageMaker 作业类型的Describe调用所返回的对象相匹配。对于每种作业类型,Describe 调用都会返回以下响应对象:

要在创建数据依赖关系期间查看每种步骤类型哪些属性是可引用的,请参阅 Amaz SageMaker on Python SDK 中的数据依赖关系-属性参考

步骤并行度

当一个步骤不依赖于任何其他步骤时,它会在管道执行后立即运行。但是,并行执行过多的管道步骤可能会很快耗尽可用资源。使用 ParallelismConfiguration 控制管道执行的并发步骤数。

以下示例使用 ParallelismConfiguration 将并发步骤数限制设置为 5。

pipeline.create( parallelism_config=ParallelismConfiguration(5), )

步骤之间的数据依赖关系

您可以通过指定步骤之间的数据关系来定义 DAG 的结构。要在步骤之间创建数据依赖关系,请将一个步骤的属性作为输入传递给管道中的另一个步骤。接收输入的步骤要等到提供输入的步骤完成运行后才会开始。

数据依赖关系使用以下格式的 JsonPath 符号。这种格式会遍历 JSON 属性文件,这意味着您可以根据需要添加尽可能多的 <property> 实例,以便在文件中找到所需的嵌套属性。有关 JsonPath 符号的更多信息,请参阅 JsonPath repo

<step_name>.properties.<property>.<property>

以下内容说明了如何使用处理步骤的 ProcessingOutputConfig 属性指定 Amazon S3 存储桶。

step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

要创建数据依赖关系,请按如下方式将存储桶传递给训练步骤。

from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )

要在创建数据依赖关系期间查看每种步骤类型哪些属性是可引用的,请参阅 Amaz SageMaker on Python SDK 中的数据依赖关系-属性参考

步骤之间的自定义依赖关系

当您指定数据依赖关系时,Pipelin SageMaker es 会提供步骤之间的数据连接。或者,一个步骤可以访问上一步的数据,而无需直接使用 Pipelin SageMaker es。在这种情况下,你可以创建一个自定义依赖关系,告诉 Pip SageMaker elines 在另一个步骤完成运行之前不要启动一个步骤。您可以通过指定步骤的 DependsOn 属性来创建自定义依赖关系。

例如,以下内容定义了步骤 C,该步骤在步骤 A 和步骤 B 都完成运行后才开始。

{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }

SageMaker 如果依赖关系会创建循环依赖关系,则管道会抛出验证异常。

以下示例创建了一个在处理步骤完成运行后开始的训练步骤。

processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])

以下示例创建了一个训练步骤,该步骤要等到两个不同的处理步骤完成运行后才会开始。

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])

下面提供了另一种创建自定义依赖关系的方法。

training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])

以下示例创建了一个训练步骤,该步骤接收来自一个处理步骤的输入,然后等待另一个处理步骤完成运行。

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])

以下示例说明如何检索步骤的自定义依赖关系的字符串列表。

custom_dependencies = training_step.depends_on

在步骤中使用自定义映像

在管道中创建步骤时,您可以使用任何可用的 SageMaker深度学习容器镜像

您还可以在管道步骤中使用自己的容器。由于您无法在 Amazon SageMaker Studio Classic 中创建图像,因此必须使用其他方法创建图像,然后才能将其与亚马逊 SageMaker 模型构建管道一起使用。

要在为管道创建步骤时使用自己的容器,请在估算器定义中包含映像 URI。有关将自己的容器与一起使用的更多信息 SageMaker,请参阅将 Docker 容器与一起 SageMaker使用