使用托管服务自定义推理代码 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用托管服务自定义推理代码

本节介绍了 Amazon 如何与运行您自己的托管服务推理代码的 Docker 容器进行 SageMaker 交互。使用此信息编写推理代码并创建 Docker 镜像。

如何 SageMaker 运行你的推理图像

要配置容器以作为可执行文件运行,请使用 Dockerfile 中的 ENTRYPOINT 指令。请注意以下几点:

  • 要进行模型推断,请按以下 SageMaker 方式运行容器:

    docker run image serve

    SageMaker 通过在图像名称后指定serve参数来覆盖容器中的默认CMD语句。serve 参数覆盖您使用 Dockerfile 中的 CMD 命令提供的参数。

     

  • SageMaker 期望所有容器都使用 root 用户运行。创建您的容器,使其仅使用根用户。 SageMaker 运行您的容器时,没有 root 级访问权限的用户可能会导致权限问题。

     

  • 建议您使用 exec 形式的 ENTRYPOINT 指令:

    ENTRYPOINT ["executable", "param1", "param2"]

    例如:

    ENTRYPOINT ["python", "k_means_inference.py"]

    exec 形式的 ENTRYPOINT 指令直接启动可执行文件,而不是 /bin/sh 的子级。这使它能够接收SIGKILL来自 SageMaker API操作的信号,这是必需的。SIGTERM

     

    例如,当您使用创建终端节点时, SageMaker会预置终端节点配置所需的 ML 计算实例数量,该数量由您在请求中指定。CreateEndpointAPI SageMaker 在这些实例上运行 Docker 容器。

     

    如果您减少支持终端节点的实例数量(通过调用 UpdateEndpointWeightsAndCapacitiesAPI),则会 SageMaker 运行命令以停止正在终止的实例上的 Docker 容器。此命令发送 SIGTERM 信号,然后在 30 秒后发送 SIGKILL 信号。

     

    如果您更新终端节点(通过调用 UpdateEndpointAPI),则会 SageMaker 启动另一组 ML 计算实例,并运行其中包含您的推理代码的 Docker 容器。然后,它会运行一条命令来停止以前的 Docker 容器。为了停止 Docker 容器,此命令发送 SIGTERM 信号,然后在 30 秒后发送 SIGKILL 信号。

     

  • SageMaker 使用您在CreateModel请求中提供的容器定义来设置环境变量和容器的DNS主机名,如下所示:

     

    • 它使用ContainerDefinition.Environment string-to-string地图设置环境变量。

    • 它使用设置DNS主机名ContainerDefinition.ContainerHostname

       

  • 如果您计划使用GPU设备进行模型推断(通过在CreateEndpointConfig请求中指定GPU基于机器学习的计算实例),请确保您的容器nvidia-docker兼容。不要将NVIDIA驱动程序与映像捆绑在一起。有关的更多信息nvidia-docker,请参阅 NVIDIA/nvidia-docker。

     

  • 你不能在 SageMaker容器中使用tini初始化器作为入口点,因为它会被trainserve参数所混淆。

如何 SageMaker 加载模型工件

在您的CreateModelAPI请求中,您可以使用ModelDataUrlS3DataSource参数来标识存储模型工件的 S3 位置。 SageMaker 将模型工件从 S3 位置复制到/opt/ml/model目录中,供推理代码使用。您的容器具有对 /opt/ml/model 的只读访问权限。请勿写入此目录。

ModelDataUrl 必须指向 tar.gz 文件。否则,将 SageMaker 无法下载该文件。

如果您在中训练模型 SageMaker,则模型工件将作为单个压缩的 tar 文件保存在 Amazon S3 中。如果您在外部训练模型 SageMaker,则需要创建这个压缩的 tar 文件并将其保存在 S3 位置。 SageMaker 在容器启动之前,将此 tar 文件解压缩到 /opt/ml/model 目录中。

要部署大型模型,建议您按照部署未压缩的模型中的说明操作。

容器应如何响应推理请求

为了获得推论,客户端应用程序向 SageMaker端点发送POST请求。 SageMaker 将请求传递到容器,并将推理结果从容器返回给客户端。

有关您的容器将收到的推理请求的更多信息,请参阅 Amazon SageMaker API 参考中的以下操作:

