定义模型构建管道 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

定义模型构建管道

要使用 Amazon SageMaker 模型构建管道协调工作流程,您需要以 JSON 管道定义的形式生成有向无环图 (DAG)。 下图是您在本教程中创建的管道 DAG 的表示形式:

管道定向无环图 (DAG) 示例。

你可以使用 SageMaker Python 软件开发工具包生成 JSON 管道定义。 以下教程展示了如何为管道生成管道定义,该管道求解了回归问题,从而根据鲍鱼的物理测量结果确定鲍鱼的年龄。有关包含本教程中可供您运行的内容的 Jupyter 笔记本,请参阅使用 Ama SageMaker zon 模型构建管道编排作业

先决条件

要运行以下教程,必须执行以下操作:

  • 按照创建笔记本实例中所述的步骤设置笔记本实例。这使您的角色有权读取和写入 Amazon S3,并在中创建训练、批量转换和处理任务 SageMaker。

  • 授予笔记本获取和传递自身角色的权限,如修改角色权限策略中所示。添加以下 JSON 代码片段以将此策略附加到您的角色。将 <your-role-arn> 替换为用于创建笔记本实例的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 按照修改角色信任策略中的步骤信任 SageMaker 服务主体。将以下语句片段添加到角色的信任关系中:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

设置环境

使用以下代码块创建新 SageMaker 会话。这将返回会话的角色 ARN。此角色 ARN 应是您设置为先决条件的执行角色 ARN。

import boto3 import sagemaker import sagemaker.session from sagemaker.workflow.pipeline_context import PipelineSession region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() pipeline_session = PipelineSession() model_package_group_name = f"AbaloneModelPackageGroupName"

创建管道

重要

允许 Amazon SageMaker Studio 或 Amazon SageMaker Studio Classic 创建亚马逊 SageMaker资源的自定义 IAM 策略还必须授予向这些资源添加标签的权限。需要向资源添加标签的权限,因为 Studio 和 Studio Classic 会自动标记他们创建的任何资源。如果 IAM 策略允许 Studio 和 Studio Classic 创建资源但不允许标记,则在尝试创建资源时可能会出现 AccessDenied “” 错误。有关更多信息,请参阅 提供为资源添加标签 SageMaker的权限

Amazon 适用于亚马逊的托管政策 SageMaker授予创建 SageMaker 资源的权限已经包括在创建这些资源时添加标签的权限。

在 n SageMaker otebook 实例中运行以下步骤来创建管道,包括预处理、训练、评估、条件评估和模型注册等步骤。

步骤 1:下载数据集

此笔记本使用 UCI 机器学习鲍鱼数据集。该数据集包含以下特征:

  • length - 鲍鱼外壳最长测量值。

  • diameter - 垂直于长度方向的鲍鱼直径。

  • height - 带肉鲍鱼在壳内的高度。

  • whole_weight - 整只鲍鱼的重量。

  • shucked_weight - 从鲍鱼身上取出的肉的重量。

  • viscera_weight - 鲍鱼内脏出血后的重量。

  • shell_weight - 去肉和干燥后鲍鱼壳的重量。

  • sex - 鲍鱼的性别。“M”、“F”或“I”中的一个,其中“I”是幼鲍。

  • rings - 鲍鱼壳上的环数。

鲍鱼壳上的环数是其年龄的近似值,计算公式为 age=rings + 1.5。然而,获取这个数字是一项耗时的任务。您必须从锥体上切壳,将切面染色,然后通过显微镜计算环数。不过,其他物理测量值比较容易确定。此笔记本使用该数据集,利用其他物理测量值来构建 rings 变量的预测模型。

下载数据集
  1. 将数据集下载到您账户的默认 Amazon S3 存储桶中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 创建模型后,下载第二个数据集进行批量转换。

    local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

步骤 2:定义管道参数

