修改 PyTorch 训练脚本 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

修改 PyTorch 训练脚本

在本节中,您将了解如何修改 PyTorch 训练脚本来配置 SageMaker 用于自动分区和手动分区的分布式模型 parallel 库。

PyTorch 版本支持 SageMaker 和 SageMaker 分布式模型 parallel 库
PyTorch 版本 SageMaker 分布式模型 parallel 库版本 smdistributed-modelparallel集成映像 URI
v1.11.0* smdistributed-modelparallel==v1.9.0 763104351884.dkr.ecr.<region>.amazonaws.com/pytorch-training:1.11.0-gpu-py38-cu113-ubuntu20.04-sagemaker
v1.10.2* smdistributed-modelparallel==v1.7.0

763104351884.dkr.ecr.<region>.amazonaws.com/pytorch-training:1.10.2-gpu-py38-cu113-ubuntu20.04-sagemaker

v1.10.0 smdistributed-modelparallel==v1.5.0

763104351884.dkr.ecr.<region>.amazonaws.com/pytorch-training:1.10.0-gpu-py38-cu113-ubuntu20.04-sagemaker

v1.9.1 smdistributed-modelparallel==v1.4.0

763104351884.dkr.ecr.<region>.amazonaws.com/pytorch-training:1.9.1-gpu-py38-cu111-ubuntu20.04

v1.8.1* smdistributed-modelparallel==v1.6.0 763104351884.dkr.ecr.<region>.amazonaws.com/pytorch-training:1.8.1-gpu-py36-cu111-ubuntu18.04

* 该 SageMaker 分布式模型 parallel 库 v1.6.0 及更高版本为 PyTorch 提供扩展功能。有关更多信息,请参阅 PyTorch 的 SageMaker 模型并行库的扩展功能

要查看库的最新更新,请参阅SageMaker 分布式模型并行发行说中的SageMaker Python 开发工具包.

以下主题展示了训练脚本的示例,您可以使用这些脚本配置 SageMaker 的模型 parallel 库以进行自动分区和手动分区 PyTorch 模型。建议您查看重要注意事项不支持的框架功能在创建训练脚本之前。

中列出了您必须对训练脚本进行的使用库所需的修改PyTorch.

如果你想使用手动分区,还可以查看使用 PyTorch 手动分区.

提示

适用于 end-to-end 演示如何使用 PyTorch 使用 SageMaker 分布式模型 parallel 库,请参阅PyTorch 示例.

请注意,默认情况下,将启用自动分区。除非另行指定,否则以下脚本使用自动分区。

PyTorch

需要更改以下训练脚本才能运行 PyTorch 使用 SageMaker 的分布式模型 parallel 库的模型:

  1. 使用导入并初始化库smp.init().

  2. 用模型包裹smp.DistributedModel. 请注意,从forward基础方法nn.Module对象将在模型并行设备之间广播,从而产生通信开销,因此不应返回调用方法之外不需要的任何张量(例如中间激活)。

  3. 用包装优化器smp.DistributedOptimizer.

  4. 使用返回的DistributedModel对象而不是用户模型。

  5. 将向前和向后逻辑放在一个步骤函数中然后用来装饰它smp.step.

  6. 将每个进程限制在自己的设备上torch.cuda.set_device(smp.local_rank()).

  7. 使用将输入张量移动到 GPU.to()之前的 APIsmp.step电话(见下面的例子)。

  8. Replacetorch.Tensor.backwardtorch.autograd.backwardDistributedModel.backward.

  9. 使用跨微批量对输出执行后处理StepOutput方法,例如reduce_mean.

  10. 如果有评估步骤,同样地将向前逻辑放在smp.step-装饰功能并使用后处理输出StepOutputAPI.

  11. Setdrop_last=TrueDataLoader. 或者,如果批量大小不能被微批次数整除,则手动跳过训练循环中的批次。

要了解 SageMaker 的分布式模型 parallel 库 API 的更多信息,请参阅API 文档.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

使用 PyTorch 手动分区

使用smp.partition上下文管理器将模块放置在特定设备中。任何未放置在任何模块中smp.partition上下文放置在default_partition. 这些区域有:default_partition如果需要提供auto_partition设置为False. 在特定内部创建的模块smp.partition上下文放置在相应的分区上。

要了解 SageMaker 的分布式模型 parallel 库 API 的更多信息,请参阅API 文档.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

重要注意事项

