自定义托管服务的推理代码 - 亚马逊 SageMaker AI
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

自定义托管服务的推理代码

本节介绍了 Amazon SageMaker AI 如何与运行您自己的托管服务推理代码的 Docker 容器交互。使用此信息编写推理代码并创建 Docker 镜像。

SageMaker AI 如何运行你的推理图像

要配置容器以作为可执行文件运行,请使用 Dockerfile 中的 ENTRYPOINT 指令。注意以下几点:

  • 对于模型推理, SageMaker AI 按以下方式运行容器:

    docker run image serve

    SageMaker AI 通过在图像名称后指定serve参数来覆盖容器中的默认CMD语句。serve 参数覆盖您使用 Dockerfile 中的 CMD 命令提供的参数。

     

  • SageMaker AI 期望所有容器都以 root 用户身份运行。创建您的容器,使其仅使用根用户。当 SageMaker AI 运行您的容器时,没有 root 级访问权限的用户可能会导致权限问题。

     

  • 建议您使用 exec 形式的 ENTRYPOINT 指令:

    ENTRYPOINT ["executable", "param1", "param2"]

    例如:

    ENTRYPOINT ["python", "k_means_inference.py"]

    exec 形式的 ENTRYPOINT 指令直接启动可执行文件,而不是 /bin/sh 的子级。这使它能够接收 SageMaker API 操作SIGKILL的信号,这是必需的。SIGTERM

     

    例如,当您使用 CreateEndpointAPI 创建终端节点时, SageMaker AI 会预配置您在请求中指定的终端节点配置所需的机器学习计算实例数量。 SageMaker AI 在这些实例上运行 Docker 容器。

     

    如果您减少支持终端节点的实例数量(通过调用 UpdateEndpointWeightsAndCapacitiesAPI), SageMaker AI 会运行命令来停止正在终止的实例上的 Docker 容器。此命令发送 SIGTERM 信号,然后在 30 秒后发送 SIGKILL 信号。

     

    如果您更新终端节点(通过调用 UpdateEndpointAPI), SageMaker AI 会启动另一组 ML 计算实例,并运行其中包含您的推理代码的 Docker 容器。然后,它会运行一条命令来停止以前的 Docker 容器。为了停止 Docker 容器,此命令发送 SIGTERM 信号,然后在 30 秒后发送 SIGKILL 信号。

     

  • SageMaker AI 使用您在CreateModel请求中提供的容器定义来设置环境变量和容器的 DNS 主机名,如下所示:

     

    • 它使用ContainerDefinition.Environment string-to-string地图设置环境变量。

    • 它使用 ContainerDefinition.ContainerHostname 设置 DNS 主机名。

       

  • 如果您计划为模型推理使用 GPU 设备(通过在 CreateEndpointConfig 请求中指定基于 GPU 的 ML 计算实例),请确保您的容器与 nvidia-docker 兼容。不要将 NVIDIA 驱动程序与映像捆绑。有关 nvidia-docker 的更多信息,请参阅 NVIDIA/nvidia-docker

     

  • 你不能使用tini初始化器作为 SageMaker AI 容器中的入口点,因为它会被trainserve参数所混淆。

SageMaker AI 如何加载你的模型工件

在您CreateModel的 API 请求中,您可以使用ModelDataUrlS3DataSource参数来标识存储模型工件的 S3 位置。 SageMaker AI 将您的模型工件从 S3 位置复制到/opt/ml/model目录中,供您的推理代码使用。您的容器具有对 /opt/ml/model 的只读访问权限。请勿写入此目录。

ModelDataUrl 必须指向 tar.gz 文件。否则, SageMaker AI 将无法下载该文件。

如果您使用 SageMaker AI 训练模型,则模型工件将作为单个压缩的 tar 文件保存在 Amazon S3 中。如果您在 SageMaker AI 之外训练模型,则需要创建这个压缩的 tar 文件并将其保存在 S3 位置。 SageMaker 在你的容器启动之前,AI 会将这个 tar 文件解压缩到/ opt/ml/model 目录中。

要部署大型模型,建议您按照部署未压缩的模型中的说明操作。

容器应如何响应推理请求