此代码块为您的管道定义了以下参数:

  • processing_instance_count - 处理作业的实例数。

  • input_data - 输入数据在 Amazon S3 中的位置。

  • batch_data - 用于批量转换的输入数据在 Amazon S3 中的位置。

  • model_approval_status - 为 CI/CD 注册已训练模型的批准状态。有关更多信息,请参阅 使用项目自动执行 MLOP SageMaker

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

步骤 3:定义特征工程的处理步骤

本节介绍如何创建一个处理步骤,从数据集中准备用于训练的数据。

创建处理步骤
  1. 为处理脚本创建目录。

    !mkdir -p abalone
  2. /abalone 目录中创建一个包含以下内容的名为 preprocessing.py 的文件。此预处理脚本将传递到处理步骤,用于执行输入数据。然后,训练步骤使用预处理的训练特征和标签来训练模型,评估步骤使用经过训练的模型和预处理的测试特征和标签来评估模型。该脚本使用 scikit-learn 执行以下操作:

    • 填入缺失的 sex 分类数据并对其进行编码,使其适合训练。

    • 缩放和标准化除 ringssex 之外的所有数值字段。

    • 将数据拆分为训练、测试和验证数据集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. 创建要传递到处理步骤的 SKLearnProcessor 的实例。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", sagemaker_session=pipeline_session, role=role, )
  4. 创建处理步骤。此步骤采用 SKLearnProcessor、输入和输出通道以及您创建的 preprocessing.py 脚本。这与 SageMaker Python SDK 中处理器实例的run方法非常相似。传入 ProcessingStepinput_data 参数是步骤本身的输入数据。处理器实例运行时会使用这些输入数据。

    请注意在处理作业的输出配置中指定的 "train"validation"test" 命名通道。诸如此类的步骤 Properties 可在后续步骤中使用,并在执行时解析为其运行时值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep processor_args = sklearn_processor.run( inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", ) step_process = ProcessingStep( name="AbaloneProcess", step_args=processor_args )

步骤 4:定义训练步骤

本节介绍如何使用 SageMaker XGBoost 算法根据处理步骤输出的训练数据训练模型。

定义训练步骤
  1. 指定要保存训练模型的模型路径。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 为 XGBoost 算法和输入数据集配置估算器。训练实例类型传递到估算器中。典型的训练脚本从输入通道加载数据,使用超参数配置训练,训练模型,并将模型保存到model_dir以便日后托管。 SageMaker 在训练作业结束model.tar.gz时将模型以 a 的形式上传到 Amazon S3。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, sagemaker_session=pipeline_session, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 使用估算器实例和 ProcessingStep 的属性创建 TrainingStep。特别是,将 "train""validation" 输出通道的 S3Uri 传递到 TrainingStep。 

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep train_args = xgb_train.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, ) step_train = TrainingStep( name="AbaloneTrain", step_args = train_args )

步骤 5:定义模型评估的处理步骤

本节介绍如何创建处理步骤以评估模型的精度。在条件步骤中使用此模型评估的结果来确定要采用哪条执行路径。

定义模型评估的处理步骤
  1. 在名为 evaluation.py/abalone 目录中创建一个文件。此脚本在处理步骤中用于执行模型评估。它以经过训练的模型和测试数据集作为输入,然后生成包含分类评估指标的 JSON 文件。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 创建 ScriptProcessor 的实例,用于创建 ProcessingStep

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", sagemaker_session=pipeline_session, role=role, )
  3. ProcessingStep使用处理器实例、输入和输出通道以及evaluation.py脚本创建。 特别是,传入step_train训练步骤中的S3ModelArtifacts属性,以及step_process处理步骤S3Uri"test"输出通道的属性。 这与 SageMaker Python SDK 中处理器实例的run方法非常相似。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) eval_args = script_eval.run( inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", ) step_eval = ProcessingStep( name="AbaloneEval", step_args=eval_args, property_files=[evaluation_report], )

步骤 6: CreateModelStep 为批量转换定义一个

重要

我们建议使用从 P 模型步骤 ython 软件开发工具包的 2.90.0 版本开始创建模型。 SageMaker CreateModelStep将继续在以前版本的 SageMaker Python SDK 中运行,但不再受支持。

本节介绍如何根据训练步骤的输出创建 SageMaker 模型。此模型用于对新数据集进行批量转换。此步骤将传入条件步骤,并且仅在条件步骤评估为 true 时才会执行。

CreateModelStep 为批量转换定义一个
  1. 创建 SageMaker 模型。从 step_train 训练步骤传入 S3ModelArtifacts 属性。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=pipeline_session, role=role, )
  2. 为您的 SageMaker 模型定义模型输入。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. CreateModelStep使用您定义的CreateModelInput和 SageMaker 模型实例创建您的。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步骤 7:定义一个 TransformStep 以执行批量转换

