定义管道 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

定义管道

与亚马逊协调您的工作流程 SageMaker 模型构建管道,您需要以 JSON 管道定义的形式生成定向非循环图 (DAG)。 下图是您在本教程中创建的管道 DAG 的表示形式:

您可以使用生成 JSON 管道定义 SageMaker Python 开发工具包。 以下教程展示了如何为管道生成管道定义,该管道可以解决回归问题,以根据鲍鱼的物理测量结果来确定鲍鱼的年龄。有关包含本教程中可以运行的内容的 Jupyter 笔记本,请参阅与亚马逊协调工作 SageMaker 模型构建管道.

先决条件

要运行以下教程,您必须执行以下操作:

  • 按照中的概述设置笔记本实例创建笔记本实例. 这使您的角色能够在 SageMaker 中读取和写入 Amazon S3,以及在 SageMaker 中创建培训、批量转换和处理作业的权限。

  • 授予笔记本权限以获取和传递自己的角色,如中所示修改角色权限策略. 添加以下 JSON 片段以将此策略附加到您的角色。Replace<your-role-arn>使用 ARN 用于创建笔记本实例。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 相信 SageMaker 按照中的步骤执行服务主体修改角色信任策略. 将以下语句片段添加到角色的信任关系中:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

设置环境

创建新的 SageMaker 会话使用以下代码块。这将返回会话的角色 ARN。 此角色 ARN 应该是您作为先决条件设置的执行角色 ARN。

import boto3 import sagemaker import sagemaker.session region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() model_package_group_name = f"AbaloneModelPackageGroupName"

创建管道

运行以下步骤 SageMaker notebook 实例来创建管道,包括预处理、训练、评估、条件评估和模型注册的步骤。

第 1 步:下载数据集

本笔记本使用 UCI Machine Learning 鲍鱼数据集。数据集包含以下功能:

  • length— 鲍鱼最长的贝壳测量。

  • diameter— 鲍鱼的直径垂直于它的长度。

  • height— 贝壳里有肉的鲍鱼的高度。

  • whole_weight— 整个鲍鱼的重量。

  • shucked_weight— 从鲍鱼中取出的肉的重量。

  • viscera_weight— 出血后鲍鱼内脏的重量。

  • shell_weight— 肉类去除和干燥后鲍鱼壳的重量。

  • sex— 鲍鱼的性别。“M”、“F” 或 “我” 之一,其中 “我” 是婴儿鲍鱼。

  • rings— 鲍鱼壳中的环形数。

使用公式,鲍鱼壳中的戒指数量对于其年龄来说是一个很好的近似值age=rings + 1.5. 但是,获取这个数字是一项耗时的任务。你必须通过锥体切割外壳,弄脏部分,然后通过显微镜计算戒指的数量。但是,其他物理测量值更容易确定。本笔记本使用数据集使用其他物理测量值来构建可变环的预测模型。

下载数据集

  1. 将数据集下载到您账户的默认 Amazon S3 存储桶中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 创建模型后,下载第二个数据集以进行批量转换。

    local_path = "data/abalone-dataset-batch" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

第 2 步:定义管道参数

此代码块定义了管道的以下参数:

  • processing_instance_type— 该ml.*处理作业的实例类型。

  • processing_instance_count— 处理作业的实例计数。

  • training_instance_type— 该ml.*培训作业的实例类型。

  • input_data— 输入数据的 Amazon S3 位置。

  • batch_data— 用于批处理转换的输入数据的 Amazon S3 位置。

  • model_approval_status— 为 CI/CD 注册训练模型的批准状态。有关更多信息,请参阅 使用实现自动化 mLOP SageMaker 项目

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) processing_instance_type = ParameterString( name="ProcessingInstanceType", default_value="ml.m5.xlarge" ) training_instance_type = ParameterString( name="TrainingInstanceType", default_value="ml.m5.xlarge" ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

第 3 步:为要素工程定义处理步骤

本部分将介绍如何创建处理步骤,以便为训练准备数据集中的数据。

创建处理步骤

  1. 为处理脚本创建一个目录。

    !mkdir -p abalone
  2. 在中创建文件/abalone名为的目录preprocessing.py包含以下内容。此预处理脚本将传递到处理步骤,以便对输入数据执行。然后,训练步骤使用预处理的训练要素和标签来训练模型,评估步骤使用训练模型和预处理的测试要素和标签来评估模型。该脚本使用scikit-learn执行以下操作:

    • 填写丢失sex分类数据并对其进行编码,以便适合训练。

    • 缩放和标准化除外的所有数字字段ringssex.

    • 将数据拆分为训练、测试和验证数据集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. 创建的实例SKLearnProcessor进入处理步骤。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type=processing_instance_type, instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", role=role, )
  4. 创建处理步骤。这个步骤是在SKLearnProcessor、输入和输出通道以及preprocessing.py你创建的脚本。这与处理器实例非常相似run中的方法 SageMaker Python 开发工具包。这些区域有:input_data传递到的参数ProcessingStep是步骤本身的输入数据。处理器实例在运行时会使用此输入数据。

    注意"train"validation, 和"test"在处理作业的输出配置中指定的命名频道。步骤Properties例如它们可以在后续步骤中使用,并在执行时解析为它们的运行时值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", )

第 4 步:定义训练步骤

本节演示如何使用 SageMaker XGBoost 算法以便根据处理步骤的训练数据输出来训练逻辑回归模型。

定义训练步骤

  1. 指定您要在训练中保存模型的模型路径。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 为 xgBoost 算法和输入数据集配置估计器。这些区域有:training_instance_type传递到估算程序中。典型的训练脚本从输入通道加载数据、使用超参数配置训练、训练模型并将模型保存到model_dir以便稍后可以托管它。 SageMaker 以一种形式将模型上传到 Amazon S3model.tar.gz在训练作业结束时。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type=training_instance_type, ) xgb_train = Estimator( image_uri=image_uri, instance_type=training_instance_type, instance_count=1, output_path=model_path, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 创建TrainingStep使用估计器实例和ProcessingStep。 特别是,请传入S3Uri"train""validation"输出频道到TrainingStep

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="AbaloneTrain", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, )

