创建带有@step装饰函数的管道 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建带有@step装饰函数的管道

您可以使用@step装饰器将 Python 函数转换为工作流步骤,在这些函数之间创建依赖关系以创建流水线图(或有向无环图 (DAG)),并将该图的叶节点作为步骤列表传递给管道,从而创建管道。以下各节通过示例详细说明了此过程。

将函数转换为步骤

要使用@step装饰器创建步骤,请使用注释该函数。@step以下示例显示了一个预处理@step数据的经过装饰的函数。

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

当您调用@step装饰函数时,会 SageMaker 返回一个DelayedReturn实例,而不是运行该函数。DelayedReturn实例是该函数实际返回的代理。该DelayedReturn实例可以作为参数传递给另一个函数,也可以作为步骤直接传递给管道实例。有关该DelayedReturn课程的信息,请参阅 sagemaker.workflow.function_step。 DelayedReturn

当您在两个步骤之间创建依赖关系时,将在工作流图中的步骤之间创建连接。以下各节介绍了在工作流步骤之间创建依赖关系的多种方法。

将一个函数的DelayedReturn输出作为输入传递给另一个函数会自动在管道 DAG 中创建数据依赖关系。在以下示例中,将preprocess函数的DelayedReturn输出传递给函数会在trainpreprocess和之间创建依赖关系train

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

前面的示例定义了一个用装饰的训练函数@step。调用此函数时,它将接收预处理管道步骤的DelayedReturn输出作为输入。调用训练函数会返回另一个DelayedReturn实例。此实例保存有关该函数中定义的所有先前步骤(即本示例中的步骤)的信息,这些preprocess步骤构成了管道 DAG。

在前面的示例中,该preprocess函数返回单个值。有关列表或元组等更复杂的返回类型,请参阅。限制

在前面的示例中,该train函数接收了的DelayedReturn输出preprocess并创建了一个依赖关系。如果要在不传递上一步输出的情况下显式定义依赖关系,请将该add_depends_on函数与步骤一起使用。您可以使用该get_step()函数从其DelayedReturn实例中检索底层步骤,然后使用依赖项作为输入调用 add_depends_on _on。要查看get_step()函数定义,请参阅 sagemaker.workflow.step_outputs .get_step。以下示例说明如何train使用preprocessget_step()和在和之间创建依赖关系add_depends_on()

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

您可以创建一个包含@step装饰步骤和传统工作流步骤的管道,并在它们之间传递数据。例如,您可以使用ProcessingStep处理数据并将其结果传递给经过@step装饰的训练函数。在以下示例中,经过@step装饰的训练步骤引用了处理步骤的输出。

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

@step装饰过ConditionStep的台阶一起使用

SageMaker Pipelines 支持一个ConditionStep类,该类可以评估前面步骤的结果,以决定在管道中执行什么操作。你也可以ConditionStep@step装饰过的台阶来使用。要将任何@step装饰步骤的输出与一起使用ConditionStep,请将该步骤的输出作为参数输入。ConditionStep在以下示例中,条件步骤接收经过@step装饰的模型评估步骤的输出。

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

使用步骤DelayedReturn输出定义管道

无论你是否使用@step装饰器,你定义管道的方式都是一样的。当您将DelayedReturn实例传递给管道时,您无需传递完整的步骤列表即可构建管道。SDK 会根据您定义的依赖关系自动推断出之前的步骤。您传递给管道的Step对象或DelayedReturn对象的所有先前步骤都包含在管道图中。在以下示例中,管道接收train函数的DelayedReturn对象。 SageMaker 将该preprocess步骤作为之前的train步骤添加到管道图中。

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

如果步骤之间没有数据或自定义依赖关系,并且您并行运行多个步骤,则管道图具有多个叶节点。将列表中的所有这些叶节点传递给管道定义中的steps参数,如以下示例所示:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

当管道运行时,两个步骤并行运行。

您只能将图表的叶节点传递给管道,因为叶节点包含有关通过数据或自定义依赖关系定义的所有先前步骤的信息。当它编译管道时, SageMaker 还会推断出构成管道图的所有后续步骤,并将每个步骤作为单独的步骤添加到管道中。

创建管道

通过调用创建管道pipeline.create(),如以下代码段所示。有关详细信息,请参阅 sag emaker.workflow create() .pipeline.pipeline.create。

role = "pipeline-role" pipeline.create(role)

当您调用时pipeline.create(), SageMaker 会编译所有定义为管道实例一部分的步骤。 SageMaker 将序列化函数、参数和所有其他与步骤相关的项目上传到 Amazon S3。

数据根据以下结构驻留在 S3 存储桶中:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uri在 SageMaker 配置文件中定义,适用于整个管道。如果未定义,则使用默认 SageMaker 存储桶。

注意

每次 SageMaker 编译管道时,都会将步骤的序列化函数、参数和依赖项 SageMaker 保存在带有当前时间戳的文件夹中。每次运行pipeline.create()pipeline.update()pipeline.upsert()或时都会发生这种情况pipeline.definition()