本节介绍如何在模型训练完毕后创建 TransformStep 以对数据集执行批量转换。此步骤将传入条件步骤,并且仅在条件步骤评估为 true 时才会执行。

要定义 TransformStep 要执行批量转换
  1. 使用适当的计算实例类型、实例数量和所需的输出 Amazon S3 存储桶 URI 创建转换器实例。从 step_create_model CreateModel 步骤传入 ModelName 属性。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 使用您定义的转换器实例和 batch_data 管道参数创建 TransformStep

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

步骤 8:定义创建 Model Package 的 RegisterModel 步骤

重要

我们建议使用从 P 模型步骤 ython SDK 版本 2.90.0 起注册模型。 SageMaker RegisterModel将继续在以前版本的 SageMaker Python SDK 中运行,但不再受支持。

本节介绍如何构造 RegisterModel 的实例。在管道中执行 RegisterModel 会得到一个模型包。模型包是一种可重复使用的模型构件抽象,它封装了推理所需的所有要素。它由一个定义要使用的推理映像的推理规范和一个可选的模型权重位置组成。模型包组是模型包的集合。您可以使用 fo ModelPackageGroup r Pipelines 在每次执行 SageMaker 管道时向组中添加新版本和模型包。有关模型注册表的更多信息,请参阅使用模型注册表注册和部署模型

此步骤将传入条件步骤,并且仅在条件步骤评估为 true 时才会执行。

定义创建模型包的 RegisterModel 步骤
  • 使用您用于训练步骤的估算器实例构造一个 RegisterModel 步骤。从 step_train 训练步骤传入 S3ModelArtifacts 属性并指定 ModelPackageGroup。 SageMaker 管道为您创建了这个ModelPackageGroup

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

步骤 9:定义条件步骤以验证模型精度

A ConditionStep 允许 Pipelin SageMaker es 支持基于步骤属性的条件在管道 DAG 中执行条件。在这种情况下,只有在模型评估步骤确定的模型精度超过所需值时,才需要注册模型包。如果精度超过所需值,管道还会创建一个 SageMaker 模型并对数据集运行批量转换。本节介绍如何定义条件步骤。

定义条件步骤以验证模型精度
  1. 使用模型评估处理步骤 step_eval 的输出中找到的精度值定义 ConditionLessThanOrEqualTo 条件。使用您在处理步骤中编制索引的属性文件以及均方误差值 "mse" 的相应 JSONPath 来获取此输出。

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 构造一个 ConditionStep。传入 ConditionEquals 条件,如果条件通过,则将模型包注册和批量转换步骤设置为后续步骤。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

步骤 10:创建管道

现在,您已经创建了所有步骤,请将它们组合成一个管道。

要创建管道
  1. 为您的管道定义以下内容:nameparameterssteps。名称在 (account, region) 对中必须唯一。

    注意

    一个步骤只能在管道的步骤列表或条件步骤的 if/else 步骤列表中出现一次。不能同时出现在两者中。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可选)检查 JSON 管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())

此管道定义已准备就绪,可以提交给 SageMaker。在下一个教程中,您将此管道提交到 SageMaker 并开始执行。

下一步:运行管道