使用SageMaker数据并行库修改您的训练脚本 - Amazon SageMaker
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用SageMaker数据并行库修改您的训练脚本

脚本修改概述

SageMaker的分布式数据并行库( 库APIs 旨在便于使用,并且与现有的分布式训练工具包无缝集成。

  • SageMaker 具有库 API 的 Python 开发工具包:在大多数情况下,您在训练脚本中只需更改的就是 Horovod 或其他数据并行库导入语句。使用SageMaker数据并行库等效项交换这些内容。

  • 专注于模型训练而不进行基础设施管理:在 上使用 库训练深度学习模型时SageMaker,您可以专注于模型训练,而 SageMaker 执行集群管理: 会启动节点并创建集群, 完成训练,然后停用集群。

要自定义您自己的训练脚本,您需要执行以下操作:

  • 您必须提供经过修改才能使用 库的 TensorFlow/PyTorch 训练脚本。以下部分提供了该操作的示例代码。

  • 您的输入数据必须位于 S3 存储桶中,或者位于 AWS 区域中的 FSx 中,用于启动训练作业。如果您使用提供的 Jupyter Notebooks,请在包含输入数据的 存储桶所在的区域中创建一个SageMaker笔记本实例。有关存储训练数据的更多信息,请参阅 SageMaker Python 开发工具包数据输入文档。

提示

考虑使用 FSx 而不是 Amazon S3 来提高训练绩效。它具有比 更高的吞吐量和更低的延迟Amazon S3。

使用以下部分可查看可用于转换 TensorFlow 或 PyTorch 训练脚本的训练脚本的示例。然后,使用您的模板的一个示例笔记本来启动训练作业。您需要将训练脚本与笔记本附带的脚本交换,并根据需要修改任何输入函数。启动训练作业后,您可以使用 监控该作业Amazon CloudWatch。

然后,您可以按照用于部署模型的示例笔记本之一了解如何将训练后的模型部署到 终端节点。

最后,您可以按照示例笔记本测试已部署模型的推理。

使用 SMD 数据并行修改 TensorFlow 2.x 训练脚本

以下步骤说明如何转换 TensorFlow 2.3.1 或 2.4.1 训练脚本以利用 SageMaker的分布式数据并行库。 

库 APIs 设计为类似于 Horovod APIs。有关库为 TensorFlow 提供的每个 API 的其他详细信息,请参阅SageMaker分布式数据并行 TensorFlow API 文档

  1. 导入库的 TensorFlow 客户端并将其初始化:

    import smdistributed.dataparallel.tensorflow as sdp  sdp.init()
  2. 使用 smdistributed.dataparallellocal_rank将每个 GPU 固定到单个—进程,这是指给定节点中进程的相对排名。sdp.tensorflow.local_rank()API 为您提供设备的本地排名。领导节点为排名 0,工作线程节点为排名 1、2、3,以此类推。这在下一个代码块中作为 调用sdp.local_rank()set_memory_growth不与SageMaker分布式直接相关,但必须为 TensorFlow的分布式训练设置 。

    gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
  3. 按工作人员数来扩展学习率。sdp.tensorflow.size()API 为您提供集群中的工作线程数量。这在下一个代码块中作为 调用sdp.size()

    learning_rate = learning_rate * sdp.size()
  4. 在训练期间使用 库的 DistributedGradientTape优化AllReduce操作。此包装tf.GradientTape。 

    with tf.GradientTape() as tape:       output = model(input)       loss_value = loss(label, output)      # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape)
  5. 将初始模型变量从领导节点(排名 0)广播到所有工作线程节点(排名 1 到 n)。这是确保跨所有工作线程排名进行一致初始化所必需的。为此,您可以在初始化模型和优化程序变量后使用 sdp.tensorflow.broadcast_variablesAPI。这将在下一个代码块中作为 调用sdp.broadcast_variables()

    sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0)
  6. 最后,修改脚本以仅在领导节点上保存检查点。领导节点有一个同步的模型。这还会避免工作线程节点覆盖检查点,并可能损坏检查点。

    if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

以下是使用 库进行分布式训练的示例 TensorFlow2 训练脚本:

import tensorflow as tf # SageMaker data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     # SageMaker data parallel: Pin GPUs to a single library process     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch):     with tf.GradientTape() as tape:         probs = mnist_model(images, training=True)         loss_value = loss(labels, probs)     # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape     tape = sdp.DistributedGradientTape(tape)     grads = tape.gradient(loss_value, mnist_model.trainable_variables)     opt.apply_gradients(zip(grads, mnist_model.trainable_variables))     if first_batch:        # SageMaker data parallel: Broadcast model and optimizer variables        sdp.broadcast_variables(mnist_model.variables, root_rank=0)        sdp.broadcast_variables(opt.variables(), root_rank=0)     return loss_value ... # SageMaker data parallel: Save checkpoints only from master node. if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