为了获得推论,客户端应用程序向 A SageMaker I 终端节点发送 POST 请求。 SageMaker AI 将请求传递到容器,并将推理结果从容器返回给客户端。

有关您的容器将收到的推理请求的更多信息,请参阅 Amazon AI AP SageMaker I 参考中的以下操作:

推理容器的要求

要响应推理请求,您的容器必须满足以下要求:

  • SageMaker 除了支持的POST标题外,AI 会删除所有标题InvokeEndpoint。 SageMaker AI 可能会添加其他标题。推理容器必须能够安全地忽略这些额外标头。

  • 要接收推理请求,容器必须有一个在端口 8080 上侦听的 Web 服务器,并且必须接受发送到 /invocations/ping 端点的 POST 请求。

  • 客户的模型容器必须在 250 毫秒内接受套接字连接请求。

  • 客户的模型容器必须在 60 秒内响应请求。在响应 /invocations 之前,模型本身可有最多 60 秒的处理时间。如果您的模型需要 50 到 60 秒的处理时间,则开发工具包套接字超时应设置为 70 秒。

  • 支持双向流媒体的客户模型容器必须:

    • 默认情况下,支持端口 8080 与/ invocations-bidirectional-stream 的 WebSockets 连接。

    • 让 Web 服务器在端口 8080 上侦听,并且必须接受发送到 /ping 端点的 POST 请求。

    • 除了通过 HTTP 进行容器运行状况检查外,对于发送 Ping Frame,容器还必须使用 Pon WebSocket g Frame per (RFC6455) 进行响应。

例 调用函数

以下示例演示了容器中的代码如何处理推理请求。这些示例处理客户端应用程序使用 InvokeEndpoint 操作发送的请求。

FastAPI

FastAPI 是一个用于使用 Python APIs 进行构建的网络框架。

from fastapi import FastAPI, status, Request, Response . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # model() is a hypothetical function that gets the inference output: model_resp = await model(Request) response = Response( content=model_resp, status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

在此示例中,该invocations函数处理 SageMaker AI 向/invocations终端节点发送的推理请求。

Flask

Flask 是一个框架,用于通过 Python 开发 Web 应用程序。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invoke(request): # model() is a hypothetical function that gets the inference output: resp_body = model(request) return flask.Response(resp_body, mimetype='text/plain')

在此示例中,该invoke函数处理 SageMaker AI 向/invocations终端节点发送的推理请求。

例 用于流式处理请求的调用函数

以下示例演示了推理容器中的代码如何处理流式推理请求。这些示例处理客户端应用程序使用 InvokeEndpointWithResponseStream 操作发送的请求。

当容器处理流式推理请求时,它会在模型生成推理时,以递增形式返回一系列的内容,每个内容都是一部分模型推理。客户端应用程序会在相关响应可用时立即开始接收响应。它们无需等待模型生成完整的响应。您可以实施流式处理以支持快速的交互式体验,例如聊天机器人、虚拟助手和音乐生成器。

FastAPI

FastAPI 是一个用于使用 Python APIs 进行构建的网络框架。

from starlette.responses import StreamingResponse from fastapi import FastAPI, status, Request . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # Streams inference response using HTTP chunked encoding async def generate(): # model() is a hypothetical function that gets the inference output: yield await model(Request) yield "\n" response = StreamingResponse( content=generate(), status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

在此示例中,该invocations函数处理 SageMaker AI 向/invocations终端节点发送的推理请求。为了流式处理响应,示例使用了 Starlette 框架中的 StreamingResponse 类。

Flask

Flask 是一个框架,用于通过 Python 开发 Web 应用程序。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invocations(request): # Streams inference response using HTTP chunked encoding def generate(): # model() is a hypothetical function that gets the inference output: yield model(request) yield "\n" return flask.Response( flask.stream_with_context(generate()), mimetype='text/plain') . . .

在此示例中,该invocations函数处理 SageMaker AI 向/invocations终端节点发送的推理请求。为了流式处理响应,示例使用了 Flask 框架中的 flask.stream_with_context 函数。

例 双向流媒体的调用函数示例

以下示例演示了容器中的代码如何处理流式推理请求和响应。这些示例处理客户端应用程序使用 InvokeEndpointWithBidirectionalStream操作发送的流式传输请求。

