定义管道 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

定义管道

要使用 Amazon SageMaker 建模管线编排工作流,您需要以 JSON 管道定义的形式生成有向无环图 (DAG)。下图是您在本教程中创建的管道 DAG 的表示形式:

您可以使用 SageMaker Python SDK 生成 JSON 管道定义。以下教程展示了如何为解决回归问题的管道生成管道定义,以根据鲍鱼的物理测量值确定其年龄。有关可供运行且包含本教程内容的 Jupyter 笔记本,请参阅使用 Amazon SageMaker 建模管线编排作业

先决条件

要运行以下教程,必须执行以下操作:

  • 按照创建笔记本实例中所述的步骤设置笔记本实例。这样,您的角色就可以读取和写入 Amazon S3,以及在 SageMaker 中创建训练、批量转换和处理作业。

  • 授予笔记本获取和传递自身角色的权限,如修改角色权限策略中所示。添加以下 JSON 代码片段以将此策略附加到您的角色。将 <your-role-arn> 替换为用于创建笔记本实例的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 按照修改角色信任策略中的步骤信任 SageMaker 服务主体。将以下语句片段添加到角色的信任关系中:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

设置环境

使用以下代码块创建新的 SageMaker 会话。这将返回会话的角色 ARN。此角色 ARN 应是您设置为先决条件的执行角色 ARN。

import boto3 import sagemaker import sagemaker.session region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() model_package_group_name = f"AbaloneModelPackageGroupName"

创建管道

在 SageMaker 笔记本实例中运行以下步骤来创建管道,包括预处理、训练、评估、条件评估和模型注册等步骤。

步骤 1:下载数据集

此笔记本使用 UCI 机器学习鲍鱼数据集。该数据集包含以下特征:

  • length - 鲍鱼外壳最长测量值。

  • diameter - 垂直于长度方向的鲍鱼直径。

  • height - 带肉鲍鱼在壳内的高度。

  • whole_weight - 整只鲍鱼的重量。

  • shucked_weight - 从鲍鱼身上取出的肉的重量。

  • viscera_weight - 鲍鱼内脏出血后的重量。

  • shell_weight - 去肉和干燥后鲍鱼壳的重量。

  • sex - 鲍鱼的性别。“M”、“F”或“I”中的一个,其中“I”是幼鲍。

  • rings - 鲍鱼壳上的环数。

鲍鱼壳上的环数是其年龄的近似值,计算公式为 age=rings + 1.5。然而,获取这个数字是一项耗时的任务。您必须从锥体上切壳,将切面染色,然后通过显微镜计算环数。不过,其他物理测量值比较容易确定。此笔记本使用该数据集,利用其他物理测量值来构建 rings 变量的预测模型。

下载数据集
  1. 将数据集下载到您账户的默认 Amazon S3 存储桶中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 创建模型后,下载第二个数据集进行批量转换。

    local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

步骤 2:定义管道参数

此代码块为您的管道定义了以下参数:

  • processing_instance_count - 处理作业的实例数。

  • input_data - 输入数据在 Amazon S3 中的位置。

  • batch_data - 用于批量转换的输入数据在 Amazon S3 中的位置。

  • model_approval_status - 为 CI/CD 注册已训练模型的批准状态。有关更多信息,请参阅 使用 SageMaker 项目自动执行 MLOps

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

步骤 3:定义特征工程的处理步骤

本节介绍如何创建一个处理步骤,从数据集中准备用于训练的数据。

创建处理步骤
  1. 为处理脚本创建目录。

    !mkdir -p abalone
  2. /abalone 目录中创建一个包含以下内容的名为 preprocessing.py 的文件。此预处理脚本将传递到处理步骤,用于执行输入数据。然后,训练步骤使用预处理的训练特征和标签来训练模型,评估步骤使用经过训练的模型和预处理的测试特征和标签来评估模型。该脚本使用 scikit-learn 执行以下操作:

    • 填入缺失的 sex 分类数据并对其进行编码,使其适合训练。

    • 缩放和标准化除 ringssex 之外的所有数值字段。

    • 将数据拆分为训练、测试和验证数据集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. 创建要传递到处理步骤的 SKLearnProcessor 的实例。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", role=role, )
  4. 创建处理步骤。此步骤采用 SKLearnProcessor、输入和输出通道以及您创建的 preprocessing.py 脚本。这与 SageMaker Python SDK 中处理器实例的 run 方法非常相似。传入 ProcessingStepinput_data 参数是步骤本身的输入数据。处理器实例运行时会使用这些输入数据。

    请注意在处理作业的输出配置中指定的 "train"validation"test" 命名通道。诸如此类的步骤 Properties 可在后续步骤中使用,并在执行时解析为其运行时值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", )

步骤 4:定义训练步骤

本节介绍如何使用 SageMaker XGBoost 算法根据处理步骤输出的训练数据训练模型。

定义训练步骤
  1. 指定要保存训练模型的模型路径。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 为 XGBoost 算法和输入数据集配置估算器。训练实例类型传递到估算器中。一个典型的训练脚本从输入通道加载数据,使用超参数配置训练,训练模型,并将模型保存到 model_dir 以便日后托管。SageMaker 在训练作业结束时以 model.tar.gz 形式将模型上传到 Amazon S3。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 使用估算器实例和 ProcessingStep 的属性创建 TrainingStep。特别是,将 "train""validation" 输出通道的 S3Uri 传递到 TrainingStep。 

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="AbaloneTrain", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, )

