管道步骤 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

管道步骤

SageMaker 管道由步骤组成。这些步骤定义管道执行的操作以及使用属性的步骤之间的关系。

步骤类型

下面介绍了每个步骤类型的要求,并提供了步骤的示例实现。这些不是功能性实现,因为它们不提供所需的资源和输入。有关实现这些步骤的教程,请参阅创建和管理 SageMaker 管道.

Amazon SageMaker 模型构建管道支持以下步骤类型:

处理步骤

您可以使用处理步骤创建用于数据处理的处理作业。有关处理任务的详细信息,请参阅处理数据和评估模型.

处理步骤需要处理器、定义处理代码的 Python 脚本、用于处理的输出以及作业参数。以下示例显示了如何在ProcessingStep定义。有关处理步骤要求的更多信息,请参阅制造商. 工作流程. 步骤. 处理步骤文档中)。

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='0.20.0', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py" )

传递运行时参数

您可以使用获取运行参数方法Amazon SageMaker Python 开发工具包. 这允许您使用除SKLearnProcessor例如PySparkProcessorSparkJarProcessor.

以下示例显示如何将运行时参数从 PySpark 处理器传递到ProcessingStep.

from sagemaker.spark.processing import PySparkProcessor pyspark_processor = PySparkProcessor(framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput run_args = pyspark_processor.get_run_args( "preprocess.py", inputs=[ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], arguments=None )
from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=pyspark_processor, inputs=run_args.inputs, outputs=run_args.outputs, job_arguments=run_args.arguments, code=run_args.code )

训练步骤

您可以使用训练步骤创建训练作业来训练模型。有关培训任务的更多信息,请参阅与 Amazon SageMaker 一起训练模型.

训练步骤需要一个估算器以及培训和验证数据输入。以下示例显示了如何在TrainingStep定义。有关培训步骤要求的更多信息,请参阅制造商. 工作流程. 步骤. 培训步骤文档中)。

from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="TrainAbaloneModel", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } )

优化步骤

您可以使用优化步骤创建超参数优化作业,也称为超参数优化 (HPO)。超参数调整作业运行多个培训作业,每个作业都生成一个模型版本。有关超参数优化的更多信息,请参阅。使用 SageMaker 执行自动模型优化.

调整作业与管道的 SageMaker 实验相关联,培训作业是作为试验创建的。有关更多信息,请参阅实验集成

调整步骤需要HyperparameterTuner以及培训投入. 您可以通过指定warm_start_configParameterHyperparameterTuner. 有关超参数调整和热启动的详细信息,请参阅运行热启动超参数优化作业.

您可以使用获取顶部模型 _ S3_ URI方法制造商. 工作流程. 步骤. 调整步骤类从性能最佳的模型版本之一获取模型工件。有关展示如何在 SageMaker 管道中使用优化步骤的笔记本,请参阅制造商-管线-调谐步骤 p.ipynb.

重要

Amazon SageMaker Python 软件开发工具包 v2.48.0 和亚马逊 SageMaker 工作室 v3.8.0 中引入了调整步骤。您必须在使用调整步骤之前更新 Studio,否则管道 DAG 不会显示。若要更新 Studio,请参阅。更新 SageMaker Studio.

以下示例显示了如何在TuningStep定义。

from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep step_tuning = TuningStep( name = "HPTuning", tuner = HyperparameterTuner(...), inputs = TrainingInput(s3_data=input_path) )

获取最佳模型版本

以下示例显示如何使用调整作业中的最佳模型版本get_top_model_s3_uri方法。最多,排名前 50 的性能版本根据超参数调节节节目. 这些区域有:top_k参数是版本的索引,其中top_k=0是性能最佳的版本,top_k=49是性能最差的版本。

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

有关调整步骤要求的更多信息,请参阅制造商. 工作流程. 步骤. 调整步骤文档中)。

CreateModel 步骤

您可以使用创建模型步骤创建 SageMaker 模型。有关 SageMaker 模型的更多信息,请参阅与 Amazon SageMaker 一起训练模型.

创建模型步骤需要模型工件以及需要用于创建模型的 SageMaker 实例类型的信息。以下示例显示了如何在CreateModel步骤定义。有关CreateModel步骤要求,请参阅制造商. 工作流. 步骤. 创建模型步骤文档中)。

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

注册模型步骤

您可以使用寄存器模型步骤注册萨格制造商. 型号. 型号制造商、管线、管线模型使用亚 Amazon SageMaker 型号注册表。APipelineModel代表推理管道,它是由处理推理请求的线性容器序列组成的模型。

