定义管道 - Amazon SageMaker
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

定义管道

要使用 编排工作流程Amazon SageMaker Model Building Pipelines,您需要以 JSON 管道定义的形式生成有向无环图 (DAG)。下图是您在本教程中创建的管道 DAG 的表示形式:

您可以使用 SageMaker Python 开发工具包生成 JSON 管道定义。以下教程说明如何为管道生成管道定义,以解决回归问题,从而基于鲍鱼的物理测量值确定其存在时间。有关包含本教程中可运行的 内容的 Jupyter 笔记本,请参阅使用 Amazon SageMaker 模型生成管道编排作业。

Prerequisites

要运行以下教程,您必须执行以下操作:

  • 按照创建笔记本实例中所述设置笔记本实例。这将授予您的角色在 中读取和写入 Amazon S3以及创建训练、批量转换和处理作业的权限SageMaker。

  • 授予笔记本获取和传递其自己的角色的权限,如修改角色权限策略中所示。添加以下 JSON 代码段以将此策略附加到您的角色。将<your-role-arn> 替换为用于创建笔记本实例的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 按照修改角色信任策略SageMaker中的步骤,信任 服务委托人。将以下语句片段添加到角色的信任关系中:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

设置环境

使用以下代码块创建新SageMaker会话。这将返回会话的角色 ARN。此角色 ARN 应是您设置为先决条件的执行角色 ARN。

import boto3 import sagemaker import sagemaker.session region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() model_package_group_name = f"AbaloneModelPackageGroupName"

创建管道

从SageMaker笔记本实例运行以下步骤以创建管道,包括用于预处理、训练、评估、条件评估和模型注册的步骤。

步骤 1:下载数据集

此笔记本使用 UCI Machine Learning 独立数据集。数据集包含以下功能:

  • length – 鲍鱼的最长 Shell 测量值。

  • diameter – 独立于其长度的鲍鱼直径。

  • height – 在外壳中具有肉皮的鲍鱼的高度。

  • whole_weight – 整个鲍鱼的权重。

  • shucked_weight – 从鲍鱼中删除的肉类的权重。

  • viscera_weight – 出血后鲍鱼Viscera 的权重。

  • shell_weight – 分离和干燥动物后鲍鱼外壳的权重。

  • sex – 鲍鱼的性别。“M”、“F”或“I”之一,其中“I”是母鱼。

  • rings – 鲍鱼外壳中的环形数。

对于使用公式 的年龄,Apalanex shell 中的环形数是一个很好的近似值age=rings + 1.5。 但是,获取此数字是一项耗时的任务。您必须通过锥体剪切外壳,对部分进行着色,并计算通过管道的环数。但是,其他物理测量值更易于确定。此笔记本使用数据集来构建使用其他物理测量值对变量进行环形的预测模型。

下载数据集

  1. 将数据集下载到您账户的默认 Amazon S3 存储桶中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 在创建模型后,下载第二个数据集以进行批量转换。

    local_path = "data/abalone-dataset-batch" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

步骤 2:定义管道参数

此代码块为您的管道定义以下参数:

  • processing_instance_type – 处理作业ml.*的实例类型。

  • processing_instance_count – 处理作业的实例计数。

  • training_instance_type – 训练作业ml.*的实例类型。

  • input_data – 输入数据Amazon S3的位置。

  • batch_data – 用于批量转换的输入数据Amazon S3的位置。

  • model_approval_status – 向 注册训练模型以用于 CI/CD 的审批状态。有关更多信息,请参阅通过SageMaker项目自动完成 MLOps .

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) processing_instance_type = ParameterString( name="ProcessingInstanceType", default_value="ml.m5.xlarge" ) training_instance_type = ParameterString( name="TrainingInstanceType", default_value="ml.m5.xlarge" ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

步骤 3:定义特征工程的处理步骤

本节介绍如何创建处理步骤来准备数据集中的数据以进行训练。

创建处理步骤

  1. 为处理脚本创建一个目录。

    !mkdir -p abalone
  2. 在名为 的/abalone目录中创建一个preprocessing.py包含以下内容的文件。此预处理脚本将传递到处理步骤以对输入数据执行。然后,训练步骤使用经过预处理的训练功能和标签来训练模型,评估步骤使用经过训练的模型以及经过预处理的测试功能和标签来评估模型。脚本使用scikit-learn 执行以下操作:

    • 填写缺失的sex分类数据并对其进行编码,使其适合用于训练。

    • 缩放和规范化除 rings 和 之外的所有数字字段sex

    • 将数据拆分为训练、测试和验证数据集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. 创建SKLearnProcessor 的实例以传入处理步骤。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type=processing_instance_type, instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", role=role, )
  4. 创建处理步骤。此步骤将进入 SKLearnProcessor、输入和输出通道以及您创建的preprocessing.py脚本。这与 run Python 开发工具包中处理器实例的SageMaker 方法非常相似。传入的input_data 参数ProcessingStep是步骤本身的输入数据。此输入数据由处理器实例在运行时使用。

    请注意处理作业的输出配置中指定的 "train"validation"test" 命名通道。诸如此类的步骤Properties可在后续步骤中使用,并在执行时解析为其运行时值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", )

步骤 4:定义训练步骤

本节介绍如何使用 SageMaker XGBoost 算法在处理步骤的训练数据输出上训练逻辑回归模型。

定义训练步骤

  1. 指定要从训练中保存模型的模型路径。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 为XGBoost算法和输入数据集配置评估程序。training_instance_type 传递到评估程序中。典型的训练脚本从输入通道加载数据,使用超参数配置训练,训练模型,并将模型保存到model_dir ,以便稍后可以托管该模型。 在训练作业结束时SageMaker以Amazon S3 的形式将模型model.tar.gz上传到 。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type=training_instance_type, ) xgb_train = Estimator( image_uri=image_uri, instance_type=training_instance_type, instance_count=1, output_path=model_path, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 使用 的评估程序实例和属性创建TrainingStep ProcessingStep。特别是,将 S3Uri"train" 输出通道"validation"的 传递到 TrainingStep。 

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="AbaloneTrain", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, )

