定义管道 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

定义管道

要使用 Amazon SageMaker 模型构建管道协调工作流,您需要以 JSON 管道定义的形式生成有向非循环图 (DAG)。 下图是您在本教程中创建的管道 DAG 的表示:

您可以使用 SageMaker Python 软件开发工具包生成 JSON 管道定义。 以下教程介绍如何为管线生成一个管线定义,该定义可解决回归问题,从而根据鲍鱼的物理测量结果确定鲍鱼的年龄。有关包含本教程中可以运行的内容的 Jupyter 笔记本,请参阅使用 Amazon SageMaker 模型构建管道协调作业.

Prerequisites

要运行以下教程,您必须执行以下操作:

  • 设置笔记本实例,如创建笔记本实例. 这将授予您的角色读取和写入 Amazon S3 的权限,以及在 SageMaker 中创建培训、批量转换和处理作业。

  • 授予笔记本电脑获取和传递其自己的角色的权限,如修改角色权限策略. 添加以下 JSON 代码段以将此策略附加到您的角色。Replace<your-role-arn>与用于创建笔记本实例的 ARN 一起使用。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 信任 SageMaker 服务主体,方法是按照修改角色信任策略. 将以下语句片段添加到角色的信任关系中:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

设置环境

使用以下代码块创建新的 SageMaker 会话。这将返回会话的角色 ARN。 此角色 ARN 应该是您设置为先决条件的执行角色 ARN。

import boto3 import sagemaker import sagemaker.session region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() model_package_group_name = f"AbaloneModelPackageGroupName"

创建管道

从 SageMaker 笔记本实例中运行以下步骤来创建管道,其中包括预处理、培训、评估、条件评估和模型注册的步骤。

第 1 步:下载数据集

本笔记本使用 UCI Machine Learning 鲍鱼数据集。数据集包含以下功能:

  • length— 鲍鱼的最长壳测量值。

  • diameter— 鲍鱼直径垂直于其长度。

  • height— 鲍鱼与肉在壳的高度。

  • whole_weight— 整个鲍鱼的重量。

  • shucked_weight— 从鲍鱼中删除的肉的重量。

  • viscera_weight— 出血后鲍鱼内脏的重量。

  • shell_weight— 肉类去除和干燥后鲍鱼壳的重量。

  • sex— 鲍鱼的性别。“M”、“F” 或 “I” 之一,其中 “I” 是鲍鱼婴儿。

  • rings— 鲍鱼壳中的环形数。

鲍鱼壳环的数量是一个很好的近似,它的年龄使用公式age=rings + 1.5. 但是,获取此数字是一项耗时的任务。你必须通过锥体切壳,染色部分,并计算环的数量,通过显微镜。但是,其他物理测量值更容易确定。此笔记本使用数据集构建使用其他物理测量值的可变环的预测模型。

下载数据集

  1. 将数据集下载到您账户的默认 Amazon S3 存储桶中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 创建模型后,下载第二个数据集以进行批量转换。

    local_path = "data/abalone-dataset-batch" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

第 2 步:定义管道参数

此代码块为您的管道定义以下参数:

  • processing_instance_typeml.*实例类型的处理作业。

  • processing_instance_count— 处理作业的实例计数。

  • training_instance_typeml.*实例类型的培训作业。

  • input_data— Amazon S3 输入数据的位置。

  • batch_data— 用于批量转换的输入数据的 Amazon S3 位置。

  • model_approval_status— 为 CI/CD 注册训练的模型所使用的批准状态。有关更多信息,请参阅利用 SageMaker 项目实现 MLOP 自动化

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) processing_instance_type = ParameterString( name="ProcessingInstanceType", default_value="ml.m5.xlarge" ) training_instance_type = ParameterString( name="TrainingInstanceType", default_value="ml.m5.xlarge" ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

第 3 步:定义特征工程的处理步骤

本部分将介绍如何创建处理步骤以准备数据集中的数据以进行训练。

创建处理步骤

  1. 为处理脚本创建目录。

    !mkdir -p abalone
  2. /abalone名为的目录preprocessing.py,其中包含以下内容。此预处理脚本将传递到处理步骤,以便对输入数据执行。然后,训练步骤使用预处理的训练特征和标签来训练模型,评估步骤使用经过培训的模型和预处理的测试特征和标签来评估模型。该脚本使用scikit-learn执行以下操作:

    • 填写缺失sex分类数据并对其进行编码,以便它适合训练。

    • 缩放和规范化所有数值字段,除了ringssex.

    • 将数据拆分为训练、测试和验证数据集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. 创建的实例SKLearnProcessor传递到处理步骤。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type=processing_instance_type, instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", role=role, )
  4. 创建处理步骤。此步骤采用SKLearnProcessor、输入和输出通道以及preprocessing.py脚本。这与处理器实例的runSageMaker。这些区域有:input_data参数传递到ProcessingStep是步骤本身的输入数据。处理器实例在运行时使用此输入数据。

    请注意"train"validation, 和"test"在处理作业的输出配置中指定的命名通道。步骤Properties(例如这些)可以在后续步骤中使用,并在执行时解析为其运行时值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", )

第 4 步:定义培训步骤

本节介绍了如何使用 SageMakerXGBoost 算法根据处理步骤的训练数据输出训练 Logistic 回归模型。

定义训练步骤

  1. 指定要从训练中保存模型的位置的模型路径。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 为 XGBoost 算法和输入数据集配置估计器。这些区域有:training_instance_type传递给估计程序。典型的训练脚本从输入通道加载数据,使用超参数配置训练,训练模型,并将模型保存到model_dir,以便稍后可以托管它。SageMaker 将模型上传到 Amazon S3,以model.tar.gz在训练作业结束。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type=training_instance_type, ) xgb_train = Estimator( image_uri=image_uri, instance_type=training_instance_type, instance_count=1, output_path=model_path, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 创建TrainingStep使用估计器实例和ProcessingStep。 特别是,传递S3Uri"train""validation"输出通道添加到TrainingStep

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="AbaloneTrain", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, )