具有双向流传输功能的容器可以处理流式推理请求,其中部分在客户端以增量方式生成并流式传输到容器。当模型生成模型时,它会将模型的推断作为一系列部分返回给客户端。客户端应用程序会在相关响应可用时立即开始接收响应。他们无需等待客户端完全生成的请求,也不需要等待模型生成整个响应。您可以实现双向流媒体以支持快速的交互体验,例如聊天机器人、交互式语音 AI 助手和实时翻译,从而获得更实时的体验。

FastAPI

FastAPI 是一个用于使用 Python APIs 进行构建的网络框架。

import sys import asyncio import json from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import uvicorn app = FastAPI() ... @app.websocket("/invocations-bidirectional-stream") async def websocket_invoke(websocket: WebSocket): """ WebSocket endpoint with RFC 6455 ping/pong and fragmentation support Handles: - Text messages (JSON) - including fragmented frames - Binary messages - including fragmented frames - Ping frames (automatically responds with pong) - Pong frames (logs receipt) - Fragmented frames per RFC 6455 Section 5.4 """ await manager.connect(websocket) # Fragment reassembly buffers per RFC 6455 Section 5.4 text_fragments = [] binary_fragments = [] while True: # Use receive() to handle all WebSocket frame types message = await websocket.receive() print(f"Received message: {message}") if message["type"] == "websocket.receive": if "text" in message: # Handle text frames (including fragments) text_data = message["text"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it text_fragments.append(text_data) print(f"Received text fragment: {len(text_data)} chars (more coming)") else: # This is the final frame or a complete message if text_fragments: # Reassemble fragmented message text_fragments.append(text_data) complete_text = "".join(text_fragments) text_fragments.clear() print(f"Reassembled fragmented text message: {len(complete_text)} chars total") await handle_text_message(websocket, complete_text) else: # Complete message in single frame await handle_text_message(websocket, text_data) elif "bytes" in message: # Handle binary frames (including fragments) binary_data = message["bytes"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it binary_fragments.append(binary_data) print(f"Received binary fragment: {len(binary_data)} bytes (more coming)") else: # This is the final frame or a complete message if binary_fragments: # Reassemble fragmented message binary_fragments.append(binary_data) complete_binary = b"".join(binary_fragments) binary_fragments.clear() print(f"Reassembled fragmented binary message: {len(complete_binary)} bytes total") await handle_binary_message(websocket, complete_binary) else: # Complete message in single frame await handle_binary_message(websocket, binary_data) elif message["type"] == "websocket.ping": # Handle ping frames - RFC 6455 Section 5.5.2 ping_data = message.get("bytes", b"") print(f"Received PING frame with payload: {ping_data}") # FastAPI automatically sends pong response elif message["type"] == "websocket.pong": # Handle pong frames pong_data = message.get("bytes", b"") print(f"Received PONG frame with payload: {pong_data}") elif message["type"] == "websocket.close": # Handle close frames - RFC 6455 Section 5.5.1 close_code = message.get("code", 1000) close_reason = message.get("reason", "") print(f"Received CLOSE frame - Code: {close_code}, Reason: '{close_reason}'") # Send close frame response if not already closing try: await websocket.close(code=close_code, reason=close_reason) print(f"Sent CLOSE frame response - Code: {close_code}") except Exception as e: print(f"Error sending close frame: {e}") break elif message["type"] == "websocket.disconnect": print("Client initiated disconnect") break else: print(f"Received unknown message type: {message['type']}") break async def handle_binary_message(websocket: WebSocket, binary_data: bytes): """Handle incoming binary messages (complete or reassembled from fragments)""" print(f"Processing complete binary message: {len(binary_data)} bytes") try: # Echo back the binary data await websocket.send_bytes(binary_data) except Exception as e: print(f"Error handling binary message: {e}") async def handle_text_message(websocket: WebSocket, data: str): """Handle incoming text messages""" try: # Send response back to the same client await manager.send_personal_message(data, websocket) except Exception as e: print(f"Error handling text message: {e}") def main(): if len(sys.argv) > 1 and sys.argv[1] == "serve": print("Starting server on port 8080...") uvicorn.run(app, host="0.0.0.0", port=8080) else: print("Usage: python app.py serve") sys.exit(1) if __name__ == "__main__": main()

