

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 `@step` 装饰函数创建管道
<a name="pipelines-step-decorator-create-pipeline"></a>

您可以使用 `@step` 装饰器将 Python 函数转换为管道步骤，在这些函数之间创建依赖关系以创建管道图（或有向无环图 (DAG)），并将该图的叶节点作为步骤列表传递给管道，从而创建管道。下文将结合示例详细解释这一程序。

**Topics**
+ [将函数转换为步骤](#pipelines-step-decorator-run-pipeline-convert)
+ [在各步骤之间建立依赖关系](#pipelines-step-decorator-run-pipeline-link)
+ [使用 `ConditionStep` 和 `@step` 装饰步骤](#pipelines-step-decorator-condition)
+ [使用步骤的 `DelayedReturn` 输出定义管道](#pipelines-step-define-delayed)
+ [创建管道](#pipelines-step-decorator-pipeline-create)

## 将函数转换为步骤
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

要使用 `@step` 装饰器创建一个步骤，请使用 `@step` 对功能进行注释。下面的示例展示了一个预处理数据的 `@step` 装饰功能。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

当你调用`@step`装饰过的函数时， SageMaker AI 会返回一个`DelayedReturn`实例，而不是运行该函数。`DelayedReturn` 实例是该功能实际返回值的代理。`DelayedReturn` 实例可以作为参数传递给其他函数，也可以作为步骤直接传递给管道实例。有关该`DelayedReturn`课程的信息，请参阅 [sagemaker.workflow.function\_step。 DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn)。

## 在各步骤之间建立依赖关系
<a name="pipelines-step-decorator-run-pipeline-link"></a>

在两个步骤之间创建依赖关系时，就在管道图中的步骤之间创建了连接。下文将介绍在管道步骤之间创建依赖关系的多种方法。

### 通过输入参数实现数据依赖
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

将一个函数的 `DelayedReturn` 输出作为另一个函数的输入，会自动在管道 DAG 中创建数据依赖关系。在下面的示例中，将 `preprocess` 函数的 `DelayedReturn` 输出传递给 `train` 函数会在 `preprocess` 和 `train` 之间产生依赖关系。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

上一个示例定义了一个训练函数，该函数用 `@step` 修饰。调用该函数时，它会接收预处理管道步骤的 `DelayedReturn` 输出作为输入。调用训练函数会返回另一个 `DelayedReturn` 实例。该实例保存了该函数中定义的所有先前步骤的信息（即本例中的 `preprocess` 步骤），这些步骤构成了管道 DAG。

在上一个示例中，`preprocess` 函数返回一个值。有关列表或元组等更复杂的返回类型，请参阅 [限制](pipelines-step-decorator-limit.md)。

### 定义自定义依赖关系
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

在上一个示例中，`train` 函数接收了 `preprocess` 的 `DelayedReturn` 输出，并创建了一个依赖关系。如果想明确定义依赖关系，而不传递前一步的输出结果，请在步骤中使用 `add_depends_on` 函数。您可以使用 `get_step()` 函数从其 `DelayedReturn` 实例中获取基础步骤，然后将依赖关系作为输入调用 `add_depends_on`\_on。要查看 `get_step()` 函数定义，请参阅 [sagemaker.workflow.step\_outputs.get\_step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step)。下面的示例演示了如何使用 `get_step()` 和 `add_depends_on()` 在 `preprocess` 和 `train` 之间创建依赖关系。

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### 将数据从 `@step` 装饰函数传递到传统管道步骤
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

您可以创建一个包含 `@step` 装饰步骤和传统管道步骤的管道，并在两者之间传递数据。例如，您可以使用 `ProcessingStep` 处理数据，并将处理结果传递给 `@step` 装饰的训练函数。在下面的示例中，`@step` 装饰的训练步骤引用了处理步骤的输出。

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source={{input_data}}, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## 使用 `ConditionStep` 和 `@step` 装饰步骤
<a name="pipelines-step-decorator-condition"></a>

管道支持一个 `ConditionStep` 类，该类会评估前几个步骤的结果，以决定在管道中采取什么行动。您也可以在 `@step` 装饰的步骤中使用 `ConditionStep`。要使用 `ConditionStep` 中任何 `@step` 装饰步骤的输出，请将该步骤的输出作为 `ConditionStep` 的参数输入。在下面的示例中，条件步骤接收 `@step` 装饰模型评测步骤的输出。

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## 使用步骤的 `DelayedReturn` 输出定义管道
<a name="pipelines-step-define-delayed"></a>

无论是否使用 `@step` 装饰器，定义管道的方式都是一样的。向管道传递 `DelayedReturn` 实例时，无需传递完整的步骤列表来构建管道。SDK 会根据您定义的依赖关系自动推导出前面的步骤。您传递给管道的所有上一步骤 `Step` 对象或相关 `DelayedReturn` 对象都包含在管道图中。在下面的示例中，管道为 `train` 函数接收 `DelayedReturn` 对象。 SageMaker AI 将该`preprocess`步骤作为上一步添加到管道图中。`train`

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="{{<pipeline-name>}}",
    steps=[step_train_result],
    sagemaker_session={{<sagemaker-session>}},
)
```

如果步骤之间没有数据或自定义依赖关系，并且并行运行多个步骤，管道图就会有多个叶节点。如下例所示，将所有这些叶节点以列表形式传递给管道定义中的 `steps` 参数：

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="{{<pipeline-name>}}",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session={{sagemaker-session}},
)
```

管道运行时，两个步骤并行。

您只需将图的叶节点传递给管道，因为叶节点包含通过数据或自定义依赖关系定义的所有先前步骤的信息。在编译管道时， SageMaker AI 还会推断出构成管道图的所有后续步骤，并将每个步骤作为单独的步骤添加到管道中。

## 创建管道
<a name="pipelines-step-decorator-pipeline-create"></a>

通过调用 `pipeline.create()` 创建管道，如以下代码所示。有关 `create()` 的详细信息，请参阅 [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create)。

```
role = "{{pipeline-role}}"
pipeline.create(role)
```

当你调用时`pipeline.create()`， SageMaker AI 会编译所有定义为管道实例一部分的步骤。 SageMaker AI 将序列化函数、参数和所有其他与步骤相关的项目上传到 Amazon S3。

数据按照以下结构存放在 S3 存储桶中：

```
s3_root_uri/
    {{pipeline_name}}/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        {{step_name}}/
            {{timestamp}}/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        {{execution_id}}/
            {{step_name}}/
                results     # returned output from the serialized function including the model
```

`s3_root_uri`在 SageMaker AI 配置文件中定义，适用于整个管道。如果未定义，则使用默认 SageMaker AI 存储桶。

**注意**  
每次 SageMaker AI 编译管道时， SageMaker AI 都会将步骤的序列化函数、参数和依赖项保存在带有当前时间戳的文件夹中。每次运行 `pipeline.create()`、`pipeline.update()`、`pipeline.upsert()` 或 `pipeline.definition()` 时都会出现这种情况。