步骤 5:定义模型评估的处理步骤

本节介绍如何创建处理步骤来评估模型的准确性。此模型评估结果在条件步骤中用于确定要采用的执行路径。

定义模型评估的处理步骤

  1. 在名为 的/abalone目录中创建一个文件evaluation.py。 此脚本在处理步骤中用于执行模型评估。它采用经过训练的模型和测试数据集作为输入,然后生成一个包含分类评估指标的 JSON 文件。这些指标包括每个标签的精度、召回率和 F1 分数,以及模型的准确度和 ROC 曲线。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 创建用于创建 ScriptProcessor 的 实例ProcessingStep

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type=processing_instance_type, instance_count=1, base_job_name="script-abalone-eval", role=role, )
  3. 使用处理器实例、输入和输出通道以及 ProcessingStep 脚本创建一个evaluation.py 。特别是,传入S3ModelArtifacts来自训练步骤step_trainS3Uri 属性,以及"test"处理步骤step_process的输出通道的 。这与 run Python 开发工具包中处理器实例的SageMaker 方法非常相似。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) step_eval = ProcessingStep( name="AbaloneEval", processor=script_eval, inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", property_files=[evaluation_report], )

步骤 6:定义 CreateModelStep 进行批量转换

本节介绍如何从训练步骤的输出创建SageMaker模型。此模型用于在新数据集上进行批量转换。此步骤将传递到条件步骤,并且仅在条件步骤计算为 时才执行true

CreateModelStep 为批量转换定义

  1. 创建SageMaker模型。从S3ModelArtifacts训练步骤传入 step_train 属性。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=sagemaker_session, role=role, )
  2. 为您的模型定义SageMaker模型输入。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. CreateModelStep 使用定义的 CreateModelInput 和 SageMaker 模型实例创建 。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步骤 7:定义 TransformStep 以执行批量转换

本节介绍如何创建 TransformStep ,以便在训练模型后对数据集执行批量转换。此步骤将传递到 条件步骤,并且仅在条件步骤计算为 时才执行true

定义 TransformStep 以执行批量转换

  1. 使用适当的计算实例类型、实例计数和所需的输出Amazon S3存储桶 URI 创建转换器实例。从 ModelName step_create_model 步骤传入 CreateModel 属性。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. TransformStep 使用您定义的转换器实例和 batch_data 管道参数创建 。

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

步骤 8:定义创建模型包RegisterModel的步骤

本节介绍如何构建 的实例RegisterModel。 在管道RegisterModel中执行 的结果是模型包。模型包是一种可重用的模型构件抽象,可打包推理所需的所有成分。它包含一个推理规范,该规范定义要与可选的模型权重位置一起使用的推理图像。模型包组是模型包的集合。您可以使用适用于 ModelPackageGroup 的 SageMaker Pipelines 为每个管道执行将新版本和模型包添加到 组。有关模型注册表的更多信息,请参阅使用模型注册表注册和部署模型

此步骤将传递到 条件步骤,并且仅在条件步骤计算为 时才执行true

定义创建模型包RegisterModel的步骤

  • 使用您用于训练RegisterModel步骤 的评估程序实例构造步骤。从S3ModelArtifacts训练步骤传入step_train 属性并指定 ModelPackageGroup。 SageMaker Pipelines 会为您ModelPackageGroup创建此属性。

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

步骤 9:定义条件步骤以验证模型准确度

ConditionStep 允许 根据步骤属性的条件在管道 DAG 中SageMaker Pipelines支持条件执行。在这种情况下,只有在模型评估步骤确定的模型的准确性超过所需值时,您才希望注册模型包。如果精度超过所需值,管道还将创建SageMaker模型并对数据集运行批量转换。本节介绍如何定义 Condition 步骤。

定义条件步骤以验证模型准确性

  1. 使用模型评估处理步骤ConditionLessThanOrEqualTo的输出中的准确率值定义step_eval条件。 使用您在处理步骤中编制索引的属性文件和均方误差值 JSONPath 的相应 来获取此输出"mse"

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ( ConditionStep, JsonGet, ) cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step=step_eval, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 构造 ConditionStep。 传递ConditionEquals条件,然后将模型包注册和批量转换步骤设置为条件传递的后续步骤。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

步骤 10:创建管道

现在,您已创建所有步骤,请将它们合并到管道中。

创建管道

  1. 为您的管道定义以下内容name:、 parameterssteps。 名称在 对(account, region)中必须是唯一的。

    注意

    一个步骤只能出现在管道的步骤列表或条件步骤的 if/else 步骤列表中一次。它不能同时出现在两者中。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_type, processing_instance_count, training_instance_type, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可选)检查 JSON 管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())

此管道定义已准备好提交到 SageMaker。在下一个教程中,您将此管道提交到 SageMaker 并开始执行。

下一步: 运行管道