当你配置 PyTorch 使用 SageMaker 的分布式模型 parallel 库使用训练脚本,您应了解以下内容:

  • 如果您使用的优化技术依赖于全局渐变规范,例如整个模型的渐变范数,例如 LAMP 优化器或全局渐变剪切的某些变体,则需要在模型分区中收集所有规范以确保正确性。你可以使用图书馆的通信基本数据类型来做到这一点。

  • 全部torch.Tensor对前瞻方法的参数nn.Modules在模型中必须用于计算模块输出。换句话说,图书馆不支持存在torch.Tensor参数指向模块输出不依赖的模块。

  • smp.DistributedModel.backward()调用必须取决于所有模型输出。换句话说,无法从smp.DistributedModel.forward在计算被输入的张量时没有使用的调用smp.DistributedModel.backward调用。

  • 如果有torch.cuda.synchronize()在你的代码中调用,你可能需要打电话torch.cuda.set_device(smp.local_rank())紧接在同步调用之前。否则,可能会在设备 0 中创建不必要的 CUDA 上下文,这将不必要地消耗内存。

  • 自图书馆放置以来nn.Modules在不同的设备上,模型中的模块不能依赖于内部修改的任何全局状态smp.step. 在整个训练过程中保持固定或在外部修改的任何状态smp.step允许以所有进程都可见的方式。

  • 您不需要将模型移动到 GPU(例如,使用model.to(device)) 在使用图书馆时。如果在模型分区之前尝试将模型移动到 GPU(在第一个模型之前)smp.step调用),移动呼叫将被忽略。库会自动将分配给排名的模型部分移动到其 GPU。一旦开始使用库进行训练,不要将模型移动到 CPU 并使用它,因为对于没有分配给过程保存的分区的模块,它将没有正确的参数。如果您想在使用模型并行库训练后重新训练模型或在没有库的情况下使用它进 parallel 推理,建议的方法是使用我们的检查点 API 保存完整模型并将其加载回常规模。 PyTorch 模块。

  • 如果你有一个模块列表,例如一个输出的输出到另一个模块,请将该列表替换为nn.Sequential可以显著提高性能。

  • 权重更新(optimizer.step()) 需要在外面发生smp.step因为那是整个向后传递完成并且渐变准备就绪的时候。当使用具有模型和数据并行性的混合模型时,此时,梯度的 Allreduce 也可以保证完成。

  • 将库与数据并行结合使用时,请确保所有数据并 parallel 排名上的批次数相同,以便 Allreduce 不会挂起等待未参与该步骤的排名。

  • 如果您使用 ml.p4d 实例类型(例如 ml.p4d.24xlarge)启动训练作业,则必须设置数据加载器变量num_workers=0. 例如,您可以定义您的DataLoader如下所示:

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • 对的输入smp.step必须是由生成的模型输入DataLoader. 这是因为smp.step沿批处理维度内部拆分输入张量并将它们管道。这意味着通过DataLoader本身转换为smp.step在里面生成模型输入的函数不起作用。

    例如,如果你定义了DataLoader如下所示:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    你应该访问生成的模型输入train_loader然后把那些传递给smp.step装饰功能。不要通过train_loader直接转换为smp.stepfunction.

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • 输入张量smp.step必须使用移动到当前设备.to()API,必须在torch.cuda.set_device(local_rank())调用。

    例如,您可以定义train函数如下方式。此函数添加datatarget使用调用当前设备.to()在使用这些输入张量调用之前的 APItrain_step.

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    这个的输入张量smp.set装饰功能已移动到中的当前设备train上面的函数。模型确实如此需要移动到当前设备。库会自动将分配给排名的模型部分移动到其 GPU。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

不支持的框架功能

以下 PyTorch SageMaker 的分布式模型 parallel 库不支持功能:

  • 如果你将数据并行与本机一起使用PyTorch DDPtorch.nn.parallel.DistributedDataParallel库不支持包装模块。该图书馆内部管理与集成 PyTorch DDP,包括参数广播和梯度 allReduce。使用库时,模块缓冲区仅在训练开始时广播一次。如果模型的模块缓冲区在每个步骤都需要在数据 parallel 组之间进行同步,则可以通过torch.distributedAPI,使用可以通过以下方式获得的进程组smp.get_dp_process_group().

  • 对于混合精度训练,apex.amp不支持模块。使用自动混合精度库的推荐方法是使用torch.cuda.amp,除了使用smp.amp.GradScaler而不是在火炬中实施。

  • torch.jit.ScriptModules要么ScriptFunctions不支持smp.DistributedModel.

  • apexFusedLayerNormFusedAdamFusedLAMB, 和FusedNovoGradapex不支持。您可以通过以下方式使用库实现。smp.optimizerssmp.nn而不是 API。