步骤 5:定义模型评估的处理步骤

本节介绍如何创建处理步骤以评估模型的精度。在条件步骤中使用此模型评估的结果来确定要采用哪条执行路径。

定义模型评估的处理步骤
  1. 在名为 evaluation.py/abalone 目录中创建一个文件。此脚本在处理步骤中用于执行模型评估。它以经过训练的模型和测试数据集作为输入,然后生成包含分类评估指标的 JSON 文件。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 创建 ScriptProcessor 的实例,用于创建 ProcessingStep

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", role=role, )
  3. 使用处理器实例、输入和输出通道以及 evaluation.py 脚本创建 ProcessingStep。特别是,从 step_train 训练步骤传入 S3ModelArtifacts 属性,以及 step_process 处理步骤的 "test" 输出通道的 S3Uri。这与 SageMaker Python SDK 中处理器实例的 run 方法非常相似。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) step_eval = ProcessingStep( name="AbaloneEval", processor=script_eval, inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", property_files=[evaluation_report], )

步骤 6:定义用于批量转换的 CreateModelStep

重要

我们建议从 SageMaker Python SDK v2.90.0 开始使用 模型步骤 创建模型。CreateModelStep 将继续在先前版本的 SageMaker Python SDK 中运行,但不再受到主动支持。

本节介绍如何使用训练步骤的输出创建 SageMaker 模型。此模型用于对新数据集进行批量转换。此步骤将传入条件步骤,并且仅在条件步骤评估为 true 时才会执行。

定义用于批量转换的 CreateModelStep
  1. 创建 SageMaker 模型。从 step_train 训练步骤传入 S3ModelArtifacts 属性。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=sagemaker_session, role=role, )
  2. 为 SageMaker 模型定义模型输入。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. 使用您定义的 CreateModelInput 和 SageMaker 模型实例创建 CreateModelStep

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步骤 7:定义 TransformStep 以执行批量转换

本节介绍如何在模型训练完毕后创建 TransformStep 以对数据集执行批量转换。此步骤将传入条件步骤,并且仅在条件步骤评估为 true 时才会执行。

定义 TransformStep 以执行批量转换
  1. 使用适当的计算实例类型、实例数量和所需的输出 Amazon S3 存储桶 URI 创建转换器实例。从 step_create_model CreateModel 步骤传入 ModelName 属性。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 使用您定义的转换器实例和 batch_data 管道参数创建 TransformStep

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

第 8 步:定义 RegisterModel 步骤以创建模型包

重要

我们建议从 SageMaker Python SDK v2.90.0 开始使用 模型步骤 注册模型。RegisterModel 将继续在先前版本的 SageMaker Python SDK 中运行,但不再受到主动支持。

本节介绍如何构造 RegisterModel 的实例。在管道中执行 RegisterModel 会得到一个模型包。模型包是一种可重复使用的模型构件抽象,它封装了推理所需的所有要素。它由一个定义要使用的推理映像的推理规范和一个可选的模型权重位置组成。模型包组是模型包的集合。您可以对 SageMaker Pipelines 使用 ModelPackageGroup,以便在每次执行管道时向该组添加新版本和模型包。有关模型注册表的更多信息,请参阅使用模型注册表注册和部署模型

此步骤将传入条件步骤,并且仅在条件步骤评估为 true 时才会执行。

定义 RegisterModel 步骤以创建模型包
  • 使用您用于训练步骤的估算器实例构造一个 RegisterModel 步骤。从 step_train 训练步骤传入 S3ModelArtifacts 属性并指定 ModelPackageGroup。SageMaker Pipelines 为您创建此 ModelPackageGroup

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

步骤 9:定义条件步骤以验证模型精度

ConditionStep 允许 SageMaker Pipelines 根据步骤属性的条件在管道 DAG 中支持条件执行。在这种情况下,只有在模型评估步骤确定的模型精度超过所需值时,才需要注册模型包。如果精度超过所需值,管道也会创建 SageMaker 模型并对数据集运行批量转换。本节介绍如何定义条件步骤。

定义条件步骤以验证模型精度
  1. 使用模型评估处理步骤 step_eval 的输出中找到的精度值定义 ConditionLessThanOrEqualTo 条件。使用您在处理步骤中编制索引的属性文件以及均方误差值 "mse" 的相应 JSONPath 来获取此输出。

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 构造一个 ConditionStep。传入 ConditionEquals 条件,如果条件通过,则将模型包注册和批量转换步骤设置为后续步骤。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

步骤 10:创建管道

现在,您已经创建了所有步骤,请将它们组合成一个管道。

创建管道
  1. 为您的管道定义以下内容:nameparameterssteps。名称在 (account, region) 对中必须唯一。

    注意

    一个步骤只能在管道的步骤列表或条件步骤的 if/else 步骤列表中出现一次。不能同时出现在两者中。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可选)检查 JSON 管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())

此管道定义已准备好提交给 SageMaker。在下一个教程中,您将此管道提交给 SageMaker 并启动执行。

下一步:运行管道