本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
修改TensorFlow训练脚本
在本节中,您将学习如何修改TensorFlow训练脚本以配置用于自动分区和手动分区的SageMaker模型并行度库。这部分示例还包括一个与 Horovod 集成的示例,用于混合模型和数据并行。
注意
要查找该库支持哪些TensorFlow版本,请参阅支持的框架和 Amazon Web Services 区域。
中列出了为使用该库而必须对训练脚本做出的必要修改自动拆分为 TensorFlow。
要了解如何修改训练脚本以在 Horovod 中使用混合模型和数据并行性,请参阅。使用TensorFlow和 Horovod 自动拆分,实现混合模型和数据并行性
如果您想使用手动分区,也请查看使用手动拆分 TensorFlow。
提示
有关演示如何在SageMaker模型并行库中使用TensorFlow训练脚本的端到端笔记本示例,请参阅。TensorFlow例子
以下主题显示了训练脚本的示例,您可以使用这些脚本为自动分区和手动分区模型配置SageMaker模型并行度库。TensorFlow
注意
默认情况下,自动分区处于启用状态。除非另有说明,否则示例脚本使用自动分区。
自动拆分为 TensorFlow
要使用模型并行度库运行TensorFlow模型,需要对训练脚SageMaker本进行以下更改:
-
使用导入和初始化库
smp.init()
。 -
通过继承自
smp.DistributedModel
而不是 Keras 模型类来定义 Keras 模型。返回 smp.DistributedModel
对象的调用方法的模型输出。请注意,从调用方法返回的任何张量都将在模型并行设备上广播,从而产生通信开销,因此不应返回调用方法之外不需要的任何张量(例如中间激活)。 -
drop_remainder=True
在tf.Dataset.batch()
方法中设置。这是为了确保批量大小始终可以除以微批次数。 -
例如,使用
smp.dp_rank()
为数据管道中的随机操作设置种子,shuffle(ds, seed=smp.dp_rank())
以确保数据样本在包含不同模型分区的 GPU 之间的一致性。 -
将向前和向后逻辑放在一个步骤函数中,并用它进行装饰
smp.step
。 -
使用以下
StepOutput
方法对跨微批次的输出进行后处理。 reduce_mean
该smp.step
函数的返回值必须取决于的输出 smp.DistributedModel
。 -
如果存在评估步骤,则同样将正向逻辑放置在
smp.step
-dearded 函数中,然后使用 API 对输出进行StepOutput
后处理。
要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I
以下 Python 脚本是更改后的训练脚本示例。
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
如果您已准备完训练脚本,请继续第 2 步:使用 SageMaker Python 软件开发工具包启动训练作业。如果您想运行混合模型和数据并行训练作业,请继续阅读下一节。
使用TensorFlow和 Horovod 自动拆分,实现混合模型和数据并行性
您可以将SageMaker模型并行库与 Horovod 一起使用,实现混合模型和数据并行。要详细了解该库如何拆分模型以实现混合并行,请参阅。流水线并行性(适用于PyTorch和)TensorFlow
在本步骤中,我们将重点介绍如何修改训练脚本以调整SageMaker模型并行度库。
要正确设置训练脚本以获取您将在其中设置的混合并行配置,请使用库的辅助函数smp.dp_rank()
和smp.mp_rank()
,它们分别自动检测数据并行等级和模型并行等级。第 2 步:使用 SageMaker Python 软件开发工具包启动训练作业
要查找该库支持的所有 MPI 基元,请参阅 P SageMaker ython SDK 文档中的 MPI 基础知识
脚本中需要的更改是:
-
正在添加
hvd.allreduce
-
按照 Horovod 的要求,在第一批之后广播变量
-
为数据管道中的洗牌和/或分片操作提供种子。
smp.dp_rank()
注意
使用 Horovod 时,不得直接调用训练hvd.init
脚本。相反,你必须在中的 SageMaker Python SDK modelparallel
参数中设置为"horovod"
第 2 步:使用 SageMaker Python 软件开发工具包启动训练作业。True
这允许库根据模型分区的设备分配在内部初始化 Horovod。hvd.init()
直接在训练脚本中调用可能会导致问题。
注意
直接在训练脚本中使用 hvd.DistributedOptimizer
API 可能会导致训练性能和速度变差,因为 API 会将AllReduce
操作隐含在里面smp.step
。我们建议您在 Horovod 中使用模型并行度库,方法hvd.allreduce
是在调用后直接调用accumulate()
或reduce_mean()
根据返回的梯度进行调用smp.step
,如以下示例所示。
要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
使用手动拆分 TensorFlow
使用smp.partition
上下文管理器在特定分区中放置操作。任何未置于任何smp.partition
上下文中的操作都将置于中default_partition
。要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
不支持的框架功能
该库不支持以下TensorFlow功能:
-
tf.GradientTape()
目前不支持。您可以Optimizer.compute_gradients()
改用Optimizer.get_gradients()
或来计算梯度。 -
目前不支持
tf.train.Checkpoint.restore()
该 API。对于检查点,请smp.CheckpointManager
改用,它提供相同的 API 和功能。请注意,检查点恢复smp.CheckpointManager
应在第一步之后进行。