修改 PyTorch 训练脚本 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

修改 PyTorch 训练脚本

以下是培训脚本示例,您可以使用这些脚本来配置 SageMaker 的模型并行库,使用 PyTorch 版本 1.7.1 和 1.6.0 配置自动分区和手动分区。建议您查看重要注意事项不支持的框架功能创建训练脚本之前。

要使用库,您必须对训练脚本进行的所需修改列于PyTorch.

如果您想要使用手动分区,请查看使用 PyTorch 手动分区.

提示

有关演示如何在 SageMaker 分布式模型并行库中使用 TensorFlow 训练脚本的端到端可运行的笔记本示例,请参阅PyTorch 示例.

请注意,默认情况下,自动分区处于启用状态。除非另行指定,否则以下脚本将使用自动分区。

重要注意事项

使用 SageMaker 的分布式模型并行库配置 PyTorch 训练脚本时,您应了解以下内容:

  • 如果使用依赖于全局渐变规范的优化技术,例如来自整个模型的梯度范数(例如 LAMB 优化器的某些变体或全局渐变裁剪),则需要收集整个模型分区中的所有规范以确保正确性。您可以使用库的通信基本数据类型来执行此操作。

  • 全部torch.Tensor参数转向方法的nn.Modules必须在计算模块输出时使用。换句话说,库不支持torch.Tensor参数设置为模块输出不依赖的模块。

  • 将参数转换为smp.DistributedModel.backward()调用必须依赖于所有模型输出。换句话说,不能从smp.DistributedModel.forward调用,该调用不用于计算被送入smp.DistributedModel.backward调用。

  • 如果有torch.cuda.synchronize()调用时,您可能需要调用torch.cuda.set_device(smp.local_rank())紧接在同步调用之前。否则,可能会在设备 0 中创建不必要的 CUDA 上下文,这将不必要地消耗内存。

  • 由于图书馆的地方nn.Modules在不同的设备上,模型中的模块不得依赖于在smp.step. 任何在整个训练过程中保持固定状态或在外部修改的状态smp.step允许以对所有进程都可见的方式进行。

  • 您无需将模型移动到 GPU(例如,使用model.to(device))使用库时。如果您尝试在模型被分区之前将模型移动到 GPU(在第一个smp.step调用),则会忽略移动调用。库会自动将分配给某个等级的模型部分移动到其 GPU。开始使用库进行训练后,不要将模型移动到 CPU 并使用它,因为对于未分配给进程保存的分区的模块,它将没有正确的参数。如果您想在使用模型并行库训练模型后重新训练模型或在没有库的情况下使用它进行推断,推荐的方法是使用我们的检查点 API 保存完整模型并将其加载回常规 PyTorch 模块。

  • 如果你有一个模块列表,使一个输出输出到另一个模块,那么用nn.Sequential可以显著提高性能。

  • 权重更新 (optimizer.step())需要发生在smp.step因为这是整个向后传递完成并且渐变准备就绪的时候。当使用具有模型和数据并行性的混合模型时,此时也保证完成渐变的 Allreduce。

  • 将库与数据并行性结合使用时,请确保所有数据并行等级上的批次数相同,以便 Allreduce 不会挂起等待未参与步骤的等级。

  • 如果您使用 ml.p4d 实例类型(如 ml.p4d.24xlarge)启动培训作业,则必须设置数据加载器变量num_workers=0. 例如,您可以定义您的DataLoader如下所示:

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • 输入smp.step必须是由DataLoader. 这是因为smp.step内部沿批处理尺寸分割输入张量并对其进行管道。这意味着,传递DataLoader自身转换为smp.step函数来生成内部的模型输入不起作用。

    例如,如果您定义DataLoader如下所示:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    您应该访问train_loader并将它们传递给smp.step装饰功能。不要通过train_loader直接转换到smp.stepfunction.

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • 输入张量smp.step必须使用.to()API,它必须在torch.cuda.set_device(local_rank())调用。

    例如,您可以定义train函数,如下。此函数添加datatarget添加到当前设备,使用.to()API,然后再使用这些输入张量调用train_step.

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    输入张量smp.set装饰函数已移动到train函数。该模型需要将其移动到当前设备。库会自动将分配给某个等级的模型部分移动到其 GPU。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

不支持的框架功能

SageMaker 的分布式模型并行库不支持以下 PyTorch 功能:

  • 如果将数据并行性与本机PyTorch DDPtorch.nn.parallel.DistributedDataParallel包装模块不受库支持。该库内部管理与 PyTorch DDP 集成,包括参数广播和渐变 ALLReduce。使用库时,模块缓冲区仅在训练开始时广播一次。如果模型具有需要在每个步骤中跨数据并行组同步的模块缓冲区,则可以通过torch.distributedAPI,使用可以通过smp.get_dp_process_group().

  • 对于混合精密训练,apex.amp不支持。使用具有自动混合精度的库的推荐方法是使用torch.cuda.amp,除了使用smp.amp.GradScaler而不是火炬的实施。

  • torch.jit.ScriptModules或者ScriptFunctions不支持smp.DistributedModel.

  • apexFusedLayerNormFusedAdamFusedLAMB, 和FusedNovoGradfromapex不支持。您可以使用这些库实现通过smp.optimizerssmp.nn请改用 API。

PyTorch

要使用 SageMaker 的分布式模型并行库运行 PyTorch 模型,需要进行以下训练脚本更改:

  1. 导入并初始化库smp.init().

  2. 将模型包装为smp.DistributedModel. 请注意,从forward基础方法nn.Module对象将在模型并行设备之间进行广播,从而产生通信开销,因此不应返回调用方法之外不需要的任何张量(如中间激活)。

  3. 将优化程序包装为smp.DistributedOptimizer.

  4. 使用返回的DistributedModel对象而不是用户模型。

  5. 将前向和向后逻辑放在一个步骤函数中,并用smp.step.

  6. 将每个进程限制在其自己的设备上,通过torch.cuda.set_device(smp.local_rank()).

  7. 将输入张量移动到 GPU 使用.to()之前的 APIsmp.step调用(请参阅下面的示例)。

  8. Replacetorch.Tensor.backwardtorch.autograd.backward替换为DistributedModel.backward.

  9. 对多个微粒的输出执行后处理,使用StepOutput方法,例如reduce_mean.

  10. 如果有评估步骤,则同样地将前向逻辑放置在smp.step-装饰函数和后处理输出使用StepOutputAPI.

  11. Setdrop_last=TrueDataLoader. 或者,如果批次大小无法除以微粒数量,则可以手动跳过训练循环中的批次。

要了解有关 SageMaker 分布式模型并行库 API 的更多信息,请参阅API 文档.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

使用 PyTorch 手动分区

使用smp.partition上下文管理器将模块放置在特定设备中。任何模块未放置在任何smp.partition上下文放置在default_partition. 这些区域有:default_partition需要提供auto_partition设置为False. 在一个特定的smp.partition上下文放置在相应的分区上。

要了解有关 SageMaker 分布式模型并行库 API 的更多信息,请参阅API 文档.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)