Modify a TensorFlow Training Script - Amazon SageMaker


Modify a TensorFlow training script

In this section, you learn how to modify TensorFlow training scripts to configure the SageMaker model parallelism library for automated and manual partitioning. This selection of examples also includes an example integrated with Horovod for hybrid model and data parallelism.

Note

To find the TensorFlow versions supported by the library, see Supported Frameworks and Amazon Web Services Regions.

The required modifications you must make to your training script to use the library are listed in Automated splitting with TensorFlow.

To learn how to modify your training script to use hybrid model and data parallelism with Horovod, see Automated splitting with TensorFlow and Horovod for hybrid model and data parallelism.

If you want to use manual partitioning, also review Manual splitting with TensorFlow.

Tip

For end-to-end notebook examples that demonstrate how to use a TensorFlow training script with the SageMaker model parallelism library, see TensorFlow Examples.

The following topics show examples of training scripts that you can use to configure the SageMaker model parallelism library for automated partitioning and manual partitioning of TensorFlow models.

Note

Automated partitioning is enabled by default. Unless otherwise specified, the example scripts use automated partitioning.

Automated splitting with TensorFlow

The following training script changes are required to run a TensorFlow model with the SageMaker model parallelism library:

  1. Import and initialize the library with smp.init().

  2. Define a Keras model by inheriting from smp.DistributedModel instead of the Keras Model class. Return the model outputs from the call method of the smp.DistributedModel object. Be mindful that any tensors returned from the call method are broadcast across model-parallel devices, which incurs communication overhead, so any tensors that are not needed outside the call method (such as intermediate activations) should not be returned.

  3. Set drop_remainder=True in the tf.Dataset.batch() method. This ensures that the batch size is always divisible by the number of microbatches.

  4. Seed the random operations in the data pipeline using smp.dp_rank(), for example, shuffle(ds, seed=smp.dp_rank()), to ensure consistency of data samples across GPUs that hold different model partitions.

  5. Put the forward and backward logic in a step function and decorate it with smp.step.

  6. Perform post-processing on the outputs across microbatches using StepOutput methods such as reduce_mean. The smp.step function must have a return value that depends on the output of smp.DistributedModel.

  7. If there is an evaluation step, similarly place the forward logic inside an smp.step-decorated function and post-process the outputs using the StepOutput API (a hedged sketch follows the example training script below).

To learn more about the SageMaker model parallelism library API, refer to the API documentation.

The following Python script is an example of a training script after these changes are made.

import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers, for example (illustrative placeholders):
        self.flatten = tf.keras.layers.Flatten()
        self.dense = tf.keras.layers.Dense(10)

    def call(self, x, training=None):
        # define forward pass and return the model output, for example:
        x = self.flatten(x)
        return self.dense(x)

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions

@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for images, labels in train_ds:
        loss = train_step(images, labels)
        accuracy = train_accuracy.result()
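The example above covers only training. If your workflow also has an evaluation step (item 7 above), the following is a minimal, hedged sketch of how it might look. The test dataset construction and metric name are illustrative placeholders, not part of the official example.

test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")

# Sketch: put the evaluation forward logic in an smp.step-decorated function and
# post-process the outputs with StepOutput methods, as described in item 7.
@smp.step
def get_eval_outputs(images, labels):
    predictions = model(images, training=False)
    loss = loss_object(labels, predictions)
    return loss, predictions

@tf.function
def eval_step(images, labels):
    loss, predictions = get_eval_outputs(images, labels)
    # Merge predictions and average losses across microbatches
    test_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

# Illustrative test pipeline; drop_remainder keeps the batch size divisible
# by the number of microbatches, as in the training pipeline.
test_ds = (
    tf.data.Dataset.from_tensor_slices((x_test, y_test))
    .batch(256, drop_remainder=True)
)

for images, labels in test_ds:
    eval_loss = eval_step(images, labels)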

If you are done preparing your training script, proceed to Step 2: Launch a Training Job Using the SageMaker Python SDK. If you want to run a hybrid model and data parallel training job, continue to the next section.

Automated splitting with TensorFlow and Horovod for hybrid model and data parallelism

You can use the SageMaker model parallelism library with Horovod for hybrid model and data parallelism. To read more about how the library splits a model for hybrid parallelism, see Pipeline parallelism (available for PyTorch and TensorFlow).

In this step, we focus on how to modify your training script to adapt it for the SageMaker model parallelism library.

To set up your training script to pick up the hybrid parallelism configuration that you will set in Step 2: Launch a Training Job Using the SageMaker Python SDK, use the library's helper functions smp.dp_rank() and smp.mp_rank(), which automatically detect the data parallel rank and the model parallel rank, respectively.

To find all the MPI primitives the library supports, see MPI Basics in the SageMaker Python SDK documentation.

The changes needed in the script are:

  • Adding hvd.allreduce

  • Broadcasting variables after the first batch, as required by Horovod

  • Seeding shuffling and/or sharding operations in the data pipeline with smp.dp_rank() (a hedged sketch follows this list)
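As an illustration of the last item, the following is a minimal sketch of seeding and sharding the input pipeline by data-parallel rank. The dataset construction is illustrative, and smp.dp_size() is assumed from the library's common API; the full example below only seeds the shuffle.

# Sketch: shard and seed the input pipeline per data-parallel rank so each
# data-parallel group reads distinct, consistently ordered samples.
# smp.dp_size() is assumed from the library's common API; values are illustrative.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shard(smp.dp_size(), smp.dp_rank())   # shard across data-parallel ranks
    .shuffle(10000, seed=smp.dp_rank())    # seed the shuffle with the data-parallel rank
    .batch(256, drop_remainder=True)
)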

Note

When you use Horovod, you must not directly call hvd.init in your training script. Instead, you must set "horovod" to True in the SageMaker Python SDK modelparallel parameters in Step 2: Launch a Training Job Using the SageMaker Python SDK. This allows the library to internally initialize Horovod based on the device assignments of the model partitions. Calling hvd.init() directly in your training script can cause problems.
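For context, the following is a minimal, hedged sketch of what that configuration might look like in the SageMaker Python SDK. The estimator arguments and parameter values (instance type, framework and Python versions, partitions, microbatches, and so on) are illustrative placeholders; see Step 2: Launch a Training Job Using the SageMaker Python SDK for the authoritative setup.

# Sketch: launching the training job with "horovod": True in the modelparallel
# parameters. All values below are illustrative placeholders.
from sagemaker.tensorflow import TensorFlow

smp_estimator = TensorFlow(
    entry_point="train.py",              # your modified training script
    role="<your-iam-role>",              # placeholder
    instance_count=1,
    instance_type="ml.p3.16xlarge",
    framework_version="2.6",
    py_version="py38",
    distribution={
        "smdistributed": {
            "modelparallel": {
                "enabled": True,
                "parameters": {
                    "partitions": 2,
                    "microbatches": 4,
                    "horovod": True,     # let the library initialize Horovod internally
                },
            }
        },
        "mpi": {"enabled": True, "processes_per_host": 8},
    },
)

smp_estimator.fit()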

Note

Using the hvd.DistributedOptimizer API directly in your training script might result in poor training performance and speed, because the API implicitly places the AllReduce operation inside smp.step. We recommend that you use the model parallelism library with Horovod by directly calling hvd.allreduce after calling accumulate() or reduce_mean() on the gradients returned from smp.step, as shown in the following example.

To learn more about the SageMaker model parallelism library API, refer to the API documentation.

import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers, for example (illustrative placeholders):
        self.flatten = tf.keras.layers.Flatten()
        self.dense = tf.keras.layers.Dense(10)

    def call(self, x, training=None):
        # define forward pass and return model outputs, for example:
        x = self.flatten(x)
        return self.dense(x)

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions

@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: AllReduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))

Manual splitting with TensorFlow

Use smp.partition context managers to place operations in a specific partition. Any operation not placed in an smp.partition context is placed in the default_partition. To learn more about the SageMaker model parallelism library API, refer to the API documentation.

import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers, for example (illustrative placeholders):
        self.layer0 = tf.keras.layers.Flatten()
        self.layer1 = tf.keras.layers.Dense(10)

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions

@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for images, labels in train_ds:
        loss = train_step(images, labels)
        accuracy = train_accuracy.result()

Unsupported framework features

The following TensorFlow features are not supported by the library:

  • tf.GradientTape() is currently not supported. You can use Optimizer.get_gradients() or Optimizer.compute_gradients() instead to compute gradients.

  • The tf.train.Checkpoint.restore() API is currently not supported. For checkpointing, use smp.CheckpointManager instead, which provides the same API and functionality. Note that checkpoint restores with smp.CheckpointManager should take place after the first step (a hedged sketch follows this list).
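The following is a minimal, hedged sketch of checkpointing with smp.CheckpointManager, assuming it mirrors the tf.train.CheckpointManager interface as stated above. The directory path and the exact save and restore method signatures are assumptions; confirm them in the API documentation.

# Sketch: checkpointing with smp.CheckpointManager. The interface is assumed to be
# analogous to tf.train.CheckpointManager; the directory and save/restore calls
# below are illustrative assumptions, not confirmed signatures.
checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
ckpt_manager = smp.CheckpointManager(checkpoint, directory="/opt/ml/checkpoints")

for step, (images, labels) in enumerate(train_ds):
    loss = train_step(images, labels)
    if step == 0:
        # Restore (if a checkpoint exists) only after the first training step,
        # as the library requires.
        ckpt_manager.restore()
    if step % 100 == 0:
        ckpt_manager.save()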