第 5 步:定义模型评估的处理步骤

本部分将介绍如何创建处理步骤以评估模型的准确性。此模型评估的结果用于条件步骤,以确定要采取的执行路径。

定义模型评估的处理步骤

  1. 在中创建文件/abalone名为的目录evaluation.py. 此脚本在处理步骤中用于执行模型评估。它接受训练的模型和测试数据集作为输入,然后生成包含分类评估指标的 JSON 文件。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 创建实例ScriptProcessor用于创建ProcessingStep.

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type=processing_instance_type, instance_count=1, base_job_name="script-abalone-eval", role=role, )
  3. 创建ProcessingStep使用处理器实例、输入和输出通道以及evaluation.py脚本。 特别是,请传入S3ModelArtifacts属性来自step_train训练步骤,以及S3Uri"test"的输出通道step_process处理步骤。 这与处理器实例非常相似run中的方法 SageMaker Python 开发工具包。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) step_eval = ProcessingStep( name="AbaloneEval", processor=script_eval, inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", property_files=[evaluation_report], )

第 6 步:定义 CreateModelStep 适用于 Batch 转换

本节演示如何创建 SageMaker 来自训练步骤输出的模型。此模型用于对新数据集进行批量转换。此步骤将传递到条件步骤中,并且只有在条件步骤的计算结果为时才执行true.

定义 CreateModelStep 用于批量转换

  1. 创建 SageMaker 模型。传递S3ModelArtifacts属性来自step_train训练步骤。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=sagemaker_session, role=role, )
  2. 为你的定义模型输入 SageMaker 模型。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. 创建您的CreateModelStep使用CreateModelInput和 SageMaker 您定义的模型实例。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步骤 7:定义 TransformStep 要执行 Batch 转换

本节演示如何创建TransformStep在训练模型后对数据集执行批处理转换。此步骤将传递到条件步骤中,并且只有在条件步骤的计算结果为时才执行true.

定义 TransformStep 执行批处理转换

  1. 使用适当的计算实例类型、实例计数和所需输出 Amazon S3 存储桶 URI 创建变压器实例。传递ModelName属性来自step_create_model CreateModel步骤。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 创建TransformStep使用你定义的变压器实例和batch_data管道参数。

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

步骤 8:定义 RegisterModel 创建模型 Package 的步骤

本节介绍如何构建RegisterModel. 执行的结果RegisterModel在管道中是模型包。模型包是一种可重复使用的模型工件抽象,它包含推断所需的所有要素。它包含一个推理规范,该规范定义了要使用的推理图像以及可选的模型权重位置。模型包组是模型包的集合。您可以使用ModelPackageGroup为了 SageMaker Pipeline 为每个管道执行添加新版本和模型包到组中。有关模型注册表的更多信息,请参阅使用模型注册表注册并部署模型.

此步骤将传递到条件步骤中,并且只有在条件步骤的计算结果为时才执行true.

定义 RegisterModel 创建模型包

  • 构建RegisterModel步骤使用您用于训练步骤的估计器实例。传递S3ModelArtifacts属性来自step_train训练步骤并指定ModelPackageGroup. SageMaker 管道创造了这个ModelPackageGroup为您而言。

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

步骤 9:定义条件步骤以验证模型准确性

一个ConditionStep允许 SageMaker 用于根据步骤属性的条件在管道 DAG 中支持条件执行的管道。在这种情况下,只有在模型评估步骤确定的模型的准确性超过所需值时,才想注册模型包。如果精度超过所需值,则管道还会创建 SageMaker 对数据集进行建模并运行批量转换。此节演示如何定义条件步骤。

定义条件步骤以验证模型准确性

  1. 定义ConditionLessThanOrEqualTo条件使用模型评估处理步骤输出中找到的精度值,step_eval. 使用您在处理步骤中索引的属性文件以及平方误差值的相应 JSONPath 获取此输出,"mse".

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 构建ConditionStep. 传递ConditionEquals条件中,然后将模型包注册和批量转换步骤设置为条件通过后续步骤。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

步骤 10:创建管道

既然您已经创建了所有步骤,请将它们合并到管道中。

创建管道

  1. 为管道定义以下内容:nameparameters, 和steps. 名称在(account, region)对。

    注意

    步骤只能在管道的步骤列表或条件步骤的 if/else 步骤列表中出现一次。它不能出现在两者中。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_type, processing_instance_count, training_instance_type, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可选)检查 JSON 管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())

此管道定义已准备好提交给 SageMaker 了。在下一个教程中,您将此管道提交到 SageMaker 然后启动处决。

下一步: 运行管道