修改 TensorFlow 训练脚本 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

修改 TensorFlow 训练脚本

在本节中,您将了解如何修改 TensorFlow 用于配置 SageMaker 分布式模型 parallel 库以进行自动分区和手动分区的训练脚本。这些示例选择还包括与 Horovod 集成的用于混合模型和数据并行性的示例。

支持的 TensorFlow 版本 SageMaker 和 SageMaker 分布式模型 parallel 库
TensorFlow 版本 SageMaker 分布式模型 parallel 库版本 smdistributed-modelparallel集成映像 URI 深度学习容器发行说明
v2.6.0 smdistributed-modelparallel==v1.4.0 763104351884.dkr.ecr.<region>.amazonaws.com/tensorflow-training:2.6.0-gpu-py38-cu112-ubuntu20.04 Py1.2-tf-2.6.8-Py38
v2.5.1 smdistributed-modelparallel==v1.4.0 763104351884.dkr.ecr.<region>.amazonaws.com/tensorflow-training:2.5.1-gpu-py37-cu112-ubuntu18.04 Py1.2-tf-2.1.1-Py37

要查看库的最新更新,请参阅SageMaker 分布式模型并行发行说中的SageMaker Python 开发工具包.

在继续修改之前 TensorFlow 训练脚本,建议您查看不支持的框架功能.

中列出了您必须对训练脚本进行的使用库所需的修改TensorFlow.

要了解如何修改训练脚本以将混合模型和数据并行与 Horovod 结合使用,请参阅TensorFlow 与 Horood 实现混合模型和数据并行性.

如果你想使用手动分区,还可以查看使用 TensorFlow 手动分区.

提示

适用于 end-to-end 演示如何使用 TensorFlow 使用 SageMaker 分布式模型 parallel 库,请参阅TensorFlow 示例.

以下主题展示了训练脚本的示例,您可以使用这些脚本配置 SageMaker 的模型 parallel 库以进行自动分区和手动分区 TensorFlow 模型。

注意

默认情况下,将启用自动分区。除非另行指定,否则示例脚本使用自动分区。

TensorFlow

需要更改以下训练脚本才能运行 TensorFlow 使用 SageMaker 的分布式模型 parallel 库的模型:

  1. 使用导入并初始化库smp.init().

  2. 通过从继承来定义 Keras 模型smp.DistributedModel而不是 Keras 模型类。从中的调用方法返回模型输出smp.DistributedModel对象。请注意,从调用方法返回的任何张量都将在模型并行设备之间广播,从而产生通信开销,因此不应返回调用方法之外不需要的任何张量(例如中间激活)。

  3. Setdrop_remainder=Truetf.Dataset.batch()方法。这是为了确保批量大小始终被微批次数整除。

  4. 使用为数据管道中的随机操作种子smp.dp_rank()例如,shuffle(ds, seed=smp.dp_rank())以确保包含不同模型分区的 GPU 之间的数据样本的一致性。

  5. 将向前和向后逻辑放在一个步骤函数中然后用来装饰它smp.step.

  6. 使用跨微批量对输出执行后处理StepOutput方法,例如reduce_mean. 这些区域有:smp.step函数必须具有取决于输出的返回值smp.DistributedModel.

  7. 如果有评估步骤,同样将向前逻辑放在 smp.step 装饰函数中,然后使用后处理输出StepOutputAPI.

要了解 SageMaker 的分布式模型 parallel 库 API 的更多信息,请参阅API 文档.

以下 Python 脚本是进行更改后训练脚本的示例。

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

如果你准备完了训练脚本,请继续第 2 步:使用 SageMaker Python 开发工具包启动训练作业. 如果您想运行混合模型和数据并 parallel 训练作业,请继续执行下一节。

TensorFlow 与 Horood 实现混合模型和数据并行性

您可以使用 SageMaker Horovod 的分布式模型 parallel 库用于混合模型和数据并行性。要阅读有关库如何拆分混合并行模型的更多信息,请参阅鉴于配置参数,库如何拆分模型.

在此步骤中,我们重点介绍如何修改训练脚本以适应 SageMaker 分布式模型 parallel 库。

正确设置训练脚本以选择您将在其中设置的混合并行度配置第 2 步:使用 SageMaker Python 开发工具包启动训练作业,使用库的帮助函数,smp.dp_rank()smp.mp_rank(),它分别自动检测数据 parallel 等级和模型 parallel 排名。

要查找库支持的所有 MPI 基元,请参阅MPI 基础知识中的 SageMaker Python 开发工具包文档。

脚本中需要进行的更改是:

  • 添加hvd.allreduce

  • 根据 Horovod 的要求,在第一批之后广播变量

  • 在数据管道中使用播种进行洗牌和/或分片操作smp.dp_rank().

注意

当你使用 Horovod 时,你不能直接致电hvd.init在训练脚本中。相反,您必须设置"horovod"True中的 SageMaker Python 开发工具包modelparallel中的参数第 2 步:使用 SageMaker Python 开发工具包启动训练作业. 这允许库根据模型分区的设备分配来内部初始化 Horood。调用hvd.init()直接在训练脚本中可能会导致问题。

注意

使用hvd.DistributedOptimizer直接在训练脚本中的 API 可能会导致训练性能和速度较差,因为 API 隐含地放置AllReduce内操作smp.step. 我们建议您直接调用 Horovod 将模型 parallel 库与 Horovod 一起使用hvd.allreduce打电话后accumulate()要么reduce_mean()关于从返回的渐变smp.step,如以下示例所示。

要了解 SageMaker 的分布式模型 parallel 库 API 的更多信息,请参阅API 文档.

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: Allreduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

使用 TensorFlow 手动分区

使用smp.partition上下文管理器将操作放在特定的分区中。任何未放入任何操作smp.partition上下文放置在default_partition. 要了解 SageMaker 的分布式模型 parallel 库 API 的更多信息,请参阅API 文档.

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

不支持的框架功能

以下 TensorFlow 库不支持功能:

  • tf.GradientTape()当前不支持。您可以使用Optimizer.get_gradients()要么Optimizer.compute_gradients()而是计算渐变。

  • 这些区域有:tf.train.Checkpoint.restore()当前不支持 API。对于检查点,请使用smp.CheckpointManager相反,它提供了相同的 API 和功能。请注意,检查点还原smp.CheckpointManager应该在第一步之后进行。