修改 PyTorch 训练脚本 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

修改 PyTorch 训练脚本

在此部分中,您将学习如何修改 PyTorch 训练脚本以配置 SageMaker 模型并行性库,用于自动分区和手动分区。

注意

要了解该库支持哪些 PyTorch 版本,请参阅支持的框架和 Amazon Web Services 区域

提示

有关演示如何将 PyTorch 训练脚本与 SageMaker 模型并行性库结合使用的端到端笔记本示例,请参阅 PyTorch 示例

请注意,默认情况下启用自动分区。除非另行指定,否则以下脚本使用自动分区。

PyTorch 的自动拆分

要使用 SageMaker 的模型并行性库运行 PyTorch 训练脚本,训练脚本需要进行以下更改:

  1. 使用 smdistributed.modelparallel.torch.init() 导入和初始化库。

  2. 使用 smdistributed.modelparallel.torch.DistributedModel 包装模型。请注意,从底层 nn.Module 对象的 forward 方法返回的任何张量都将在模型并行设备之间广播,这会产生通信开销,因此在调用方法之外不需要的任何张量(例如中间激活)都不应返回。

    注意

    对于 FP16 训练,您需要使用 smdistributed.modelparallel.torch.model_creation() 上下文管理器来包装模型。有关更多信息,请参阅 使用模型并行性的 FP16 训练

  3. 使用 smdistributed.modelparallel.torch.DistributedOptimizer 包装优化器。

    注意

    对于 FP16 训练,您需要设置静态或动态损失缩放。有关更多信息,请参阅 使用模型并行性的 FP16 训练

  4. 使用返回的 DistributedModel 对象而不是用户模型。

  5. 将向前和向后逻辑放在步进函数中,然后用 smdistributed.modelparallel.torch.step 进行修饰。

  6. 通过 torch.cuda.set_device(smp.local_rank()) 将每个进程限制在自己的设备上。

  7. smp.step 调用之前,使用 .to() API 将输入张量移至 GPU(参见下面的示例)。

  8. torch.Tensor.backwardtorch.autograd.backward 替换为 DistributedModel.backward

  9. 使用 StepOutput 方法(例如 reduce_mean)对微批次的输出进行后处理。

  10. 如果有评估步骤,则同样将向前逻辑放在 smp.step 修饰的函数中,然后使用 StepOutput API 对输出进行后处理。

  11. DataLoader 中设置 drop_last=True。或者,如果批次大小不能被微批次数量整除,则可以手动跳过训练循环中的批次。

要了解有关 SageMaker 的模型并行性库 API 的更多信息,请参阅 API 文档

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

PyTorch 的手动拆分

使用 smp.partition 上下文管理器将模块放置在特定设备中。未放在任何 smp.partition 上下文中的任何模块都放在 default_partition 中。如果 auto_partition 设置为 False,则需要提供 default_partition。在特定 smp.partition 上下文中创建的模块将放置在对应的分区上。

要了解有关 SageMaker 的模型并行性库 API 的更多信息,请参阅 API 文档

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

注意事项