第 5 步:定义模型评估的处理步骤

本部分将介绍如何创建处理步骤以评估模型的精度。此模型评估的结果用于条件步骤,以确定要采取的执行路径。

定义模型评估的处理步骤

  1. /abalone名为的目录evaluation.py. 此脚本用于处理步骤来执行模型评估。它将经过训练的模型和测试数据集作为输入,然后生成包含分类评估指标的 JSON 文件。这些指标包括每个标签的精度、召回和 F1 分数,以及模型的准确度和 ROC 曲线。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 创建ScriptProcessor,用于创建ProcessingStep.

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type=processing_instance_type, instance_count=1, base_job_name="script-abalone-eval", role=role, )
  3. 创建ProcessingStep使用处理器实例、输入和输出通道以及evaluation.py脚本。 特别是,传递S3ModelArtifacts属性step_train训练步骤,以及S3Uri"test"输出通道step_process处理步骤。 这与处理器实例的runSageMaker。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) step_eval = ProcessingStep( name="AbaloneEval", processor=script_eval, inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", property_files=[evaluation_report], )

第 6 步:定义 Batch 转换的创建模型步骤

本部分将介绍如何通过训练步骤的输出创建 SageMaker 模型。此模型用于对新数据集进行批量转换。此步骤将传递到条件步骤,并且仅在条件步骤计算为true.

定义批转换的创建模型步骤

  1. 创建 SageMaker 模型。传递S3ModelArtifacts属性step_train训练步骤。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=sagemaker_session, role=role, )
  2. 为 SageMaker 模型定义模型输入。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. 创建您的CreateModelStep使用CreateModelInput和您定义的 SageMaker 模型实例。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步骤 7:定义用于执行 Batch 转换的转换步骤

此节演示如何创建TransformStep训练模型后对数据集执行批量转换。此步骤将传递到条件步骤,并且仅在条件步骤计算为true.

定义用于执行批处理转换的转换步骤

  1. 使用适当的计算实例类型、实例计数和所需输出 Amazon S3 存储桶 URI 创建转换器实例。传递ModelName属性step_create_model CreateModel步骤。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 创建TransformStep使用您定义的变换器实例和batch_data管道参数。

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

步骤 8:定义注册表模型步骤以创建模型包

此节演示如何在 SaaS 后端内构造RegisterModel. 执行的结果RegisterModel是模型包 模型包是一个可重复使用的模型工件抽象,它包含了推断所需的所有成分。它由一个推断规范组成,该规范定义了要与可选的模型权重位置一起使用的推理图像。模型包组是模型包的集合。您可以使用ModelPackageGroup为 SageMaker 管道添加一个新版本和模型包,以便为每次管道执行添加到组中。有关模型注册表的更多信息,请参阅使用模型注册表注册表部署模型.

此步骤将传递到条件步骤,并且仅在条件步骤计算为true.

定义用于创建模型包的注册模型步骤

  • 构造RegisterModel步骤,使用您用于训练步骤的估计器实例。传递S3ModelArtifacts属性step_train训练步骤并指定ModelPackageGroup. SageMaker 管道将创建此ModelPackageGroup为您提供。

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

步骤 9:定义条件步骤以验证模型精度

AConditionStep允许 SageMaker 管道根据步骤属性的条件支持工作流 DAG 中的条件执行。在这种情况下,只有当模型评估步骤确定的模型精度超过所需值时,才需要注册模型包。如果精度超过所需值,管道还会创建 SageMaker 模型并对数据集运行批量转换。此部分说明如何定义条件步骤。

定义条件步骤以验证模型精度

  1. 定义ConditionLessThanOrEqualTo条件使用模型评估处理步骤输出中找到的精度值,step_eval. 使用您在处理步骤中索引的属性文件和平方误差值的相应 JSONPath 获取此输出,"mse".

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ( ConditionStep, JsonGet, ) cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step=step_eval, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 构造ConditionStep. 传递ConditionEquals条件,然后在条件通过时将模型包注册和批处理变换步骤设置为后续步骤。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

步骤 10:创建管道

现在,您已经创建了所有步骤,请将它们合并到一个管道中。

创建管道

  1. 为管道定义以下内容:nameparameters, 和steps. 名称在(account, region)对。

    注意

    步骤只能在管道的步骤列表或条件步骤的 if/else 步骤列表中显示一次。它不能出现在两者中。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_type, processing_instance_count, training_instance_type, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可选)检查 JSON 管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())

此管道定义已准备好提交给 SageMaker。在下一教程中,您将此管道提交给 SageMaker 并开始执行。

下一步: 运行管道