在 TensorFlow 训练脚本中使用该SMDDP库(已弃用) - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 TensorFlow 训练脚本中使用该SMDDP库(已弃用)

重要

在 v2.11.0 TensorFlow 之后,该SMDDP库已停止支持DLCs, TensorFlow 并且不再可用。要查找已安装SMDDP库 TensorFlow DLCs的先前内容,请参阅支持的框架

以下步骤向您展示如何修改 TensorFlow 训练脚本以利用分布式数据 p SageMaker arallel 库。 

该库APIs的设计类似于 Horovod APIs。有关该库提供的每种API功能的更多详细信息 TensorFlow,请参阅SageMaker 分布式数据 parallel TensorFlow API 文档

注意

SageMaker 分布式数据 parallel 适用于由除tf.keras模块之外的tf核心模块组成的 TensorFlow 训练脚本。 SageMaker 分布式数据 parallel 不支持 Ker TensorFlow as 实现。

注意

SageMaker 分布式数据并行度库支持开箱即用的自动混合精度 (AMP)。除了对训练脚本进行框架级别的修改外,无需执行任何AMP其他操作即可启用。如果有梯度FP16,则 SageMaker 数据并行度库将在中运行其操作。AllReduce FP16有关实现训练脚AMPAPIs本的更多信息,请参阅以下资源:

  1. 导入库的 TensorFlow 客户端并对其进行初始化。

    import smdistributed.dataparallel.tensorflow as sdp  sdp.init()
  2. 使用 local_rank —t GPU his将每个smdistributed.dataparallel进程固定到一个进程,这是指该进程在给定节点中的相对等级。sdp.tensorflow.local_rank()API为您提供设备的本地等级。领导节点的秩为 0,Worker 节点的秩为 1、2、3,依此类推。在以下代码块中将其调用为sdp.local_rank()set_memory_growth与 SageMaker 分布式没有直接关系,但必须将其设置为使用进行分布式训练 TensorFlow。

    gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
  3. 根据工作线程数扩展学习率。sdp.tensorflow.size()API为您提供集群中工作人员的数量。以下代码块中将其作为 sdp.size() 调用。

    learning_rate = learning_rate * sdp.size()
  4. 在训练期间,使用库的 DistributedGradientTape 来优化 AllReduce 操作。这会包装 tf.GradientTape。 

    with tf.GradientTape() as tape:       output = model(input)       loss_value = loss(label, output)      # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape)
  5. 将初始模型变量从领导节点(秩 0)广播到所有 Worker 节点(秩 1 到 n)。这是确保对所有工作线程秩进行一致的初始化所必需的。在模型和优化器变量初始化sdp.tensorflow.broadcast_variablesAPI之后使用。 在以下代码块中将其调用为sdp.broadcast_variables()

    sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0)
  6. 最后,修改脚本,仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免 Worker 节点覆盖检查点并可能损坏检查点。

    if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

以下是使用库进行分布式 TensorFlow 训练的示例训练脚本。

import tensorflow as tf # SageMaker data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     # SageMaker data parallel: Pin GPUs to a single library process     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch):     with tf.GradientTape() as tape:         probs = mnist_model(images, training=True)         loss_value = loss(labels, probs)     # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape     tape = sdp.DistributedGradientTape(tape)     grads = tape.gradient(loss_value, mnist_model.trainable_variables)     opt.apply_gradients(zip(grads, mnist_model.trainable_variables))     if first_batch:        # SageMaker data parallel: Broadcast model and optimizer variables        sdp.broadcast_variables(mnist_model.variables, root_rank=0)        sdp.broadcast_variables(opt.variables(), root_rank=0)     return loss_value ... # SageMaker data parallel: Save checkpoints only from master node. if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

调整完训练脚本后,请继续到SMDDP使用 SageMaker Python 启动分布式训练作业 SDK