在此示例中,该websocket_invoke函数处理 SageMaker AI 向/invocations-bidirectional-stream终端节点发送的推理请求。它显示了如何处理流请求和将响应流回客户端。

容器应如何响应运行状况检查 (Ping) 请求

SageMaker 在以下情况下,AI 会启动新的推理容器:

  • 响应 CreateEndpointUpdateEndpointUpdateEndpointWeightsAndCapacities API 调用

  • 安全修补

  • 替换运行状况不佳的实例

容器启动后不久, SageMaker AI 开始定期向/ping终端节点发送 GET 请求。

容器上的最简单要求是使用 HTTP 200 状态代码和空白正文进行响应。这向 SageMaker AI 表明容器已准备好接受/invocations终端节点的推理请求。

如果在启动后的 8 分钟内,容器没有稳定地响应 200 状态代码,那么容器就未能通过运行状况检查,新实例的启动会失败。这会导致 CreateEndpoint 失败,使端点处于失败状态。UpdateEndpoint 请求的更新将无法完成,不会应用安全补丁,也不会替换运行状况不佳的实例。

虽然最低限制供容器用来返回静态 200,但容器开发人员可使用此功能执行更深入的检查。/ping 尝试的请求超时为 2 秒。

此外,能够处理双向流媒体请求的容器必须使用 Pong Frame(按 WebSocket 协议 RFC6455)响应 Ping Frame。如果连续 5 次未收到 Pong Frame,则 SageMaker 人工智能平台将关闭与容器的连接。 SageMaker 人工智能平台还将对装有 Pong Frames 的模型容器中的 Ping Frames 做出响应。

支持双向流媒体功能的 Support 容器合约

如果您想将模型容器托管为支持双向流媒体功能的 SageMaker AI 端点,则模型容器必须支持以下合约:

1。双向 Docker 标签

模型容器应有一个 Docker 标签,向 SageMaker AI 平台表明该容器支持双向流媒体功能。

com.amazonaws.sagemaker.capabilities.bidirectional-streaming=true

2。用于调用的 S WebSocket upport 连接

默认情况下,支持双向流媒体的客户模型容器必须支持端口 8080 上的 WebSockets 连接。/invocations-bidirectional-stream

在调用 API 时,可以通过传递 X-Amzn-SageMaker-Model-Invocation-Path 标头来覆盖此路径。 InvokeEndpointWithBidirectionalStream 此外,用户可以通过在调用 API 时传递 X-Amzn-SageMaker-Model-Query-String 标头来指定要附加到此路径的查询字符串。 InvokeEndpointWithBidirectionalStream

3。请求流处理

InvokeEndpointWithBidirectionalStream <Blob>API 输入有效负载以一系列形式流式传入 PayloadParts,这只是二进制块(“Bytes”:)的封装:

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE> "P": <String> } }

3.1。数据框