推理容器的要求

要响应推理请求,您的容器必须满足以下要求:

  • SageMaker 删除除支持的POST标题之外的所有标题InvokeEndpoint。 SageMaker 可能会添加其他标题。推理容器必须能够安全地忽略这些额外标头。

  • 要接收推理请求,容器必须有一个在端口 8080 上侦听的 Web 服务器,并且必须接受发送到 /invocations/ping 端点的 POST 请求。

  • 客户的模型容器必须在 250 毫秒内接受套接字连接请求。

  • 客户的模型容器必须在 60 秒内响应请求。在响应 /invocations 之前,模型本身可有最多 60 秒的处理时间。如果您的模型要花费 50-60 秒的处理时间,则应将SDK套接字超时设置为 70 秒。

例 调用函数

以下示例演示了容器中的代码如何处理推理请求。这些示例处理客户端应用程序使用 InvokeEndpoint 操作发送的请求。

FastAPI

Fast API 是一个用于使用 Python APIs 进行构建的网络框架。

from fastapi import FastAPI, status, Request, Response . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # model() is a hypothetical function that gets the inference output: model_resp = await model(Request) response = Response( content=model_resp, status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

在此示例中,该invocations函数处理 SageMaker 发送到/invocations终端节点的推理请求。

Flask

Flask 是一个框架,用于通过 Python 开发 Web 应用程序。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invoke(request): # model() is a hypothetical function that gets the inference output: resp_body = model(request) return flask.Response(resp_body, mimetype='text/plain')

在此示例中,该invoke函数处理 SageMaker 发送到/invocations终端节点的推理请求。

例 用于流式处理请求的调用函数

以下示例演示了推理容器中的代码如何处理流式推理请求。这些示例处理客户端应用程序使用 InvokeEndpointWithResponseStream 操作发送的请求。

当容器处理流式推理请求时,它会在模型生成推理时,以递增形式返回一系列的内容,每个内容都是一部分模型推理。客户端应用程序会在相关响应可用时立即开始接收响应。它们无需等待模型生成完整的响应。您可以实施流式处理以支持快速的交互式体验,例如聊天机器人、虚拟助手和音乐生成器。

FastAPI

Fast API 是一个用于使用 Python APIs 进行构建的网络框架。

from starlette.responses import StreamingResponse from fastapi import FastAPI, status, Request . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # Streams inference response using HTTP chunked encoding async def generate(): # model() is a hypothetical function that gets the inference output: yield await model(Request) yield "\n" response = StreamingResponse( content=generate(), status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

在此示例中,该invocations函数处理 SageMaker 发送到/invocations终端节点的推理请求。为了流式处理响应,示例使用了 Starlette 框架中的 StreamingResponse 类。

Flask

Flask 是一个框架,用于通过 Python 开发 Web 应用程序。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invocations(request): # Streams inference response using HTTP chunked encoding def generate(): # model() is a hypothetical function that gets the inference output: yield model(request) yield "\n" return flask.Response( flask.stream_with_context(generate()), mimetype='text/plain') . . .

在此示例中,该invocations函数处理 SageMaker 发送到/invocations终端节点的推理请求。为了流式处理响应,示例使用了 Flask 框架中的 flask.stream_with_context 函数。

容器应如何响应运行状况检查 (Ping) 请求

SageMaker 在以下情况下启动新的推理容器:

  • 回应CreateEndpointUpdateEndpoint、和UpdateEndpointWeightsAndCapacitiesAPI呼叫

  • 安全修补

  • 替换运行状况不佳的实例

容器启动后不久,就会 SageMaker 开始定期向/ping终端节点发送GET请求。

容器上最简单的要求是用 HTTP 200 的状态码和空的正文进行响应。这 SageMaker 表示容器已准备好接受/invocations终端节点的推理请求。

如果容器在启动后的 8 分钟内没有通过持续响应 200 来开始通过运行状况检查,则新实例启动将失败。这会CreateEndpoint导致失败,使端点处于故障状态。请求的更新UpdateEndpoint未完成,安全补丁未应用,运行状况不佳的实例也未被替换。

虽然最低限制供容器用来返回静态 200,但容器开发人员可使用此功能执行更深入的检查。/ping 尝试的请求超时为 2 秒。