本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
缓存管道步骤
使用步骤签名缓存时,Pipelines 会尝试查找之前运行的当前工作流工序的某些属性值相同。如果找到,Pipelines 将传播上次运行的输出,而不是重新计算步骤。所检查的属性特定于步骤类型,并在 按管道步骤类型划分的默认缓存键属性 中列出。
您必须选择步骤缓存 - 默认情况下,它处于关闭状态。开启步骤缓存时,还必须定义超时。此超时定义了之前运行可以持续多久才能继续作为重复使用的候选项。
步骤缓存仅考虑成功的运行 - 它从不重复使用失败的运行。如果在超时时间内成功运行了多次,Pipelines 会将结果用于最近一次成功运行。如果在超时时间内没有成功的运行匹配项,Pipelines 将重新运行该步骤。如果执行程序发现之前的运行符合条件但仍在进行中,则两个步骤都将继续运行,如果成功运行,则会更新缓存。
步骤缓存仅适用于单个管道,因此即使存在步骤签名匹配项,也无法重复使用另一个管道中的步骤。
步骤缓存可用于以下步骤类型:
开启步骤缓存
要开启步骤缓存,必须向步骤定义中添加一个 CacheConfig
属性。
CacheConfig
属性在管道定义文件中采用以下格式:
{ "CacheConfig": { "Enabled": false, "ExpireAfter": "<time>" } }
Enabled
字段指示是否为特定步骤开启了缓存。您可以将该字段设置为true
,这会提示您 SageMaker 尝试查找具有相同属性的步骤的上一次运行。或者,您可以将该字段设置为false
,表示每次管道运行时都 SageMaker 要运行该步骤。 ExpireAfter
是 ISO8601 持续时间ExpireAfter
持续时间可以是年、月、周、日、小时或分钟值。每个值都由一个数字和一个表示持续时间单位的字母组成。例如:
-
“30d”= 30 天
-
“5y”= 5 年
-
“T16m”= 16 分钟
-
“30dT5h”= 30 天零 5 小时。
以下讨论描述了使用 Amaz SageMaker on Python SDK 为新的或预先存在的管道开启缓存的过程。
为新管道开启缓存
对于新管道,请通过 enable_caching=True
初始化 CacheConfig
实例,并将其作为管道步骤的输入。以下示例为训练步骤开启缓存,并设置 1 小时的超时时间:
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.steps import CacheConfig cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") estimator = Estimator(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="TrainAbaloneModel", step_args=estimator.fit(inputs=inputs), cache_config=cache_config )
为预先存在的管道开启缓存
要为预先存在、已经定义的管道开启缓存,请打开该步骤的 enable_caching
属性,然后将 expire_after
设置为超时值。最后,使用 pipeline.upsert()
或 pipeline.update()
更新管道。再次运行后,以下代码示例将为训练步骤开启缓存,超时时间为 1 小时:
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.steps import CacheConfig from sagemaker.workflow.pipeline import Pipeline cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") estimator = Estimator(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="TrainAbaloneModel", step_args=estimator.fit(inputs=inputs), cache_config=cache_config ) # define pipeline pipeline = Pipeline( steps=[step_train] ) # additional step for existing pipelines pipeline.update() # or, call upsert() to update the pipeline # pipeline.upsert()
或者,在定义(预先存在的)管道之后更新缓存配置,这样就可以连续运行一段代码。以下代码示例演示了此方法:
# turn on caching with timeout period of one hour pipeline.steps[0].cache_config.enable_caching = True pipeline.steps[0].cache_config.expire_after = "PT1H" # additional step for existing pipelines pipeline.update() # or, call upsert() to update the pipeline # pipeline.upsert()
有关更详细的代码示例以及有关 Python SDK 参数如何影响缓存的讨论,请参阅 Amaz SageMaker on Python SDK 文档中的缓存配置
关闭步骤缓存
如果您更改未在 按管道步骤类型划分的默认缓存键属性 中列出的管道步骤类型的任何属性,则不会重新运行该管道步骤。不过,您可能会决定无论如何都要重新运行该管道步骤。在这种情况下,您需要关闭步骤缓存。
要关闭步骤缓存,请将步骤定义的 CacheConfig
属性中的 Enabled
属性设置为 false
,如以下代码片段所示:
{ "CacheConfig": { "Enabled": false, "ExpireAfter": "<time>" } }
请注意,当 Enabled
为 false
时,将忽略 ExpireAfter
属性。
要使用 Amaz SageMaker on Python 关闭工作流工序的缓存SDK,请定义工作流工序的管道,关闭该enable_caching
属性,然后更新管道。
再次运行后,以下代码示例会关闭训练步骤的缓存:
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.steps import CacheConfig from sagemaker.workflow.pipeline import Pipeline cache_config = CacheConfig(enable_caching=False, expire_after="PT1H") estimator = Estimator(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="TrainAbaloneModel", step_args=estimator.fit(inputs=inputs), cache_config=cache_config ) # define pipeline pipeline = Pipeline( steps=[step_train] ) # update the pipeline pipeline.update() # or, call upsert() to update the pipeline # pipeline.upsert()
或者,在定义管道之后关闭 enable_caching
属性,这样就可以连续运行一段代码。以下代码示例演示了此解决方案:
# turn off caching for the training step pipeline.steps[0].cache_config.enable_caching = False # update the pipeline pipeline.update() # or, call upsert() to update the pipeline # pipeline.upsert()
有关更详细的代码示例以及有关 Python SDK 参数如何影响缓存的讨论,请参阅 Amaz SageMaker on Python SDK 文档中的缓存配置
按管道步骤类型划分的默认缓存键属性
在决定是重用上一个工作流步骤还是重新运行该步骤时,Pipelines 会检查某些属性是否已更改。如果这组属性与超时时间段内所有之前的运行不同,则将再次运行该步骤。这些属性包括输入构件、应用程序或算法规范以及环境变量。
以下列表显示了每种管道步骤类型和属性,这些属性如果发生更改,则会启动该步骤的重新运行。有关使用哪些 Python SDK 参数创建以下属性的更多信息,请参阅 Amaz SageMaker on Python SDK 文档中的缓存配置
-
AppSpecification
-
环境
-
ProcessingInputs。此属性包含有关预处理脚本的信息。
-
AlgorithmSpecification
-
CheckpointConfig
-
DebugHookConfig
-
DebugRuleConfigurations
-
环境
-
HyperParameters
-
InputDataConfig。此属性包含有关训练脚本的信息。
-
HyperParameterTuningJobConfig
-
TrainingJobDefinition。此属性由多个子属性组成,并非所有子属性都会导致该步骤重新运行。可能导致重新运行(如果已更改)的子属性如下:
-
AlgorithmSpecification
-
HyperParameterRanges
-
InputDataConfig
-
StaticHyperParameters
-
TuningObjective
-
-
TrainingJobDefinitions
-
A utoMLJob Config。此属性由多个子属性组成,并非所有子属性都会导致该步骤重新运行。可能导致重新运行(如果已更改)的子属性如下:
-
CompletionCriteria
-
CandidateGenerationConfig
-
DataSplitConfig
-
Mode
-
-
一个utoMLJob目标
InputDataConfig
ProblemType
-
DataProcessing
-
环境
-
ModelName
-
TransformInput
-
ClarifyCheckConfig
-
CheckJobConfig
-
SkipCheck
-
RegisterNewBaseline
-
ModelPackageGroupName
-
SuppliedBaselineConstraints
-
QualityCheckConfig
-
CheckJobConfig
-
SkipCheck
-
RegisterNewBaseline
-
ModelPackageGroupName
-
SuppliedBaselineConstraints
-
SuppliedBaselineStatistics
-
ClusterId
-
StepConfig
缓存数据访问控制
当 SageMaker 管道运行时,它会缓存与管道启动的 SageMaker作业相关的参数和元数据,并将其保存以供后续运行中重复使用。除了缓存的管道步骤外,还可以通过各种来源访问此元数据,包括以下类型:
-
Describe*Job
请求 -
CloudWatch 日志
-
CloudWatch 大事记
-
CloudWatch 指标
-
SageMaker 搜寻
请注意,对列表中每个数据源的访问权限均由其自己的一组IAM权限控制。删除特定角色对一个数据源的访问权限不会影响对其他数据源的访问级别。例如,账户管理员可能会移除来自来电者角色的Describe*Job
请求IAM权限。虽然调用方无法再发出 Describe*Job
请求,但只要他们有权运行管道,就仍然可以从使用缓存步骤的管道运行中检索元数据。如果账户管理员想要从特定 SageMaker 任务中完全移除对元数据的访问权限,则需要移除提供数据访问权限的每项相关服务的权限。