在使用 SageMaker 的模型并行性库配置 PyTorch 训练脚本时,您应注意以下内容:

  • 如果您使用的优化技术依赖于全局梯度范数,例如整个模型的梯度范数(如 LAMB 优化器或全局梯度剪裁的某些变体),则需要收集模型分区中的所有范数以确保正确性。您可以使用库的通信基本数据类型来完成此操作。

  • 模型中 nn.Modules 的所有向前方法的所有 torch.Tensor 参数都必须在模块输出的计算中使用。换而言之,该库不支持向模块传递模块输出所不依赖的 torch.Tensor 参数。

  • 传递给 smp.DistributedModel.backward() 调用的参数必须依赖于所有模型输出。换而言之,smp.DistributedModel.forward 调用的输出,必须在计算提供给 smp.DistributedModel.backward 的张量时全部使用。

  • 如果您的代码中有 torch.cuda.synchronize() 调用,则可能需要在同步调用之前立即调用 torch.cuda.set_device(smp.local_rank())。否则,可能会在设备 0 中创建不必要的 CUDA 上下文,这将不必要地消耗内存。

  • 由于库放置在不同的设备上的 nn.Modules 中,因此模型中的模块不得依赖于任何在 smp.step 内部修改的全局状态。在整个训练过程中保持固定的任何状态,或者在 smp.step 之外以对所有进程都可见的方式修改的任何状态,都是允许的。

  • 使用库时,您无需将模型移至 GPU(例如,使用 model.to(device))。如果您尝试在模型分区之前(在第一次 smp.step 调用之前)将模型移动到 GPU,则会忽略移动调用。对于模型中分配给某个秩的部分,库会自动将这个部分移动到其 GPU。一旦开始使用库进行训练,请不要将模型移至 CPU 并使用它,因为对于未分配给进程所持有的分区的模块,它不会有正确的参数。如果在使用模型并行性库训练了模型之后,您想重新训练模型或者在没有库的情况下将其用于推理,则推荐的方法是使用我们的检查点 API 保存完整的模型,然后将其加载回常规 PyTorch 模块。

  • 如果您有模块列表,这样一个模块的输出就会传送到另一个模块中,而使用 nn.Sequential 替换该列表可以显著提高性能。

  • 权重更新 (optimizer.step()) 需要在 smp.step 外部进行,因为此时整个向后传递已完成并且梯度准备就绪。使用具有模型和数据并行性的混合模型时,此时还可以保证完成梯度的 AllReduce。

  • 将库与数据并行性结合使用时,请确保所有数据并行秩上的批次数量相同,这样 AllReduce 就不会挂起以等待未参与该步骤的秩。

  • 如果您使用 ml.p4d 实例类型(例如 ml.p4d.24xlarge)启动训练作业,则必须设置数据加载程序变量 num_workers=0。例如,您可以如下所示定义 DataLoader

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • smp.step 的输入必须是 DataLoader 生成的模型输入。这是因为在 smp.step 内部按照批次维度拆分输入张量,并对其进行管道传输。这意味着将 DataLoader 本身传递给 smp.step 函数以在其中生成模型输入,这种方法行不通。

    例如,如果您如下所示定义 DataLoader

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    您应该访问 train_loader 生成的模型输入,并将这些输入传递给 smp.step 修饰的函数。不要将 train_loader 直接传递给 smp.step 函数。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • smp.step 的输入张量必须使用 .to() API 移动到当前设备,此操作必须在 torch.cuda.set_device(local_rank()) 调用后进行。

    例如,您可以如下所示定义 train 函数。在使用这些输入张量调用 train_step 之前,此函数使用 .to() API 将 datatarget 添加到当前设备。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    在上面的 train 函数中,此 smp.set 修饰的函数的输入张量已移至当前设备。模型无需移至当前设备。对于模型中分配给某个秩的部分,库会自动将这个部分移动到其 GPU。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

不支持的框架功能

SageMaker 的模型并行性库不支持以下 PyTorch 功能:

  • 如果您将数据并行性与原生 PyTorch DDP 结合使用,则该库不支持 torch.nn.parallel.DistributedDataParallel 包装器模块。库在内部管理与 PyTorch DDP 的集成,包括参数广播和梯度 AllReduce。使用库时,模块缓冲区在训练开始时仅广播一次。如果您的模型具有的模块缓冲区需要在每个步骤中跨数据并行组同步,则可以通过 torch.distributed API,使用可以通过 smp.get_dp_process_group() 获取的进程组进行同步。

  • 对于混合精度训练,不支持 apex.amp 模块。使用具有自动混合精度的库的推荐方法是使用 torch.cuda.amp,唯一的不同是使用 smp.amp.GradScaler 而不是在 Torch 中实施。

  • smp.DistributedModel 不支持 torch.jit.ScriptModulesScriptFunctions

  • 不支持 apex : FusedLayerNormFusedAdamFusedLAMB 以及来自 apexFusedNovoGrad。您可以改为通过 smp.optimizerssmp.nn API 来使用这些项的库实施。