修改 PyTorch训练脚本 - Amazon SageMaker
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

修改 PyTorch训练脚本

以下是训练脚本的示例,您可以使用这些脚本通过自动分区和手动分区来配置 PyTorch 版本 1.7.1 和 1.6.0 的 SageMaker模型并行库。我们建议您在创建训练脚本重要注意事项之前查看 不支持的框架功能 和 。

中列出了您必须对训练脚本进行的必要修改才能使用 库PyTorch

如果要使用手动分区,另请查看使用 PyTorch 手动分区

提示

有关端到端的可运行笔记本示例,演示如何将 TensorFlow 训练脚本与SageMaker分布式模型并行库结合使用,请参阅PyTorch 示例

请注意,自动分区在默认情况下处于启用状态。除非另行指定,否则以下脚本使用自动分区。

重要注意事项

当您使用 SageMaker的分布式模型并行库配置 PyTorch 训练脚本时,您应了解以下事项:

  • 如果您使用的是依赖于全局梯度标准的优化技术,例如来自整个模型的梯度法,例如 LAMB 优化程序或全局梯度剪切的一些变体,则需要跨模型分区收集所有规则,以确保正确性。您可以使用库的通信基本数据类型来执行此操作。

  • 在模块输出的计算中,必须使用torch.Tensor模型中 的转发方法的所有nn.Modules参数。换句话说,库不支持模块输出不依赖于的torch.Tensor模块参数的情况。

  • smp.DistributedModel.backward() 调用的 参数必须依赖所有模型输出。换句话说,无法对未用于传入smp.DistributedModel.forward到调用中的张量计算中的smp.DistributedModel.backward调用进行输出。

  • 如果您的代码中存在torch.cuda.synchronize()调用,您可能需要在同步调用之前torch.cuda.set_device(smp.local_rank())立即调用 。否则,可能会在设备 0 中创建不必要的 CUDA 上下文,这会不必要地占用内存。

  • 由于库放置在nn.Modules不同的设备上,模型中的模块不得依赖在 内修改的任何全局状态smp.step。允许在整个训练过程中保持固定状态或在外部修改并对所有进程smp.step可见的任何状态。

  • 您无需将模型移动到 GPU(使用 )。to() API)。如果您在模型分区之前(第一次smp.step调用之前)尝试将模型移动到 GPU,将忽略移动调用。库自动将分配给排名的模型的 部分移动到其 GPU。使用库开始训练后,请勿将模型移动到 CPU 并使用它,因为它对于未分配给 进程保留的分区的模块没有正确的参数。如果您希望在使用模型并行库训练后重新训练模型或将其用于没有库的推理,建议的方法是使用我们的检查点 API 保存完整模型,并将其加载回常规 PyTorch 模块。

  • 如果您具有模块列表,以便将一个模块的输出馈送到另一个模块,则将该列表替换为 nn.Sequential 可以显著提高性能。

  • 权重更新 (optimizer.step()) 需要在 外部进行smp.step,因为完成整个向后传递并准备好梯度时就是这样。在将混合模型与模型和数据并行度结合使用时,此时还保证了梯度的 Allreduce 已完成。

  • 在将库与数据并行度结合使用时,请确保所有数据并行排名上的批次数相同,以便 Allreduce 不会挂起等待未参与此步骤的排名。

  • 的输入smp.step必须是 生成的模型输入DataLoader。这是因为, 在smp.step内部拆分批处理维度的输入张量并对其进行管道处理。这意味着,将DataLoader自身传递给 smp.step 函数以在 中生成模型输入不起作用。

  • smp.step 必须使用 .to() API 将到 的输入张量移动到当前设备,该操作必须在torch.cuda.set_device(local_rank())调用后执行。

不支持的框架功能

的分布式模型并行库不支持以下 PyTorch SageMaker功能:

  • 在将数据并行度与 DED 结合使用时,不支持 DistributedDataParallel 包装程序。该库在内部管理与 DEP 的集成,包括参数广播和梯度 Allreduce。使用 库时,模块缓冲区在训练开始时仅广播一次。如果您的模型具有模块缓冲区,并且您需要在每个步骤中跨数据并行组同步模块缓冲区,则可以使用通过 获取的进程组通过 torch.distributed API 执行此操作smp.get_dp_process_group()

  • 对于混合精度训练,不支持 apex.amp 模块。将 库与自动混合精度结合使用的推荐方法是使用 torch.cuda.amp,但在 torch 中使用 smp.amp.GradScaler 而不是 实施。

  • torch.jit.ScriptModules 不支持 ScriptFunctionssmp.DistributedModel

  • apex FusedLayerNorm :不支持 FusedAdamFusedLAMB、、 FusedNovoGradapex 。您可以通过 smp.optimizerssmp.nn APIs 来改用这些 的库实施。

PyTorch

使用 的SageMaker分布式模型并行库运行 PyTorch 模型需要以下训练脚本更改:

  1. 使用 导入并初始化库smp.init()

  2. 使用 包装模型smp.DistributedModel。请注意,从基础forward对象的 nn.Module 方法返回的任何张量都将跨模型并行设备广播,从而导致通信开销,因此不应返回在调用方法之外不需要的任何张量(例如中间激活)。

  3. 使用 包装优化程序smp.DistributedOptimizer

  4. 使用返回DistributedModel的对象而不是用户模型。

  5. 将前向和后向逻辑放在步骤函数中,并使用 smp.step 对其进行修饰。

  6. 通过 将每个进程限制为其自己的设备torch.cuda.set_device(smp.local_rank())

  7. .to()调用之前,使用 smp.step API 将输入张量移动到 GPU (请参阅以下示例)。

  8. torch.Tensor.backwardtorch.autograd.backward 替换为 DistributedModel.backward

  9. 使用 等StepOutput方法对微批处理的输出执行后处理reduce_mean

  10. 如果有评估步骤,同样地将前向逻辑放在 smp.step装饰的函数中,并使用 StepOutput API 后处理输出。

  11. drop_last=True 中设置 DataLoader。或者,如果批次大小不可被微批次数整除,请在训练循环中手动跳过批次。

要了解有关 SageMaker的分布式模型并行库 API 的更多信息,请参阅 API 文档

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

使用 PyTorch 手动分区

使用smp.partition上下文管理器将模块放置在特定设备中。未放置在任何smp.partition上下文中的任何模块都放置在 中default_partition。如果 default_partition 设置为 auto_partition ,则需要提供 False。在特定smp.partition上下文中创建的模块放在相应的分区上。

要了解有关 SageMaker的分布式模型并行库 API 的更多信息,请参阅 API 文档

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)