有关如何注册模型的更多信息,请参阅使用模型注册表注册表部署模型. 有关RegisterModel步骤要求,请参阅制造商工作流程步骤收藏注册模型文档中)。

以下示例显示了如何创建一个寄存器模型步骤,该步骤将PipelineModel.

from sagemaker.mxnet.model import SKLearnModel from sagemaker.mxnet.model import XGBoostModel sklearn_model = SKLearnModel(...) xgboost_model = XGBoostModel(...) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model,xgboost_model], role=role, sagemaker_session=sagemaker_session ) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, ... )

如果model,寄存器模型步骤需要估计器,如下例所示。

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

转换步骤

您可以使用批量转换的转换步骤对整个数据集运行推理。有关批量转换的详细信息,请参阅通过推理管道运行批量转换.

转换步骤需要一个变换器和数据来运行批量转换。以下示例显示了如何在Transform步骤定义。有关Transform步骤要求,请参阅制造商. 工作流. 步骤. 转换步骤. 文档中)。

from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

状况步骤

您可以使用条件步骤评估步骤属性的条件,以评估下一步应在管道中执行哪些操作。

条件步骤需要条件列表以及条件计算结果为true以及条件计算结果为false. 以下示例显示了如何在Condition步骤定义。有关Condition步骤要求,请参阅制造商。工作流程条件 _ 步骤条件步骤文档中)。

限制

  • SageMaker 管道不支持使用嵌套条件步骤。不能将条件步骤作为另一条件步骤的输入传递。

  • 条件步骤不能在两个分支中使用相同的步骤。如果您在两个分支中需要相同的步骤功能,请复制该步骤并为其指定不同的名称。

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ( ConditionStep, JsonGet ) cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step=step_eval, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

回调步骤

您可以使用回调步骤将其他进程和Amazon服务添加到您的工作流程中,这些服务并非由 Amazon SageMaker 模型构建管道直接提供。当回调步骤运行时,会发生以下过程:

  • SageMaker 管道向买家指定的 Amazon Simple Queue Service (Amazon SQS) 队列发送消息。该消息包含一个 SageMaker 管道生成的令牌和一个客户提供的输入参数列表。发送消息后,SageMaker 管道将等待客户的回复。

  • 客户从 Amazon SQS 队列检索消息并启动自定义流程。

  • 流程完成后,客户将调用以下 API 之一并提交 SageMaker 管道生成的令牌:

  • API 调用会导致 SageMaker 管道继续管道执行或执行失败。

有关Callback步骤要求,请参阅制造者. 工作流. 回调 _ 步骤. 回调步骤文档中)。有关完整的解决方案,请参阅扩展 SageMaker 管道,使用回调步骤包括自定义步骤.

重要

回调步骤在亚 Amazon SageMaker Python SDK v2.45.0 和亚马逊 SageMaker 工作室版本 3.6.2 中引入。您必须在使用回调步骤之前更新 Studio,否则管道 DAG 不会显示。若要更新 Studio,请参阅。更新 SageMaker Studio.

以下示例演示了上述过程的实施情况。

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '

停止行为

在回调步骤运行时,管道执行不会停止。

当您调用StopPipelineExecution时,SageMaker 管道会向指定的 SQS 队列发送额外的 Amazon SQS 消息。SQS 消息的正文包含一个 “状态” 字段,该字段设置为 “停止”。下面显示了 SQS 消息正文示例。

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

您应向 Amazon SQS 消息使用器添加逻辑,以便在收到消息后执行任何必要的操作(例如,资源清理),然后调用SendPipelineExecutionStepSuccess或者SendPipelineExecutionStepFailure.

只有当 SageMaker 管道收到其中一个调用时,它才会停止管道执行。

Lambda 步骤

您可以使用 lambda 步骤来运行Amazon Lambdafunction. 您可以运行现有的 Lambda 函数,或者 SageMaker 可以创建并运行一个新的 Lambda 函数。有关展示如何在 SageMaker 管道中使用 lambda 步骤的笔记本,请参阅制造商-管线-lambda-步骤 p.ipynb.

重要

Amazon SageMaker Python 软件开发工具包 v2.51.0 和亚马逊 SageMaker 工作室 v3.9.1 中引入了 Lambda 步骤。您必须在使用 lambda 步骤之前更新 Studio,否则管道 DAG 不会显示。若要更新 Studio,请参阅。更新 SageMaker Studio.

SageMaker 提供萨格制造商. 兰姆达 _ 助手 Lambda类来创建、更新、调用和删除 Lambda 函数。Lambda具有以下签名。

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

