
Use large model inference with TorchServe

This tutorial demonstrates how to deploy large models and serve inference with TorchServe on GPUs in Amazon SageMaker. This example deploys the OPT-30b model to an ml.g5 instance. You can modify it to work with other models and instance types. Replace the italicized placeholder text in the examples with your own information.

TorchServe is a powerful open platform for large distributed model inference. Through its support for popular libraries such as PyTorch native PiPPy, DeepSpeed, and HuggingFace Accelerate, it offers uniform handler APIs that stay consistent across distributed large model and non-distributed model inference scenarios. For more information, see TorchServe's large model inference documentation.

Deep Learning Containers with TorchServe

To deploy your large model with TorchServe on SageMaker, you can use one of the SageMaker Deep Learning Containers (DLCs). By default, TorchServe is installed in all Amazon PyTorch DLCs. During model loading, TorchServe can also install specialized libraries tailored for large models, such as PiPPy, DeepSpeed, and Accelerate.

The following table lists all of the SageMaker DLCs with TorchServe.

DLC category: SageMaker framework containers
Framework: PyTorch 2.0.0+
Hardware: CPU, GPU
Example URL: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker

DLC category: SageMaker framework Graviton containers
Framework: PyTorch 2.0.0+
Hardware: CPU
Example URL: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference-graviton:2.0.1-cpu-py310-ubuntu20.04-sagemaker

DLC category: StabilityAI inference containers
Framework: PyTorch 2.0.0+
Hardware: GPU
Example URL: 763104351884.dkr.ecr.us-east-1.amazonaws.com/stabilityai-pytorch-inference:2.0.1-sgm0.1.0-gpu-py310-cu118-ubuntu20.04-sagemaker

DLC category: Neuron containers
Framework: PyTorch 1.13.1
Hardware: Neuronx
Example URL: 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuron:1.13.1-neuron-py310-sdk2.12.0-ubuntu20.04
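
The deployment example later in this topic passes a container image URI as image_uri=container. You can set that variable by pasting one of the DLC URLs from the preceding table, or, as in the following sketch, by looking one up with the SageMaker Python SDK. The framework version, Python version, Region, and instance type shown here are assumptions; match them to the DLC that you actually intend to use.

import sagemaker
from sagemaker import image_uris

# Sketch: resolve a SageMaker PyTorch inference DLC URI. Adjust the values
# below to match the DLC row you chose from the table above.
container = image_uris.retrieve(
    framework="pytorch",
    region="us-west-2",
    version="2.0.1",
    py_version="py310",
    instance_type="ml.g5.24xlarge",
    image_scope="inference",
)
print(container)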

Get started

Before you deploy your model, make sure that you meet the prerequisites. You can also configure your model parameters and customize your handler code.

Prerequisites

To get started, make sure that you have the following prerequisites:

  1. Ensure that you have access to an Amazon account. Set up your environment so that the Amazon CLI can access your account through either an Amazon IAM user or an IAM role. We recommend using an IAM role. For the purposes of testing in your personal account, you can attach the following managed permissions policies to the IAM role:

    For more information about attaching IAM policies to a user, see Adding and removing IAM identity permissions in the Amazon IAM User Guide.

  2. Configure your dependencies locally, as shown in the following examples.

    1. Install version 2 of the Amazon CLI:

      # Install the latest AWS CLI v2 if it is not installed
      !curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
      !unzip awscliv2.zip
      # Follow the instructions to install v2 on the terminal
      !cat aws/README.md
    2. Install SageMaker and the Boto3 client:

      # If already installed, update your client
      #%pip install sagemaker pip --upgrade --quiet
      !pip install -U sagemaker
      !pip install -U boto
      !pip install -U botocore
      !pip install -U boto3
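
After the installations finish, you can optionally confirm the client versions before continuing. The following quick check only assumes that the packages above installed successfully.

import boto3
import botocore
import sagemaker

# Print the installed client versions as a sanity check
print("sagemaker:", sagemaker.__version__)
print("boto3:", boto3.__version__)
print("botocore:", botocore.__version__)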

Configure model settings and parameters

TorchServe uses torchrun to set up the distributed environment for model parallel processing. TorchServe can support multiple workers for a large model. By default, TorchServe uses a round-robin algorithm to assign GPUs to the workers on a host. For large model inference, the number of GPUs assigned to each worker is automatically calculated based on the number of GPUs specified in the model_config.yaml file. The environment variable CUDA_VISIBLE_DEVICES, which specifies the GPU device IDs that are visible at a given time, is set based on this number.

For example, suppose there are 8 GPUs on a node and one worker needs 4 GPUs on the node (nproc_per_node=4). In this case, TorchServe assigns four GPUs to the first worker (CUDA_VISIBLE_DEVICES="0,1,2,3") and four GPUs to the second worker (CUDA_VISIBLE_DEVICES="4,5,6,7").

In addition to this default behavior, TorchServe gives users the flexibility to specify GPUs for a worker. For instance, if you set the variable deviceIds: [2,3,4,5] in the model config YAML file and set nproc_per_node=2, then TorchServe assigns CUDA_VISIBLE_DEVICES="2,3" to the first worker and CUDA_VISIBLE_DEVICES="4,5" to the second worker.
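
The following sketch illustrates the partitioning described above. It is not TorchServe's actual implementation; it only mirrors how a list of device IDs is split into one CUDA_VISIBLE_DEVICES value per worker based on nproc_per_node.

# Illustration only: split device IDs into per-worker CUDA_VISIBLE_DEVICES values.
def assign_visible_devices(device_ids, nproc_per_node):
    return [
        ",".join(str(d) for d in device_ids[i:i + nproc_per_node])
        for i in range(0, len(device_ids), nproc_per_node)
    ]

# Default behavior on an 8-GPU node with nproc_per_node=4:
print(assign_visible_devices(list(range(8)), 4))  # ['0,1,2,3', '4,5,6,7']

# User-specified deviceIds: [2,3,4,5] with nproc_per_node=2:
print(assign_visible_devices([2, 3, 4, 5], 2))    # ['2,3', '4,5']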

In the following model_config.yaml example, we configure the front-end and back-end parameters for the OPT-30b model. The configured front-end parameters are parallelType, deviceType, deviceIds, and torchrun. For more details about the front-end parameters you can configure, see the PyTorch GitHub documentation. The back-end configuration is based on a YAML map that allows free-style customization. For the back-end parameters, we define the DeepSpeed configuration and additional parameters that are used by the custom handler code.

# TorchServe front-end parameters
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 1200
parallelType: "tp"
deviceType: "gpu"
# example of user specified GPU deviceIds
deviceIds: [0,1,2,3] # sets CUDA_VISIBLE_DEVICES
torchrun:
    nproc-per-node: 4
# TorchServe back-end parameters
deepspeed:
    config: ds-config.json
    checkpoint: checkpoints.json
handler:
    # parameters for custom handler code
    model_name: "facebook/opt-30b"
    model_path: "model/models--facebook--opt-30b/snapshots/ceea0a90ac0f6fae7c2c34bcb40477438c152546"
    max_length: 50
    max_new_tokens: 10
    manual_seed: 40
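
The deepspeed: config: entry above points to a ds-config.json file that is packaged with the model artifacts (see the torch-model-archiver command later in this topic). The following is only a sketch of what such a file might contain, written out from Python for consistency with the other examples. The exact keys and values depend on your DeepSpeed version and model, so treat them as assumptions rather than the sample's actual configuration.

import json

# Hypothetical DeepSpeed inference settings; adjust to your DeepSpeed version and model.
ds_config = {
    "dtype": "torch.float16",
    "replace_with_kernel_inject": True,
    "tensor_parallel": {"tp_size": 4},  # matches nproc-per-node in model_config.yaml
}

# Write the configuration to the file referenced by model_config.yaml.
with open("ds-config.json", "w") as f:
    json.dump(ds_config, f, indent=2)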

Custom handlers

TorchServe provides base handlers and handler utilities for large model inference built with popular libraries. The following example demonstrates how the custom handler class TransformersSeqClassifierHandler extends BaseDeepSpeedHandler and uses the handler utilities. For a full code example, see the custom_handler.py code in the PyTorch GitHub documentation.

class TransformersSeqClassifierHandler(BaseDeepSpeedHandler, ABC):
    """
    Transformers handler class for sequence, token classification and question answering.
    """

    def __init__(self):
        super(TransformersSeqClassifierHandler, self).__init__()
        self.max_length = None
        self.max_new_tokens = None
        self.tokenizer = None
        self.initialized = False

    def initialize(self, ctx: Context):
        """In this initialize function, the HF large model is loaded and
        partitioned using DeepSpeed.
        Args:
            ctx (context): It is a JSON Object containing information
            pertaining to the model artifacts parameters.
        """
        super().initialize(ctx)
        model_dir = ctx.system_properties.get("model_dir")
        self.max_length = int(ctx.model_yaml_config["handler"]["max_length"])
        self.max_new_tokens = int(ctx.model_yaml_config["handler"]["max_new_tokens"])
        model_name = ctx.model_yaml_config["handler"]["model_name"]
        model_path = ctx.model_yaml_config["handler"]["model_path"]
        seed = int(ctx.model_yaml_config["handler"]["manual_seed"])
        torch.manual_seed(seed)

        logger.info("Model %s loading tokenizer", ctx.model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.tokenizer.pad_token = self.tokenizer.eos_token

        config = AutoConfig.from_pretrained(model_name)
        with torch.device("meta"):
            self.model = AutoModelForCausalLM.from_config(
                config, torch_dtype=torch.float16
            )
        self.model = self.model.eval()

        ds_engine = get_ds_engine(self.model, ctx)
        self.model = ds_engine.module
        logger.info("Model %s loaded successfully", ctx.model_name)
        self.initialized = True

    def preprocess(self, requests):
        """
        Basic text preprocessing, based on the user's choice of application mode.
        Args:
            requests (list): A list of dictionaries with a "data" or "body" field, each
                             containing the input text to be processed.
        Returns:
            tuple: A tuple with two tensors: the batch of input ids and the batch of
                   attention masks.
        """

    def inference(self, input_batch):
        """
        Predicts the class (or classes) of the received text using the serialized
        transformers checkpoint.
        Args:
            input_batch (tuple): A tuple with two tensors: the batch of input ids and the
                                 batch of attention masks, as returned by the preprocess function.
        Returns:
            list: A list of strings with the predicted values for each input text in the batch.
        """

    def postprocess(self, inference_output):
        """Post Process Function converts the predicted response into Torchserve readable format.
        Args:
            inference_output (list): It contains the predicted response of the input text.
        Returns:
            (list): Returns a list of the Predictions and Explanations.
        """
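
The preprocess, inference, and postprocess bodies are omitted above; the full implementations are in the linked custom_handler.py. The following is a minimal, illustrative sketch of what those method bodies could look like for a text-generation model, written as they would appear inside the class. It is not the TorchServe sample code, and details such as how the request payload is decoded are assumptions.

# Sketch of method bodies for TransformersSeqClassifierHandler (not the sample's actual code).
def preprocess(self, requests):
    # Assumption: each request carries raw UTF-8 text under "data" or "body".
    texts = []
    for req in requests:
        data = req.get("data") or req.get("body")
        if isinstance(data, (bytes, bytearray)):
            data = data.decode("utf-8")
        texts.append(data)
    encoded = self.tokenizer(
        texts,
        max_length=self.max_length,
        padding=True,
        truncation=True,
        return_tensors="pt",
    )
    return encoded["input_ids"], encoded["attention_mask"]

def inference(self, input_batch):
    input_ids, attention_mask = input_batch
    device = next(self.model.parameters()).device
    with torch.no_grad():
        outputs = self.model.generate(
            input_ids.to(device),
            attention_mask=attention_mask.to(device),
            max_new_tokens=self.max_new_tokens,
        )
    return self.tokenizer.batch_decode(outputs, skip_special_tokens=True)

def postprocess(self, inference_output):
    # One generated string per input in the batch.
    return inference_output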

Prepare your model artifacts

Before you can deploy your model on SageMaker, you must package your model artifacts. For large models, we recommend that you use the PyTorch torch-model-archiver tool with the argument --archive-format no-archive, which skips compressing the model artifacts. The following example saves all of the model artifacts to a new folder named opt/.

torch-model-archiver --model-name opt --version 1.0 --handler custom_handler.py --extra-files ds-config.json -r requirements.txt --config-file opt/model-config.yaml --archive-format no-archive

Once you have created the opt/ folder, download the OPT-30b model to the folder by using the PyTorch Download_model.py tool.

cd opt
python path_to/Download_model.py --model_path model --model_name facebook/opt-30b --revision main

Finally, upload the model artifacts to an Amazon S3 bucket.

aws s3 cp opt {your_s3_bucket}/opt --recursive

You should now have your model artifacts stored in Amazon S3, ready to deploy to a SageMaker endpoint.
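
Optionally, you can verify the upload before moving on. The following sketch lists a few of the uploaded objects with Boto3; replace the placeholder bucket name and prefix with the location that you used in the aws s3 cp command.

import boto3

# Sketch: confirm that the model artifacts landed in S3 (placeholder bucket name).
s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="your_s3_bucket", Prefix="opt/", MaxKeys=10)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])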

Deploy the model using the SageMaker Python SDK

After preparing your model artifacts, you can deploy your model to a SageMaker hosting endpoint. This section describes how to deploy a single large model to an endpoint and make streaming response predictions. For more information about streaming responses from endpoints, see Invoke real-time endpoints.

To deploy your model, complete the following steps:

  1. Create a SageMaker session, as shown in the following example.

    import boto3
    import sagemaker
    from sagemaker import Model, image_uris, serializers, deserializers

    boto3_session = boto3.session.Session(region_name="us-west-2")
    smr = boto3.client('sagemaker-runtime')  # SageMaker runtime client for invoking the endpoint
    sm = boto3.client('sagemaker')
    role = sagemaker.get_execution_role()  # execution role for the endpoint
    sess = sagemaker.session.Session(boto3_session, sagemaker_client=sm, sagemaker_runtime_client=smr)  # SageMaker session for interacting with different Amazon APIs
    region = sess._region_name  # region name of the current SageMaker Studio Classic environment
    account = sess.account_id()  # account_id of the current SageMaker Studio Classic environment

    # Configuration:
    bucket_name = sess.default_bucket()
    prefix = "torchserve"
    output_path = f"s3://{bucket_name}/{prefix}"
    print(f'account={account}, region={region}, role={role}, output_path={output_path}')
  2. Create an uncompressed model in SageMaker, as shown in the following example.

    from datetime import datetime

    instance_type = "ml.g5.24xlarge"
    endpoint_name = sagemaker.utils.name_from_base("ts-opt-30b")
    s3_uri = {your_s3_bucket}/opt

    model = Model(
        name="torchserve-opt-30b" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
        # Enable SageMaker uncompressed model artifacts
        model_data={
            "S3DataSource": {
                "S3Uri": s3_uri,
                "S3DataType": "S3Prefix",
                "CompressionType": "None",
            }
        },
        image_uri=container,
        role=role,
        sagemaker_session=sess,
        env={"TS_INSTALL_PY_DEP_PER_MODEL": "true"},
    )
    print(model)
  3. Deploy the model to an Amazon EC2 instance, as shown in the following example.

    model.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=endpoint_name,
        volume_size=512,  # increase the size to store large model
        model_data_download_timeout=3600,  # increase the timeout to download large model
        container_startup_health_check_timeout=600,  # increase the timeout to load large model
    )
  4. Initialize a class to process streaming responses, as shown in the following example.

    import io

    class Parser:
        """
        A helper class for parsing the byte stream input.

        The output of the model will be in the following format:
        ```
        b'{"outputs": [" a"]}\n'
        b'{"outputs": [" challenging"]}\n'
        b'{"outputs": [" problem"]}\n'
        ...
        ```

        While usually each PayloadPart event from the event stream will contain a byte array
        with a full json, this is not guaranteed and some of the json objects may be split across
        PayloadPart events. For example:
        ```
        {'PayloadPart': {'Bytes': b'{"outputs": '}}
        {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
        ```

        This class accounts for this by concatenating bytes written via the 'write' function
        and then exposing a method which will return lines (ending with a '\n' character) within
        the buffer via the 'scan_lines' function. It maintains the position of the last read
        position to ensure that previous bytes are not exposed again.
        """

        def __init__(self):
            self.buff = io.BytesIO()
            self.read_pos = 0

        def write(self, content):
            self.buff.seek(0, io.SEEK_END)
            self.buff.write(content)
            data = self.buff.getvalue()

        def scan_lines(self):
            self.buff.seek(self.read_pos)
            for line in self.buff.readlines():
                # Only yield complete lines (ending with '\n'); a partial trailing
                # chunk stays in the buffer until more bytes are written.
                if line[-1:] == b'\n':
                    self.read_pos += len(line)
                    yield line[:-1]

        def reset(self):
            self.read_pos = 0
  5. Test a streaming response prediction, as shown in the following example.

    import json

    body = "Today the weather is really nice and I am planning on".encode('utf-8')
    resp = smr.invoke_endpoint_with_response_stream(EndpointName=endpoint_name, Body=body, ContentType="application/json")
    event_stream = resp['Body']
    parser = Parser()
    for event in event_stream:
        parser.write(event['PayloadPart']['Bytes'])
        for line in parser.scan_lines():
            print(line.decode("utf-8"), end=' ')
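
When you are done experimenting, you can delete the resources that the deployment created so that you stop incurring charges. The following clean-up sketch uses the SageMaker session and model objects created in the steps above; it assumes that the endpoint configuration shares the endpoint's name, which is the SageMaker Python SDK's default behavior when you deploy with an explicit endpoint_name.

# Clean-up sketch: remove the endpoint, its configuration, and the model.
sess.delete_endpoint(endpoint_name=endpoint_name)
sess.delete_endpoint_config(endpoint_config_name=endpoint_name)  # assumes default config naming
model.delete_model()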

Now that you have deployed your model to a SageMaker endpoint, you should be able to invoke it for responses. For more information about SageMaker real-time endpoints, see Host a single model.