有关更高级的SageMaker用法,请参阅分布式数据并行 TensorFlow API 文档

使用 SMD 数据并行修改 PyTorch 训练脚本

以下步骤说明如何转换 PyTorch训练脚本以利用 SageMaker的受限数据并行库。

库 APIs 设计为类似于 PyTorch 分布式数据并行 (DDP) APIs。有关为 PyTorch 提供的每个数据并行 API 的其他详细信息,请参阅SageMaker详细数据并行 PyTorch API 文档

  1. 导入库的 PyTorch 客户端并进行初始化,然后导入 模块以进行分布式训练。

    import smdistributed.dataparallel.torch.distributed as dist from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP dist.init_process_group()
  2. 使用 SageMakerlocal_rank将每个 GPU 固定到单个—数据并行库进程,这是指给定节点中进程的相对排名。

    smdistributed.dataparallel.torch.get_local_rank()API 为您提供设备的本地排名。领导节点为排名 0,工作线程节点为排名 1、2、3,以此类推。这在下一个代码块中作为 调用dist.get_local_rank()

    torch.cuda.set_device(dist.get_local_rank())
  3. 使用库的 DEP 对 PyTorch 模型进行包装。

    model = ... # Wrap model with the library's DistributedDataParallel model = DDP(model)
  4. 修改 torch.utils.data.distributed.DistributedSampler以包含集群的信息。将 设置为num_replicas跨集群中的所有节点参与训练的 GPUs 总数。这称为 world_size。您可以使用 world_sizeAPI 获取 smdistributed.dataparallel.torch.get_world_size()。这在以下代码中作为 调用dist.get_world_size()。还可以使用 提供节点排名smdistributed.dataparallel.torch.get_rank()。这是作为 调用的dist.get_rank()

    train_sampler = DistributedSampler(train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank())
  5. 修改脚本以仅在领导节点上保存检查点。领导节点具有同步的模型。这还会避免工作线程节点覆盖检查点,并可能损坏检查点。

以下是使用 库进行分布式训练的示例 PyTorch 训练脚本:

# SageMaker data parallel: Import the library PyTorch API import smdistributed.dataparallel.torch.distributed as dist # SageMaker data parallel: Import the library PyTorch DDP from smdistributed.dataparallel.torch.parallel.distributed import DistributedDataParallel as DDP # SageMaker data parallel: Initialize the library dist.init_process_group() class Net(nn.Module):     ...     # Define model def train(...):     ...     # Model training def test(...):     ...     # Model evaluation def main():          # SageMaker data parallel: Scale batch size by world size     batch_size //= dist.get_world_size() // 8     batch_size = max(batch_size, 1)     # Prepare dataset     train_dataset = torchvision.datasets.MNIST(...)       # SageMaker data parallel: Set num_replicas and rank in DistributedSampler     train_sampler = torch.utils.data.distributed.DistributedSampler(             train_dataset,             num_replicas=dist.get_world_size(),             rank=dist.get_rank())       train_loader = torch.utils.data.DataLoader(..)       # SageMaker data parallel: Wrap the PyTorch model with the library's DDP     model = DDP(Net().to(device))          # SageMaker data parallel: Pin each GPU to a single library process.     torch.cuda.set_device(local_rank)     model.cuda(local_rank)          # Train     optimizer = optim.Adadelta(...)     scheduler = StepLR(...)     for epoch in range(1, args.epochs + 1):         train(...)         if rank == 0:             test(...)         scheduler.step()     # SageMaker data parallel: Save model on master node.     if dist.get_rank() == 0:         torch.save(...) if __name__ == '__main__':     main()

有关更高级的SageMaker用法,请参阅分布式数据并行 PyTorch API 文档

启动训练作业

要使用SageMaker分布式数据并行配置的训练脚本启动训练作业,请使用 Estimator.fit() 函数。我们建议您使用SageMaker笔记本实例或 SageMaker Studio 启动训练作业。要查看有关如何使用 Studio 或笔记本实例启动训练作业的示例,请参阅分布式训练 Jupyter 笔记本示例

使用以下资源了解有关将 SageMaker Python 开发工具包与这些框架结合使用的更多信息: