Modify a TensorFlow Training Script - Amazon SageMaker

Modify a TensorFlow Training Script

The following are training script examples that you can use to configure the SageMaker distributed model parallel library with TensorFlow versions 2.3.1 and 2.4.1, using auto-partitioning and manual partitioning. They also include an example of integration with Horovod for hybrid model and data parallelism. We recommend that you review Unsupported framework features before creating a training script.

The required modifications you must make to your training script in order to use the library are listed in TensorFlow.

To learn how to modify your training script to use hybrid model and data parallelism with Horovod, see TensorFlow with Horovod.

If you want to use manual partitioning, also review Manual partitioning with TensorFlow.

Tip

For end-to-end, runnable notebook examples that demonstrate how to use a TensorFlow training script with the SageMaker distributed model parallel library, see TensorFlow Examples.

Note that auto-partitioning is enabled by default. Unless otherwise specified, the following example scripts use auto-partitioning.

Unsupported framework features

The following TensorFlow features are unsupported by the library:

  • tf.GradientTape() is currently not supported. You can use Optimizer.get_gradients() or Optimizer.compute_gradients() instead to compute gradients.

  • The tf.train.Checkpoint.restore() API is currently not supported. For checkpointing, use smp.CheckpointManager instead, which provides the same API and functionality (see the sketch after this list). Note that checkpoint restores with smp.CheckpointManager should take place after the first step.
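The following is a minimal checkpointing sketch, not taken from this page. It assumes that smp.CheckpointManager mirrors the tf.train.CheckpointManager interface (a constructor taking a tf.train.Checkpoint, a directory, and max_to_keep, plus save() and restore() methods), as the note above suggests; the directory path, save frequency, and the model, optimizer, train_ds, and train_step objects are placeholders in the style of the examples later on this page.

# Hedged sketch: assumes smp.CheckpointManager accepts the same arguments and
# exposes the same save()/restore() methods as tf.train.CheckpointManager.
checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
ckpt_manager = smp.CheckpointManager(
    checkpoint,
    directory="/opt/ml/checkpoints",  # placeholder path
    max_to_keep=2,
)

for batch, (images, labels) in enumerate(train_ds):
    loss = train_step(images, labels)
    if batch == 0:
        # Restore (if resuming) only after the first step has run, as noted above.
        ckpt_manager.restore()
    if batch % 100 == 0:
        ckpt_manager.save()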

TensorFlow

The following training script changes are required to run a TensorFlow model with SageMaker's distributed model parallel library:

  1. Import and initialize the library with smp.init().

  2. Define a Keras model by inheriting from smp.DistributedModel instead of the Keras Model class, and return the model outputs from the call method of the smp.DistributedModel object. Be mindful that any tensors returned from the call method are broadcast across model-parallel devices, incurring communication overhead, so any tensors that are not needed outside the call method (such as intermediate activations) should not be returned.

  3. Set drop_remainder=True in the tf.data.Dataset.batch() method. This ensures that the batch size is always divisible by the number of microbatches.

  4. Seed the random operations in the data pipeline with smp.dp_rank(), for example shuffle(ds, seed=smp.dp_rank()), to ensure consistency of data samples across GPUs that hold different model partitions.

  5. Put the forward and backward logic in a step function and decorate it with smp.step.

  6. Perform post-processing on the outputs across microbatches using StepOutput methods such as reduce_mean. The return value of the smp.step function must depend on the output of smp.DistributedModel.

  7. If there is an evaluation step, similarly place the forward logic inside an smp.step-decorated function and post-process the outputs using the StepOutput API, as sketched after this list.
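For example, an evaluation step might look like the following sketch. It is not part of this page's example scripts; it reuses the model and loss_object objects in the style of the training script below, adds a hypothetical test_accuracy metric, and post-processes the StepOutput values with merge() and reduce_mean():

test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")

# smdistributed: forward-only logic goes inside an smp.step-decorated function
@smp.step
def test_forward(images, labels):
    predictions = model(images, training=False)
    loss = loss_object(labels, predictions)
    return loss, predictions

@tf.function
def test_step(images, labels):
    loss, predictions = test_forward(images, labels)
    # smdistributed: merge predictions and average losses across microbatches
    test_accuracy(labels, predictions.merge())
    return loss.reduce_mean()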

To learn more about the SageMaker distributed model parallel library API, see the API documentation.

import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers (example layers shown; replace with your own model definition)
        self.flatten = tf.keras.layers.Flatten()
        self.d1 = tf.keras.layers.Dense(128, activation="relu")
        self.d2 = tf.keras.layers.Dense(10)

    def call(self, x, training=None):
        # define forward pass and return the model output
        x = self.flatten(x)
        x = self.d1(x)
        return self.d2(x)

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions

@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for images, labels in train_ds:
        loss = train_step(images, labels)
        accuracy = train_accuracy.result()

TensorFlow with Horovod

The SageMaker distributed model parallel library can be used with Horovod for hybrid model and data parallelism. In this case, the total number of GPUs must be divisible by the number of partitions. The quotient is inferred to be the number of model replicas, or the data parallelism degree. For instance, if a training job is launched with 8 processes (which corresponds to 8 GPUs) and partitions is 2, the library applies 2-way model parallelism and 4-way data parallelism over the 8 GPUs.

To access the data parallel rank and model parallel rank of a process, use smp.dp_rank() and smp.mp_rank(), respectively. To see all MPI primitives the library exposes, see MPI Basics in the API documentation.
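For example, each process can report where it sits in the hybrid layout. This short sketch uses only the rank calls referenced on this page (smp.rank(), smp.mp_rank(), smp.dp_rank()) and is meant purely as an illustration:

# After smp.init(): report this process's position in the hybrid layout.
print(
    "global rank %d: model-parallel rank %d, data-parallel rank %d"
    % (smp.rank(), smp.mp_rank(), smp.dp_rank())
)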

If you are using Horovod, you must not directly call hvd.init. Instead, set "horovod" to True in the SageMaker Python SDK modelparallel parameters, and the library initializes Horovod internally. This is because Horovod must be initialized based on the device assignments of model partitions, and calling hvd.init() directly leads to problems.

Furthermore, using hvd.DistributedOptimizer directly leads to poor performance or hangs, because it implicitly places the allreduce operation inside smp.step. The recommended way to use the model parallel library with Horovod is to directly call hvd.allreduce after calling accumulate() or reduce_mean() on the gradients returned from smp.step, as shown in the following example.

The four main changes needed in the script are:

  • Adding hvd.allreduce

  • Broadcasting the variables after the first batch, as required by Horovod

  • Setting the "horovod" parameter to True in the modelparallel dictionary of the SageMaker Python SDK (see the launch sketch after this list)

  • Seeding shuffling and/or sharding operations in the data pipeline with smp.dp_rank().
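The launch-side change is made in the estimator's distribution configuration rather than in the training script. The following is a rough sketch, assuming a TensorFlow estimator from the SageMaker Python SDK; the entry point, role, instance type, process count, and parameter values are illustrative placeholders only:

from sagemaker.tensorflow import TensorFlow

smp_options = {
    "enabled": True,
    "parameters": {
        "partitions": 2,       # model parallelism degree
        "microbatches": 4,
        "horovod": True,       # let the library initialize Horovod internally
    },
}

mpi_options = {
    "enabled": True,
    "processes_per_host": 8,   # 8 GPUs -> 2-way model x 4-way data parallelism
}

estimator = TensorFlow(
    entry_point="train.py",    # the modified training script
    role="<your-iam-role>",    # placeholder
    instance_count=1,
    instance_type="ml.p3.16xlarge",
    framework_version="2.4.1",
    py_version="py37",
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options,
    },
)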

To learn more about the SageMaker distributed model parallel library API, see the API documentation.

import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers (example layers shown; replace with your own model definition)
        self.flatten = tf.keras.layers.Flatten()
        self.d1 = tf.keras.layers.Dense(128, activation="relu")
        self.d2 = tf.keras.layers.Dense(10)

    def call(self, x, training=None):
        # define forward pass and return model outputs
        x = self.flatten(x)
        x = self.d1(x)
        return self.d2(x)

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions

@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: Allreduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))

Manual partitioning with TensorFlow

Use smp.partition context managers to place operations in a specific partition. Any operation not placed in an smp.partition context is placed in the default_partition. To learn more about the SageMaker distributed model parallel library API, see the API documentation.

import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers (example layers shown; replace with your own model definition)
        self.layer0 = tf.keras.Sequential(
            [tf.keras.layers.Flatten(), tf.keras.layers.Dense(128, activation="relu")]
        )
        self.layer1 = tf.keras.layers.Dense(10)

    def call(self, x):
        # smdistributed: Manually place operations on specific partitions
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)
    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions

@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()

for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for images, labels in train_ds:
        loss = train_step(images, labels)
        accuracy = train_accuracy.result()