
Large model inference with TorchServe

This tutorial demonstrates how to deploy large models and serve inference in Amazon SageMaker with TorchServe on GPUs. This example deploys the OPT-30b model to an ml.g5 instance. You can modify this to work with other models and instance types. Replace the italicized placeholder text in the examples with your own information.

TorchServe is a powerful open platform for large distributed model inference. By supporting popular libraries like PyTorch, native PiPPy, DeepSpeed, and HuggingFace Accelerate, it offers uniform handler APIs that remain consistent across distributed large model and non-distributed model inference scenarios. For more information, see TorchServe’s large model inference documentation.

Deep learning containers with TorchServe

To deploy a large model with TorchServe on SageMaker, you can use one of the SageMaker deep learning containers (DLCs). By default, TorchServe is installed in all Amazon PyTorch DLCs. During model loading, TorchServe can install specialized libraries tailored for large models, such as PiPPy, DeepSpeed, and Accelerate.

The following table lists all of the SageMaker DLCs with TorchServe.

DLC category | Framework | Hardware | Example URL
SageMaker Framework Containers | PyTorch 2.0.0+ | CPU, GPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker
SageMaker Framework Graviton Containers | PyTorch 2.0.0+ | CPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference-graviton:2.0.1-cpu-py310-ubuntu20.04-sagemaker
StabilityAI Inference Containers | PyTorch 2.0.0+ | GPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/stabilityai-pytorch-inference:2.0.1-sgm0.1.0-gpu-py310-cu118-ubuntu20.04-sagemaker
Neuron Containers | PyTorch 1.13.1 | Neuronx | 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuron:1.13.1-neuron-py310-sdk2.12.0-ubuntu20.04
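The deployment code later in this tutorial passes a container image URI to the SageMaker Model object through a container variable. As a minimal sketch (not part of the original example code), you can pin that variable to one of the DLC URLs from the table above; the registry account, Region, and image tag shown here are taken from the example GPU URL and should be adjusted for your own Region and framework version.

# A minimal sketch: pin the TorchServe-enabled PyTorch GPU DLC from the table above.
# The registry account (763104351884), Region (us-east-1), and tag come from the example
# URL; adjust them to match your Region and the framework version you need.
container = "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker"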

Getting started

Before deploying your model, complete the prerequisites. You can also configure your model parameters and customize the handler code.

Prerequisites

To get started, ensure that you have the following prerequisites:

  1. Ensure you have access to an Amazon account. Set up your environment so that the Amazon CLI can access your account through either an Amazon IAM user or an IAM role. We recommend using an IAM role. For the purposes of testing in your personal account, you can attach the following managed permissions policies to the IAM role:

    For more information about attaching IAM policies to a role, see Adding and removing IAM identity permissions in the Amazon IAM User Guide.

  2. Locally configure your dependencies, as shown in the following examples.

    1. Install version 2 of the Amazon CLI:

      # Install the latest AWS CLI v2 if it is not installed
      !curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
      !unzip awscliv2.zip
      # Follow the instructions to install v2 on the terminal
      !cat aws/README.md
    2. Install SageMaker and the Boto3 client:

      # If already installed, update your client
      #%pip install sagemaker pip --upgrade --quiet
      !pip install -U sagemaker
      !pip install -U boto
      !pip install -U botocore
      !pip install -U boto3

Configure model settings and parameters

TorchServe uses torchrun to set up the distributed environment for model parallel processing. TorchServe can support multiple workers for a large model. By default, TorchServe uses a round-robin algorithm to assign GPUs to a worker on a host. In the case of large model inference, the number of GPUs assigned to each worker is automatically calculated based on the number of GPUs specified in the model_config.yaml file. The environment variable CUDA_VISIBLE_DEVICES, which specifies the GPU device IDs that are visible at a given time, is set based on this number.

For example, suppose there are 8 GPUs on a node and one worker needs 4 GPUs (nproc_per_node=4). In this case, TorchServe assigns four GPUs to the first worker (CUDA_VISIBLE_DEVICES="0,1,2,3") and four GPUs to the second worker (CUDA_VISIBLE_DEVICES="4,5,6,7").

In addition to this default behavior, TorchServe gives you the flexibility to specify the GPUs for a worker. For instance, if you set deviceIds: [2,3,4,5] in the model config YAML file and set nproc_per_node=2, then TorchServe assigns CUDA_VISIBLE_DEVICES="2,3" to the first worker and CUDA_VISIBLE_DEVICES="4,5" to the second worker.
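The following Python sketch is not TorchServe source code; it only illustrates how the assignment described above plays out: each worker receives a contiguous group of nproc_per_node device IDs, and that group becomes the worker's CUDA_VISIBLE_DEVICES value. The assign_devices helper is a hypothetical name used purely for illustration.

# A minimal sketch (not TorchServe's implementation) of the GPU assignment described above:
# each worker gets a contiguous slice of nproc_per_node device IDs, and that slice
# becomes the worker's CUDA_VISIBLE_DEVICES value.
def assign_devices(device_ids, nproc_per_node):
    """Split device_ids into per-worker groups of size nproc_per_node."""
    return [
        ",".join(str(d) for d in device_ids[i:i + nproc_per_node])
        for i in range(0, len(device_ids), nproc_per_node)
    ]

# Default behavior: 8 GPUs on the host, nproc_per_node=4
print(assign_devices(list(range(8)), 4))  # ['0,1,2,3', '4,5,6,7']

# User-specified deviceIds: [2,3,4,5] with nproc_per_node=2
print(assign_devices([2, 3, 4, 5], 2))    # ['2,3', '4,5']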

In the following model_config.yaml example, we configure both front-end and back-end parameters for the OPT-30b model. The configured front-end parameters are parallelType, deviceType, deviceIds, and torchrun. For more detailed information about the front-end parameters that you can configure, see the PyTorch GitHub documentation. The back-end configuration is based on a YAML map that allows for free-style customization. For the back-end parameters, we define the DeepSpeed configuration and additional parameters used by the custom handler code.

# TorchServe front-end parameters
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 1200
parallelType: "tp"
deviceType: "gpu"
# example of user specified GPU deviceIds
deviceIds: [0,1,2,3] # sets CUDA_VISIBLE_DEVICES

torchrun:
    nproc-per-node: 4

# TorchServe back-end parameters
deepspeed:
    config: ds-config.json
    checkpoint: checkpoints.json
handler:
    # parameters for custom handler code
    model_name: "facebook/opt-30b"
    model_path: "model/models--facebook--opt-30b/snapshots/ceea0a90ac0f6fae7c2c34bcb40477438c152546"
    max_length: 50
    max_new_tokens: 10
    manual_seed: 40

Customize handlers

TorchServe offers base handlers and handler utilities for large model inference built with popular libraries. The following example demonstrates how the custom handler class TransformersSeqClassifierHandler extends BaseDeepSpeedHandler and uses the handler utilities. For a full code example, see the custom_handler.py code on the PyTorch GitHub documentation.

class TransformersSeqClassifierHandler(BaseDeepSpeedHandler, ABC):
    """
    Transformers handler class for sequence, token classification and question answering.
    """

    def __init__(self):
        super(TransformersSeqClassifierHandler, self).__init__()
        self.max_length = None
        self.max_new_tokens = None
        self.tokenizer = None
        self.initialized = False

    def initialize(self, ctx: Context):
        """In this initialize function, the HF large model is loaded and
        partitioned using DeepSpeed.
        Args:
            ctx (context): It is a JSON Object containing information
            pertaining to the model artifacts parameters.
        """
        super().initialize(ctx)
        model_dir = ctx.system_properties.get("model_dir")
        self.max_length = int(ctx.model_yaml_config["handler"]["max_length"])
        self.max_new_tokens = int(ctx.model_yaml_config["handler"]["max_new_tokens"])
        model_name = ctx.model_yaml_config["handler"]["model_name"]
        model_path = ctx.model_yaml_config["handler"]["model_path"]
        seed = int(ctx.model_yaml_config["handler"]["manual_seed"])
        torch.manual_seed(seed)

        logger.info("Model %s loading tokenizer", ctx.model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.tokenizer.pad_token = self.tokenizer.eos_token

        config = AutoConfig.from_pretrained(model_name)
        with torch.device("meta"):
            self.model = AutoModelForCausalLM.from_config(
                config, torch_dtype=torch.float16
            )
        self.model = self.model.eval()

        ds_engine = get_ds_engine(self.model, ctx)
        self.model = ds_engine.module
        logger.info("Model %s loaded successfully", ctx.model_name)
        self.initialized = True

    def preprocess(self, requests):
        """
        Basic text preprocessing, based on the user's choice of application mode.
        Args:
            requests (list): A list of dictionaries with a "data" or "body" field,
                each containing the input text to be processed.
        Returns:
            tuple: A tuple with two tensors: the batch of input ids and the batch
                of attention masks.
        """

    def inference(self, input_batch):
        """
        Predicts the class (or classes) of the received text using the serialized
        transformers checkpoint.
        Args:
            input_batch (tuple): A tuple with two tensors: the batch of input ids and
                the batch of attention masks, as returned by the preprocess function.
        Returns:
            list: A list of strings with the predicted values for each input text in the batch.
        """

    def postprocess(self, inference_output):
        """Post Process Function converts the predicted response into Torchserve readable format.
        Args:
            inference_output (list): It contains the predicted response of the input text.
        Returns:
            (list): Returns a list of the Predictions and Explanations.
        """

Prepare your model artifacts

Before deploying your model on SageMaker, you must package your model artifacts. For large models, we recommend that you use the PyTorch torch-model-archiver tool with the argument --archive-format no-archive, which skips compressing model artifacts. The following example saves all of the model artifacts to a new folder named opt/.

torch-model-archiver --model-name opt --version 1.0 --handler custom_handler.py --extra-files ds-config.json -r requirements.txt --config-file opt/model-config.yaml --archive-format no-archive

Once the opt/ folder is created, download the OPT-30b model to the folder using the PyTorch Download_model tool.

cd opt
python path_to/Download_model.py --model_path model --model_name facebook/opt-30b --revision main

Lastly, upload the model artifacts to an Amazon S3 bucket.

aws s3 cp opt {your_s3_bucket}/opt --recursive

You should now have model artifacts stored in Amazon S3 that are ready to deploy to a SageMaker endpoint.
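If you want to verify the upload before deploying, the following boto3 sketch (not part of the original example) lists a few of the uploaded keys. The bucket name and the opt/ prefix are placeholders that should match the values you used in the aws s3 cp command above.

import boto3

# Optional sanity check: list a few of the uploaded model artifact keys.
# Replace "your_s3_bucket" with the bucket used in the aws s3 cp command above.
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="your_s3_bucket", Prefix="opt/", MaxKeys=10)
for obj in response.get("Contents", []):
    print(obj["Key"])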

Deploy the model using the SageMaker Python SDK

After preparing your model artifacts, you can deploy your model to a SageMaker Hosting endpoint. This section describes how to deploy a single large model to an endpoint and make streaming response predictions. For more information about streaming responses from endpoints, see Invoke real-time endpoints.

To deploy your model, complete the following steps:

  1. Create a SageMaker session, as shown in the following example.

    import boto3
    import sagemaker
    from sagemaker import Model, image_uris, serializers, deserializers

    boto3_session = boto3.session.Session(region_name="us-west-2")
    smr = boto3.client('sagemaker-runtime')
    sm = boto3.client('sagemaker')
    role = sagemaker.get_execution_role()  # execution role for the endpoint
    sess = sagemaker.session.Session(
        boto3_session, sagemaker_client=sm, sagemaker_runtime_client=smr
    )  # SageMaker session for interacting with different Amazon APIs
    region = sess._region_name  # region name of the current SageMaker Studio Classic environment
    account = sess.account_id()  # account_id of the current SageMaker Studio Classic environment

    # Configuration:
    bucket_name = sess.default_bucket()
    prefix = "torchserve"
    output_path = f"s3://{bucket_name}/{prefix}"
    print(f'account={account}, region={region}, role={role}, output_path={output_path}')
  2. Create an uncompressed model in SageMaker, as shown in the following example.

    from datetime import datetime

    instance_type = "ml.g5.24xlarge"
    endpoint_name = sagemaker.utils.name_from_base("ts-opt-30b")
    s3_uri = {your_s3_bucket}/opt  # S3 location of the uncompressed model artifacts

    model = Model(
        name="torchserve-opt-30b" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
        # Enable SageMaker uncompressed model artifacts
        model_data={
            "S3DataSource": {
                "S3Uri": s3_uri,
                "S3DataType": "S3Prefix",
                "CompressionType": "None",
            }
        },
        image_uri=container,  # TorchServe DLC image URI (see the DLC table earlier in this topic)
        role=role,
        sagemaker_session=sess,
        env={"TS_INSTALL_PY_DEP_PER_MODEL": "true"},
    )
    print(model)
  3. Deploy the model to an Amazon EC2 instance, as shown in the following example.

    model.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=endpoint_name,
        volume_size=512,  # increase the size to store the large model
        model_data_download_timeout=3600,  # increase the timeout to download the large model
        container_startup_health_check_timeout=600,  # increase the timeout to load the large model
    )
  4. Initialize a class to process the streaming response, as shown in the following example.

    import io

    class Parser:
        """
        A helper class for parsing the byte stream input.
        The output of the model will be in the following format:
        ```
        b'{"outputs": [" a"]}\n'
        b'{"outputs": [" challenging"]}\n'
        b'{"outputs": [" problem"]}\n'
        ...
        ```
        While usually each PayloadPart event from the event stream will contain a byte array
        with a full json, this is not guaranteed and some of the json objects may be split
        across PayloadPart events. For example:
        ```
        {'PayloadPart': {'Bytes': b'{"outputs": '}}
        {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
        ```
        This class accounts for this by concatenating bytes written via the 'write' function
        and then exposing a method which will return lines (ending with a '\n' character)
        within the buffer via the 'scan_lines' function. It maintains the position of the
        last read position to ensure that previous bytes are not exposed again.
        """

        def __init__(self):
            self.buff = io.BytesIO()
            self.read_pos = 0

        def write(self, content):
            # Append the new bytes to the end of the buffer
            self.buff.seek(0, io.SEEK_END)
            self.buff.write(content)

        def scan_lines(self):
            # Yield only complete lines (ending with '\n') that have not been read yet
            self.buff.seek(self.read_pos)
            for line in self.buff.readlines():
                if line[-1:] == b'\n':
                    self.read_pos += len(line)
                    yield line[:-1]

        def reset(self):
            self.read_pos = 0
  5. Test a streaming response prediction, as shown in the following example.

    import json

    body = "Today the weather is really nice and I am planning on".encode('utf-8')
    resp = smr.invoke_endpoint_with_response_stream(
        EndpointName=endpoint_name,
        Body=body,
        ContentType="application/json",
    )
    event_stream = resp['Body']
    parser = Parser()
    for event in event_stream:
        parser.write(event['PayloadPart']['Bytes'])
        for line in parser.scan_lines():
            print(line.decode("utf-8"), end=' ')

You have now deployed your model to a SageMaker endpoint and should be able to invoke it for responses. For more information about SageMaker real-time endpoints, see Host a single model.