本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 TensorFlow 训练脚本中使用 SMDDP 库(已过时)
重要
SMDDP 库已停止对 TensorFlow 的支持,在 TensorFlow 版本 2.11.0 之后的 DLC 中不再可用。要查找已安装 SMDDP 库的 TensorFlow DLC,请参阅 支持的框架。
以下步骤介绍如何修改 TensorFlow 训练脚本,以便使用 SageMaker AI 的分布式数据并行库。
这些库 API 设计为类似于 Horovod API。有关该库为 TensorFlow 提供的各个 API 的更多详细信息,请参阅 SageMaker AI 分布式数据并行 TensorFlow API 文档
注意
SageMaker AI 分布式数据并行可以进行调整,以用于由 tf 核心模块(但 tf.keras 除外)组成的 TensorFlow 训练脚本。SageMaker AI 分布式数据并行不支持具有 Keras 实施的 TensorFlow。
注意
SageMaker AI 的分布式数据并行库直接支持自动混合精度(AMP)。除了对训练脚本进行框架级别的修改之外,您无需执行额外操作即可启用 AMP。如果梯度位于 FP16 中,则 SageMaker AI 数据并行库会在 FP16 中运行其 AllReduce 操作。有关对您的训练脚本实施 AMP API 的更多信息,请参阅以下资源:
-
NVIDIA 深度学习性能文档中的框架 – TensorFlow
-
NVIDIA 开发人员文档中的适用于深度学习的自动混合精度
-
《TensorFlow 文档》中的 TensorFlow 混合精度 API
-
导入库的 TensorFlow 客户端并对其进行初始化。
import smdistributed.dataparallel.tensorflow as sdp sdp.init() -
使用
local_rank将每个 GPU 固定到一个smdistributed.dataparallel进程,这表示进程在给定节点中的相对秩。sdp.tensorflow.local_rank()API 向您提供设备的局部秩。领导节点的秩为 0,Worker 节点的秩为 1、2、3,依此类推。以下代码块中将其作为sdp.local_rank()调用。set_memory_growth与 SageMaker AI 分布式数据并行库没有直接关系,但必须设置此项以使用 TensorFlow 进行分布式训练。gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') -
根据工作线程数扩展学习率。
sdp.tensorflow.size()API 为您提供集群中工作线程的数量。以下代码块中将其作为sdp.size()调用。learning_rate = learning_rate * sdp.size() -
在训练期间,使用库的
DistributedGradientTape来优化AllReduce操作。这会包装tf.GradientTape。with tf.GradientTape() as tape: output = model(input) loss_value = loss(label, output) # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape) -
将初始模型变量从领导节点(秩 0)广播到所有 Worker 节点(秩 1 到 n)。这是确保对所有工作线程秩进行一致的初始化所必需的。在模型和优化器变量初始化后使用
sdp.tensorflow.broadcast_variablesAPI。以下代码块中将其作为sdp.broadcast_variables()调用。sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0) -
最后,修改脚本,仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免 Worker 节点覆盖检查点并可能损坏检查点。
if sdp.rank() == 0: checkpoint.save(checkpoint_dir)
以下是使用库进行分布式训练的 TensorFlow 训练脚本示例。
import tensorflow as tf # SageMaker AI data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker AI data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: # SageMaker AI data parallel: Pin GPUs to a single library process tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker AI data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch): with tf.GradientTape() as tape: probs = mnist_model(images, training=True) loss_value = loss(labels, probs) # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape) grads = tape.gradient(loss_value, mnist_model.trainable_variables) opt.apply_gradients(zip(grads, mnist_model.trainable_variables)) if first_batch: # SageMaker AI data parallel: Broadcast model and optimizer variables sdp.broadcast_variables(mnist_model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0) return loss_value ... # SageMaker AI data parallel: Save checkpoints only from master node. if sdp.rank() == 0: checkpoint.save(checkpoint_dir)
调整完训练脚本后,请继续到使用 SageMaker Python SDK 通过 SMDDP 启动分布式训练作业。