修改 TensorFlow 训练脚本 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

修改 TensorFlow 训练脚本

以下步骤介绍如何修改 TensorFlow 训练脚本,以便使用 SageMaker 的分布式数据并行库。 

这些库 API 设计为类似于 Horovod API。有关该库为 TensorFlow 提供的各个 API 的更多详细信息,请参阅 SageMaker 分布式数据并行 TensorFlow API 文档

注意

SageMaker 分布式数据并行可以进行调整,以用于由 tf 核心模块(但 tf.keras 除外)组成的 TensorFlow 训练脚本。SageMaker 分布式数据并行不支持具有 Keras 实施的 TensorFlow。

注意

SageMaker 的分布式数据并行性库支持现成可用的自动混合精度 (AMP)。除了对训练脚本进行框架级别的修改之外,您无需执行额外操作即可启用 AMP。如果梯度位于 FP16 中,则 SageMaker 数据并行性库会在 FP16 中运行其 AllReduce 操作。有关对您的训练脚本实施 AMP API 的更多信息,请参阅以下资源:

  1. 导入库的 TensorFlow 客户端并对其进行初始化。

    import smdistributed.dataparallel.tensorflow as sdp  sdp.init()
  2. 使用 local_rank 将每个 GPU 固定到一个 smdistributed.dataparallel 进程,这表示进程在给定节点中的相对秩。sdp.tensorflow.local_rank() API 向您提供设备的局部秩。领导节点的秩为 0,Worker 节点的秩为 1、2、3,依此类推。以下代码块中将其作为 sdp.local_rank() 调用。set_memory_growth 与 SageMaker 分布式没有直接关系,但必须设置此项以使用 TensorFlow 进行分布式训练。

    gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
  3. 根据工作线程数扩展学习率。sdp.tensorflow.size() API 为您提供集群中工作线程的数量。以下代码块中将其作为 sdp.size() 调用。

    learning_rate = learning_rate * sdp.size()
  4. 在训练期间,使用库的 DistributedGradientTape 来优化 AllReduce 操作。这会包装 tf.GradientTape。 

    with tf.GradientTape() as tape:       output = model(input)       loss_value = loss(label, output)      # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape)
  5. 将初始模型变量从领导节点(秩 0)广播到所有 Worker 节点(秩 1 到 n)。这是确保对所有工作线程秩进行一致的初始化所必需的。在模型和优化器变量初始化后使用 sdp.tensorflow.broadcast_variables API。以下代码块中将其作为 sdp.broadcast_variables() 调用。

    sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0)
  6. 最后,修改脚本,仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免 Worker 节点覆盖检查点并可能损坏检查点。

    if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

以下是使用库进行分布式训练的 TensorFlow 训练脚本示例。

import tensorflow as tf # SageMaker data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     # SageMaker data parallel: Pin GPUs to a single library process     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch):     with tf.GradientTape() as tape:         probs = mnist_model(images, training=True)         loss_value = loss(labels, probs)     # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape     tape = sdp.DistributedGradientTape(tape)     grads = tape.gradient(loss_value, mnist_model.trainable_variables)     opt.apply_gradients(zip(grads, mnist_model.trainable_variables))     if first_batch:        # SageMaker data parallel: Broadcast model and optimizer variables        sdp.broadcast_variables(mnist_model.variables, root_rank=0)        sdp.broadcast_variables(opt.variables(), root_rank=0)     return loss_value ... # SageMaker data parallel: Save checkpoints only from master node. if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

调整完训练脚本后,请继续到步骤 2:使用 SageMaker Python SDK 启动 SageMaker 分布式训练作业