SageMaker AI 将输入作为 WebSocket 数据框传递 PayloadParts 到模型容器(RFC6455-Section-5.6

  1. SageMaker AI 不会检查二进制数据块。

  2. 收到输入时 PayloadPart

    • SageMaker AI 只从中创建了一个 WebSocket 数据框PayloadPart.Bytes,然后将其传递到模型容器。

    • 如果PayloadPart.DataType = UTF8, SageMaker AI 会创建文本数据框

    • 如果PayloadPart.DataType不存在或PayloadPart.DataType = BINARY, SageMaker AI 会创建二进制数据框

  3. 对于以 with 结尾并 PayloadParts 以 PayloadPart.CompletionState = PARTIAL with 结尾的序列PayloadPart.CompletionState = COMPLETE, SageMaker AI 会将它们转换为 WebSocket 分段的消息 RFC6455——第 5.4 节:分段: PayloadPart

    • 首字母 PayloadPart with PayloadPart.CompletionState = PARTIAL 将转换为 WebSocket 数据框,并清除 FIN 位。

    • 后续的 w PayloadParts it PayloadPart.CompletionState = PARTIAL h 将转换成带有 FIN 位清除的 WebSocket 延续帧。

    • 最后一个 w PayloadPart it PayloadPart.CompletionState = COMPLETE h 将转换为设置了 FIN 位的 WebSocket 延续帧。

  4. SageMaker AI 不会对输入中的二进制块进行编码或解码 PayloadPart,而是按原样将字节传递到模型容器。

  5. SageMaker AI 不会将多个输入合 PayloadParts 并为一个 BinaryDataFrame。

  6. SageMaker AI 不会将一个输入分块 PayloadPart 为多个 BinaryDataFrames输入。

示例:碎片化消息流

Client sends: PayloadPart 1: {Bytes: "Hello ", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: "World", DataType: "UTF8", CompletionState: "COMPLETE"} Container receives: Frame 1: Text Data Frame with "Hello " (FIN=0) Frame 2: Continuation Frame with "World" (FIN=1)

3.2。控制框架

除了数据帧之外, SageMaker AI 还会将控制帧发送到模型容器(RFC6455-Section-5.5):

  1. 关闭框架:如果出于任何原因关闭连接, SageMaker AI 可能会将 Close Frame(RFC6455-Section-5.5.1)发送到模型容器。

  2. Ping Frame: SageMaker AI 每 60 秒发送 Ping Frame(RFC6455-Section-5.2)一次,模型容器必须使用 Pong Frame 做出响应。如果连续 5 次未收到 Pong 帧(RFC6455-Section-5.5.3),则 AI 将关闭连接。 SageMaker

  3. Pong Frame: SageMaker AI 会用 Pong Frames 回应模型容器中的 Ping Frames。

4。响应流处理

输出以一系列 ModelStreamErrors 或的 PayloadParts形式流出 InternalStreamFailures。

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE>, }, "ModelStreamError": { "ErrorCode": <String>, "Message": <String> }, "InternalStreamFailure": { "Message": <String> } }

4.1。数据框

SageMaker AI 将从模型容器接收的数据帧转换为输出 PayloadParts:

  1. 从模型容器接收 WebSocket 文本数据框后, SageMaker AI 会从文本数据框中获取原始字节,然后将其包装成响应 PayloadPart,同时进行设置PayloadPart.DataType = UTF8

  2. 从模型容器接收 WebSocket 二进制数据帧后, SageMaker AI 会直接将数据帧中的字节封装成响应 PayloadPart,同时进行设置PayloadPart.DataType = BINARY

  3. 对于-S RFC6455ection-5.4:分段中定义的碎片消息:

    • FIN 位清除的初始数据框将转换为 wit PayloadPart h PayloadPart.CompletionState = PARTIAL

    • 随后带有 FIN 位清除的延续帧将转换为 w PayloadParts ith PayloadPart.CompletionState = PARTIAL

    • 设置了 FIN 位的最终延续帧将转换为 w PayloadPart ith PayloadPart.CompletionState = COMPLETE

  4. SageMaker AI 不会对从模型容器接收的字节进行编码或解码,而是按原样将字节传递到模型容器。

  5. SageMaker AI 不会将从模型容器接收到的多个数据帧合并成一个响应 PayloadPart。

  6. SageMaker AI 不会将从模型容器接收到的数据帧分块为多个响应 PayloadParts。

示例:流式传输响应流

Container sends: Frame 1: Text Data Frame with "Generating" (FIN=0) Frame 2: Continuation Frame with " response..." (FIN=1) Client receives: PayloadPart 1: {Bytes: "Generating", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: " response...", DataType: "UTF8", CompletionState: "COMPLETE"}

4.2。控制框架

SageMaker AI 对模型容器中的以下控制帧做出响应:

  1. 在收到来自模型容器的闭合帧(RFC6455-Section-5.5.1)后, SageMaker AI 会将状态代码(RFC6455-Section-7.4)和失败消息封装进去 ModelStreamError,然后将其流回最终用户。

  2. 从模型容器中收到 Ping Frame(RFC6455-Section-5.2)后, SageMaker AI 将使用 Pong Frame 做出回应。

  3. Pong Frame(RFC6455-Section-5.3):如果连续 5 次未收到 Pong Frame,AI 将关闭连接。 SageMaker