本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
修改 TensorFlow 训练脚本
以下步骤向您显示如何修改 TensorFlow 用于利用 SageMaker 的分布式数据 parallel 库的训练脚本。
库 API 的设计与霍罗沃德 API 类似。有关库为 TensorFlow 提供的每个 API 的其他详细信息,请参阅SageMaker parallel 分布式数据 TensorFlow API 文档
SageMaker 分布式数据 parallel 可适应 TensorFlow 训练脚本由以下内容tf
核心模块除外tf.keras
模块。 SageMaker 不支持分布式数据 parallel TensorFlow 与 Keras 实施结合使用。
SageMaker 的分布式数据并行库支持开箱即用的自动混合精度 (AMP)。除了对训练脚本的框架级修改之外,无需执行额外操作即可启用 AMP。如果渐变位于 FP16 中,则 SageMaker 数据并行度库运行它AllReduce
在 FP16 中操作。有关在训练脚本中实施 AMP API 的更多信息,请参阅以下资源:
-
TensorFlow
中的NVIDIA 深度学习性能文档 -
深度学习自动混合精度
中的NVIDIA 开发人员文档 -
TensorFlow 混合精度 API
中的TensorFlow 文档
-
导入图书馆的 TensorFlow 客户端然后初始化它。
import smdistributed.dataparallel.tensorflow as sdp sdp.init()
-
将每个 GPU 固定到单个 GPU
smdistributed.dataparallel
使用处理local_rank
— 这是指给定节点内进程的相对排名。这些区域有:sdp.tensorflow.local_rank()
API 为您提供设备的本地排名。领导节点为等级 0,工作人员节点为等级 1、2、3 等。这在以下代码块中被调用为sdp.local_rank()
.set_memory_growth
与之无直接关系 SageMaker 分布式,但必须设置为使用 TensorFlow 进行分布式训练。gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
-
按工作人员数量扩展学习率。这些区域有:
sdp.tensorflow.size()
API 为您提供集群中的工作人员数量。这在以下代码块中被调用为sdp.size()
.learning_rate = learning_rate * sdp.size()
-
使用图书馆的
DistributedGradientTape
优化AllReduce
训练期间的操作。这个包装tf.GradientTape
.with tf.GradientTape() as tape: output = model(input) loss_value = loss(label, output) # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape)
-
将领导节点(等级 0)的初始模型变量广播到所有工作节点(等级 1 到 n)。这对于确保所有员工队伍的一致初始化是必要的。使用
sdp.tensorflow.broadcast_variables
初始化模型和优化程序变量之后的 API。 这在以下代码块中被调用为sdp.broadcast_variables()
.sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0)
-
最后,修改脚本以仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免工作人员节点覆盖检查点并可能破坏检查点。
if sdp.rank() == 0: checkpoint.save(checkpoint_dir)
以下是示例: TensorFlow 使用图书馆进行分布式训练的训练脚本。
import tensorflow as tf # SageMaker data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: # SageMaker data parallel: Pin GPUs to a single library process tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch): with tf.GradientTape() as tape: probs = mnist_model(images, training=True) loss_value = loss(labels, probs) # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape) grads = tape.gradient(loss_value, mnist_model.trainable_variables) opt.apply_gradients(zip(grads, mnist_model.trainable_variables)) if first_batch: # SageMaker data parallel: Broadcast model and optimizer variables sdp.broadcast_variables(mnist_model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0) return loss_value ... # SageMaker data parallel: Save checkpoints only from master node. if sdp.rank() == 0: checkpoint.save(checkpoint_dir)
完成训练脚本的调整后,继续前往第 2 步:启动 SageMaker 使用分布式培训 Job SageMaker Python 开发工具包.