修改 TensorFlow 训练脚本 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

修改 TensorFlow 训练脚本

在本节中,您将学习如何修改 TensorFlow 训练脚本以配置用于自动分区和手动分区的 SageMaker模型并行度库。这些示例还包括与 Horovod 集成的示例,用于混合模型和数据并行性。

注意

要了解该库支持哪些 TensorFlow 版本,请参阅支持的框架和 Amazon Web Services 区域

使用自动拆分 TensorFlow中列出了为使用库而必须对训练脚本进行的修改。

要了解如何修改训练脚本以在 Horovod 中使用混合模型和数据并行性,请参阅 使用 TensorFlow 和 Horovod 自动拆分,实现混合模型和数据并行性

如果您要使用手动分区,另请参阅 使用手动拆分 TensorFlow

以下主题显示了训练脚本的示例,您可以使用这些脚本来配置自动分区和手动分区模型 SageMaker的模型并行度库。 TensorFlow

注意

默认情况下启用自动分区。除非另行指定,否则示例脚本使用自动分区。

使用自动拆分 TensorFlow

要使用模型并行度库运行 TensorFlow 模型,需要对训练脚本进行 SageMaker以下更改:

  1. 使用 smp.init() 导入和初始化库。

  2. 通过从 smp.DistributedModel 继承来定义 Keras 模型,而不是从 Keras 模型类继承。从 smp.DistributedModel 对象的调用方法返回模型输出。请注意,从调用方法返回的任何张量都将在模型并行设备之间广播,这会产生通信开销,因此在调用方法之外不需要的任何张量(例如中间激活)都不应返回。

  3. tf.Dataset.batch() 方法中设置 drop_remainder=True。这是为了确保批次大小始终可以被微批次数量整除。

  4. 使用 smp.dp_rank() 在数据管道中植入随机操作(例如 shuffle(ds, seed=smp.dp_rank())),以确保存有不同模型分区的 GPU 之间数据样本的一致性。

  5. 将向前和向后逻辑放在步进函数中,然后用 smp.step 进行修饰。

  6. 使用 StepOutput 方法(例如 reduce_mean)对微批次的输出进行后处理。smp.step 函数必须具有一个取决于 smp.DistributedModel 的输出的返回值。

  7. 如果有评估步骤,则同样将向前逻辑放在 smp.step 修饰的函数中,然后使用 StepOutput API 对输出进行后处理。

要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I 文档。

以下 Python 脚本是进行更改后的训练脚本的示例。

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

如果您已准备好训练脚本,请继续到第 2 步:使用 SageMaker Python 软件开发工具包启动训练 Job。如果要运行混合模型和数据并行训练作业,请继续到下一个部分。

使用 TensorFlow 和 Horovod 自动拆分,实现混合模型和数据并行性

您可以将 SageMaker 模型并行度库与 Horovod 一起使用,以实现混合模型和数据并行性。要详细了解库如何拆分模型以用于混合并行性,请参阅管道并行度(适用于 PyTorch 和) TensorFlow

在这一步中,我们将重点介绍如何修改训练脚本以适应 SageMaker模型并行度库。

要正确设置训练脚本,以便选取要在 第 2 步:使用 SageMaker Python 软件开发工具包启动训练 Job 中设置的混合并行度配置,请使用库的帮助程序函数 smp.dp_rank()smp.mp_rank(),它们分别自动检测数据并行秩和模型并行秩。

要查找该库支持的所有 MPI 原语,请参阅 Pyth SageMaker on SDK 文档中的 MPI 基础知识

脚本中需要进行以下更改:

  • 添加 hvd.allreduce

  • 按照 Horovod 的要求,在第一个批次之后广播变量

  • 使用 smp.dp_rank() 在数据管道中植入随机排序和/或分片操作。

注意

使用 Horovod 时,您不可在训练脚本中直接调用 hvd.init。相反,你必须在中的 SageMaker Python SDK modelparallel 参数True"horovod"将其设置为第 2 步:使用 SageMaker Python 软件开发工具包启动训练 Job。这使得库可以根据模型分区的设备分配,在内部初始化 Horovod。直接在训练脚本中调用 hvd.init() 可能会导致问题。

注意

在训练脚本中直接使用 hvd.DistributedOptimizer API 可能会导致训练性能和速度不佳,因为 API 会隐式地将 AllReduce 操作放入 smp.step 中。我们建议您在 smp.step 返回的梯度上调用 accumulate()reduce_mean() 后,直接调用 hvd.allreduce,以将模型并行性库与 Horovod 一起使用,如下例所示。

要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I 文档。

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

使用手动拆分 TensorFlow

使用 smp.partition 上下文管理器将操作放在特定的分区中。未放在任何 smp.partition 上下文中的任何操作都放在 default_partition 中。要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I 文档。

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

不支持的框架功能

该库不支持以下 TensorFlow 功能:

  • 当前不支持 tf.GradientTape()。您可以改用 Optimizer.get_gradients()Optimizer.compute_gradients() 来计算梯度。

  • 目前不支持 tf.train.Checkpoint.restore() API。对于检查点操作,请改用 smp.CheckpointManager,它提供相同的 API 和功能。请注意,smp.CheckpointManager 的检查点还原应在第一步之后进行。