这些区域有:制造商. 工作流程. Lambda_ 步骤.类有一个lambda_func参数类型Lambda. 要调用现有的 Lambda 函数,唯一的要求是将函数的 Amazon 资源名称 (ARN) 提供给function_arn. 如果您没有为function_arn,则必须指定handler和以下操作之一:

  • zipped_code_dir— 压缩的 Lambda 函数的路径

    s3_bucket— Amazon S3 存储桶zipped_code_dir将被上传

  • script— Lambda 函数脚本文件的路径

以下示例显示了如何在Lambda步骤定义,它调用现有的 Lambda 函数。

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

以下示例显示了如何在Lambda步骤定义,它使用 Lambda 函数脚本创建和调用 Lambda 函数。

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

超时和停止行为

这些区域有:Lambda类有一个timeout参数,该参数指定 Lambda 函数可以运行的最长时间。默认值为 120 秒,最大值为 10 分钟。如果 Lambda 函数在达到超时时运行,则 lambda 步骤将失败;但是,Lambda 函数继续运行。

在 lambda 步骤运行时,无法停止管道执行,因为 lambda 步骤调用的 Lambda 函数无法停止。如果您尝试在 Lambda 函数运行时停止执行,管道将等待 Lambda 函数完成或直到超时被击中(以先发生者为准),然后停止。如果 Lambda 函数完成,则管道执行状态为Stopped. 如果超时被击中,则管道执行状态为Failed.

步骤属性

这些区域有:properties属性用于添加管道中的步骤之间的数据依赖关系。然后,SageMaker 管道使用这些数据相关性从管线定义构建 DAG。这些属性可以作为占位符值引用,并在运行时解析。

这些区域有:properties属性与 SageMaker 管道步骤返回的对象匹配Describe调用相应的 SageMaker 作业类型。对于每种作业类型,Describe调用返回以下响应对象:

步骤之间的数据相关性

您可以通过指定步骤之间的数据关系来定义 DAG 的结构。要在步骤之间创建数据依赖关系,请将一个步骤的属性作为输入传递给管道中的另一个步骤。接收输入的步骤直到提供输入的步骤完成执行之后才开始。

数据依赖关系使用以下格式的 JSONPath 表示法。此格式遍历 JSON 属性文件,这意味着您可以追加尽可能多的<property>实例来达到文件中所需的嵌套属性。有关 JSONPath 表示法的更多信息,请参阅JsonPath 回购.

<step_name>.properties.<property>.<property>

以下介绍了如何使用 Amazon S3 存储桶指定 Amazon S3 存储桶。ProcessingOutputConfig属性。

step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

要创建数据依赖关系,请将存储桶传递给训练步骤,如下所示。

step_train = TrainingStep( name="CensusTrain", estimator=sklearn_train, inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri ) )

步骤间的自定义依赖项

指定数据相关性时,SageMaker 管道将提供步骤之间的数据连接。或者,一个步骤可以访问上一步中的数据,而无需直接使用 SageMaker 管道。在这种情况下,您可以创建一个自定义依赖关系,告诉 SageMaker 管道在另一个步骤完成执行之前不要启动某个步骤。您可以通过指定步骤的DependsOn属性。

作为一个例子,以下定义了一个步骤C仅在两个步骤之后启动A和步骤B完成执行。

{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }

如果依赖关系会创建循环依赖关系,SageMaker 管道会引发验证异常。

以下示例创建一个在处理步骤完成执行后开始的训练步骤。

processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])

以下示例创建一个训练步骤,该步骤在两个不同的处理步骤完成执行后才开始。

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])

以下提供了一种创建自定义依赖关系的替代方法。

training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])

以下示例创建一个训练步骤,该步骤接收来自一个处理步骤的输入,并等待另一个处理步骤完成执行。

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])

以下示例说明如何检索某个步骤的自定义依赖关系的字符串列表。

custom_dependencies = training_step.depends_on

在步骤中使用自定义映像

您可以使用任何可用的 SageMaker深度学习容器映像在管道中创建步骤时。

您还可以使用 SageMaker S3 应用程序创建步骤。SageMaker S3 应用程序是一个 tar.gz 捆绑包,其中包含一个或多个可以在该捆绑包中运行的 Python 脚本。有关应用程序包捆绑的更多信息,请参阅直接从模型项目部署.

您还可以将自己的容器与管道步骤使用。由于您无法在 SageMaker Studio 中创建图像,因此必须先使用其他方法创建图像,然后才能将其与 Amazon SageMaker 模型构建管道一起使用。

要在为管道创建步骤时使用自己的容器,请在估计器定义中包含图像 URI。有关将您自己的容器与 SageMaker 搭配使用的更多信息,请参阅将码头容器与 SageMaker 一起使用.