
Modify a TensorFlow training script

The following are examples of training scripts that you can use to configure the SageMaker distributed model parallel library with TensorFlow versions 2.3.1 and 2.4.1, with auto-partitioning and manual partitioning. This selection of examples also includes an example integrated with Horovod for hybrid model and data parallelism. We recommend that you review Unsupported framework features before creating a training script.

The required modifications you must make to your training script to use the library are listed in TensorFlow.

To learn how to modify your training script to use hybrid model and data parallelism with Horovod, see TensorFlow with Horovod.

If you want to use manual partitioning, also review Manual partitioning with TensorFlow.

Tip

For end-to-end, runnable notebook examples that demonstrate how to use a TensorFlow training script with the SageMaker distributed model parallel library, see TensorFlow Examples.

Note that auto-partitioning is enabled by default. Unless otherwise specified, the following example scripts use auto-partitioning.

Unsupported framework features

The following TensorFlow features are unsupported by the library:

  • tf.GradientTape() is currently not supported. You can use Optimizer.get_gradients() or Optimizer.compute_gradients() instead to compute gradients, as shown in the sketch after this list.

  • The tf.train.Checkpoint.restore() API is currently not supported. For checkpointing, use smp.CheckpointManager instead, which provides the same API and functionality. Note that checkpoint restores with smp.CheckpointManager should take place after the first step.
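
For example, a forward and backward pass that would normally compute gradients with tf.GradientTape() can instead use Optimizer.get_gradients() in graph mode. The following is a minimal sketch using only standard Keras APIs from TensorFlow 2.3/2.4; the model, loss object, and input data below are illustrative placeholders, not part of the library.

import tensorflow as tf

# Illustrative placeholders; in a real training script the model comes from
# your smp.DistributedModel definition.
model = tf.keras.Sequential([tf.keras.layers.Dense(10)])
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()

@tf.function
def train_step(features, labels):
    # Supported: compute gradients with Optimizer.get_gradients() in graph mode
    # instead of tf.GradientTape(), which the library does not support.
    loss = loss_object(labels, model(features, training=True))
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

features = tf.random.uniform([8, 4])
labels = tf.random.uniform([8], maxval=10, dtype=tf.int32)
loss = train_step(features, labels)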

TensorFlow

To run a TensorFlow model with SageMaker's distributed model parallel library, make the following changes to your training script:

  1. Import and initialize the library with smp.init().

  2. Define a Keras model by inheriting from smp.DistributedModel instead of the Keras Model class. Return the model outputs from the call method of the smp.DistributedModel object. Be mindful that any tensors returned from the call method are broadcast across model-parallel devices, incurring communication overhead, so tensors that are not needed outside the call method (such as intermediate activations) should not be returned.

  3. Set drop_remainder=True in the tf.Dataset.batch() method. This ensures that the batch size is always divisible by the number of microbatches.

  4. Seed the random operations in the data pipeline using smp.dp_rank(), for example shuffle(ds, seed=smp.dp_rank()), to ensure consistency of data samples across GPUs that hold different model partitions.

  5. Put the forward and backward logic in a step function and decorate it with smp.step.

  6. Perform post-processing on the outputs across microbatches using StepOutput methods such as reduce_mean. The smp.step function must have a return value that depends on the output of smp.DistributedModel.

  7. If there is an evaluation step, similarly place the forward logic inside an smp.step-decorated function and post-process the outputs using the StepOutput API.

To learn more about the SageMaker distributed model parallel library API, see the API documentation.

import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers (minimal example)
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(128, activation="relu")
        self.dense2 = tf.keras.layers.Dense(10)

    def call(self, x, training=None):
        # define forward pass and return the model output
        x = self.flatten(x)
        x = self.dense1(x)
        return self.dense2(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
        accuracy = train_accuracy.result()

TensorFlow with Horovod

The SageMaker distributed model parallel library can be used with Horovod for hybrid model and data parallelism. In this case, the total number of GPUs must be divisible by the number of partitions. The quotient is inferred to be the number of model replicas, or the degree of data parallelism. For example, if a training job is launched with 8 processes (which correspond to 8 GPUs) and partitions is 2, the library applies 2-way model parallelism and 4-way data parallelism over the 8 GPUs.

To access the data parallel rank and model parallel rank of a process, you can use smp.dp_rank() and smp.mp_rank(), respectively. To see all MPI primitives the library exposes, see MPI Basics in the API documentation.
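
As a quick sketch of how these ranks relate to the partition count, the snippet below queries the library's rank and size helpers inside a training script. The size helpers (smp.size(), smp.mp_size(), smp.dp_size()) are assumed here to be available alongside smp.dp_rank() and smp.mp_rank(); see MPI Basics in the API documentation for the authoritative list.

import smdistributed.modelparallel.tensorflow as smp

smp.init()

# Assumed size helpers (see MPI Basics in the API documentation):
# smp.size()    - total number of processes, one per GPU
# smp.mp_size() - number of model partitions
# smp.dp_size() - number of model replicas (data parallelism degree)
total_gpus = smp.size()
partitions = smp.mp_size()
data_parallel_degree = smp.dp_size()

# The total number of GPUs must be divisible by the number of partitions;
# the quotient is the data parallelism degree. With 8 GPUs and partitions=2,
# this gives 2-way model parallelism and 4-way data parallelism.
assert total_gpus % partitions == 0
assert data_parallel_degree == total_gpus // partitions

print("rank %d: mp_rank=%d, dp_rank=%d" % (smp.rank(), smp.mp_rank(), smp.dp_rank()))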

If you use Horovod, you should not directly call hvd.init. Instead, set the "horovod" parameter to True in the modelparallel parameters of the SageMaker Python SDK, and the library initializes Horovod internally. This is because Horovod must be initialized based on the device assignments of the model partitions, and calling hvd.init() directly causes problems.
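
For reference, the following is a minimal sketch of how the "horovod" parameter might be set when launching the training job with the SageMaker Python SDK TensorFlow estimator. The role, instance type, versions, and parameter values are illustrative assumptions; adjust them for your own job.

from sagemaker.tensorflow import TensorFlow

# Sketch of a TensorFlow estimator with the model parallel library and Horovod
# enabled; the library, not your script, initializes Horovod internally.
estimator = TensorFlow(
    entry_point="train.py",            # your modified training script
    role="<your-iam-role>",
    instance_count=1,
    instance_type="ml.p3.16xlarge",    # 8 GPUs per instance (assumption)
    framework_version="2.4.1",
    py_version="py37",
    distribution={
        "smdistributed": {
            "modelparallel": {
                "enabled": True,
                "parameters": {
                    "partitions": 2,
                    "microbatches": 4,
                    "horovod": True,   # do not call hvd.init() in the script
                },
            }
        },
        "mpi": {"enabled": True, "processes_per_host": 8},
    },
)

estimator.fit()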

Furthermore, using hvd.DistributedOptimizer directly results in poor performance or hangs, because it implicitly places the AllReduce operation inside smp.step. The recommended way to use the model parallel library with Horovod is to call hvd.allreduce directly after calling accumulate() or reduce_mean() on the gradients returned from smp.step, as shown in the following example.

The four main changes required in the script are:

  • Adding hvd.allreduce

  • Broadcasting the variables after the first batch, as required by Horovod

  • Setting the "horovod" parameter to True in the modelparallel dict of the Python SDK

  • Seeding the shuffling and/or sharding operations in the data pipeline with smp.dp_rank()

To learn more about the SageMaker distributed model parallel library API, see the API documentation.

import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers (minimal example)
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(128, activation="relu")
        self.dense2 = tf.keras.layers.Dense(10)

    def call(self, x, training=None):
        # define forward pass and return model outputs
        x = self.flatten(x)
        x = self.dense1(x)
        return self.dense2(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: Allreduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))

Manual partitioning with TensorFlow

Use smp.partition context managers to place operations in a specific partition. Any operation not placed in any smp.partition context is placed in the default_partition. To learn more about the SageMaker distributed model parallel library API, see the API documentation.
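
The script-side example follows below. Note that manual partitioning is also reflected in the modelparallel parameters passed through the SageMaker Python SDK; the following is a hedged sketch in which "auto_partition" and "default_partition" follow the library's documented parameter names, while the other values are illustrative assumptions.

# Sketch of modelparallel parameters for manual partitioning, passed as part of
# the estimator's distribution configuration.
smp_options = {
    "enabled": True,
    "parameters": {
        "partitions": 2,          # number of model partitions
        "microbatches": 4,        # illustrative value
        "auto_partition": False,  # turn off auto-partitioning
        "default_partition": 0,   # partition for ops outside any smp.partition context
    },
}

distribution = {
    "smdistributed": {"modelparallel": smp_options},
    "mpi": {"enabled": True, "processes_per_host": 8},
}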

import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers (minimal example)
        self.layer0 = tf.keras.layers.Flatten()
        self.layer1 = tf.keras.layers.Dense(10)

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
        accuracy = train_accuracy.result()