

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 具有自定义推理代码的容器
<a name="your-algorithms-inference-main"></a>

您可以使用 Amazon SageMaker AI 与 Docker 容器进行交互，并通过下列两种方式之一运行您自己的推理代码：
+ 要将您自己的推理代码与持久端点结合使用，从而一次获得一个预测，请使用 SageMaker AI 托管服务。
+ 要使用您自己的推理代码来获得对整个数据集的预测，请使用 SageMaker AI 批量转换。

**Topics**
+ [

# 自定义托管服务的推理代码
](your-algorithms-inference-code.md)
+ [

# 自定义用于批次转换的推理代码
](your-algorithms-batch-code.md)

# 自定义托管服务的推理代码
<a name="your-algorithms-inference-code"></a>

本节介绍了 Amazon SageMaker AI 如何与运行您自己的托管服务推理代码的 Docker 容器交互。使用此信息编写推理代码并创建 Docker 镜像。

**Topics**
+ [

## SageMaker AI 如何运行你的推理图像
](#your-algorithms-inference-code-run-image)
+ [

## SageMaker AI 如何加载你的模型工件
](#your-algorithms-inference-code-load-artifacts)
+ [

## 容器应如何响应推理请求
](#your-algorithms-inference-code-container-response)
+ [

## 容器应如何响应运行状况检查 (Ping) 请求
](#your-algorithms-inference-algo-ping-requests)
+ [

## 支持双向流媒体功能的 Support 容器合约
](#your-algorithms-inference-algo-bidi)
+ [

# 为实时推理容器使用私有 Docker 注册表
](your-algorithms-containers-inference-private.md)

## SageMaker AI 如何运行你的推理图像
<a name="your-algorithms-inference-code-run-image"></a>

要配置容器以作为可执行文件运行，请使用 Dockerfile 中的 `ENTRYPOINT` 指令。注意以下几点：
+ 对于模型推理， SageMaker AI 按以下方式运行容器：

  ```
  docker run image serve
  ```

  SageMaker AI 通过在图像名称后指定`serve`参数来覆盖容器中的默认`CMD`语句。`serve` 参数覆盖您使用 Dockerfile 中的 `CMD` 命令提供的参数。

   
+ SageMaker AI 期望所有容器都以 root 用户身份运行。创建您的容器，使其仅使用根用户。当 SageMaker AI 运行您的容器时，没有 root 级访问权限的用户可能会导致权限问题。

   
+ 建议您使用 `exec` 形式的 `ENTRYPOINT` 指令：

  ```
  ENTRYPOINT ["executable", "param1", "param2"]
  ```

  例如：

  ```
  ENTRYPOINT ["python", "k_means_inference.py"]
  ```

  `exec` 形式的 `ENTRYPOINT` 指令直接启动可执行文件，而不是 `/bin/sh` 的子级。这使它能够接收 SageMaker API 操作`SIGKILL`的信号，这是必需的。`SIGTERM`

   

  例如，当您使用 [https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateEndpoint.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateEndpoint.html)API 创建终端节点时， SageMaker AI 会预配置您在请求中指定的终端节点配置所需的机器学习计算实例数量。 SageMaker AI 在这些实例上运行 Docker 容器。

   

  如果您减少支持终端节点的实例数量（通过调用 [https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_UpdateEndpointWeightsAndCapacities.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_UpdateEndpointWeightsAndCapacities.html)API）， SageMaker AI 会运行命令来停止正在终止的实例上的 Docker 容器。此命令发送 `SIGTERM` 信号，然后在 30 秒后发送 `SIGKILL` 信号。

   

  如果您更新终端节点（通过调用 [https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_UpdateEndpoint.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_UpdateEndpoint.html)API）， SageMaker AI 会启动另一组 ML 计算实例，并运行其中包含您的推理代码的 Docker 容器。然后，它会运行一条命令来停止以前的 Docker 容器。为了停止 Docker 容器，此命令发送 `SIGTERM` 信号，然后在 30 秒后发送 `SIGKILL` 信号。

   
+ SageMaker AI 使用您在[https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html)请求中提供的容器定义来设置环境变量和容器的 DNS 主机名，如下所示：

   
  + 它使用`ContainerDefinition.Environment` string-to-string地图设置环境变量。
  + 它使用 `ContainerDefinition.ContainerHostname` 设置 DNS 主机名。

     
+ 如果您计划为模型推理使用 GPU 设备（通过在 `CreateEndpointConfig` 请求中指定基于 GPU 的 ML 计算实例），请确保您的容器与 `nvidia-docker` 兼容。不要将 NVIDIA 驱动程序与映像捆绑。有关 `nvidia-docker` 的更多信息，请参阅 [NVIDIA/nvidia-docker](https://github.com/NVIDIA/nvidia-docker)。

   
+ 你不能使用`tini`初始化器作为 SageMaker AI 容器中的入口点，因为它会被`train`和`serve`参数所混淆。

  

## SageMaker AI 如何加载你的模型工件
<a name="your-algorithms-inference-code-load-artifacts"></a>

在您[https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html)的 API 请求中，您可以使用`ModelDataUrl`或`S3DataSource`参数来标识存储模型工件的 S3 位置。 SageMaker AI 将您的模型工件从 S3 位置复制到`/opt/ml/model`目录中，供您的推理代码使用。您的容器具有对 `/opt/ml/model` 的只读访问权限。请勿写入此目录。

`ModelDataUrl` 必须指向 tar.gz 文件。否则， SageMaker AI 将无法下载该文件。

如果您使用 SageMaker AI 训练模型，则模型工件将作为单个压缩的 tar 文件保存在 Amazon S3 中。如果您在 SageMaker AI 之外训练模型，则需要创建这个压缩的 tar 文件并将其保存在 S3 位置。 SageMaker 在你的容器启动之前，AI 会将这个 tar 文件解压缩到/ opt/ml/model 目录中。

要部署大型模型，建议您按照[部署未压缩的模型](large-model-inference-uncompressed.md)中的说明操作。

## 容器应如何响应推理请求
<a name="your-algorithms-inference-code-container-response"></a>

为了获得推论，客户端应用程序向 A SageMaker I 终端节点发送 POST 请求。 SageMaker AI 将请求传递到容器，并将推理结果从容器返回给客户端。

有关您的容器将收到的推理请求的更多信息，请参阅 *Amazon AI AP SageMaker I 参考*中的以下操作：
+ [ InvokeEndpoint](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_runtime_InvokeEndpoint.html)
+ [ InvokeEndpointAsync](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_runtime_InvokeEndpointAsync.html)
+ [ InvokeEndpointWithResponseStream](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_runtime_InvokeEndpointWithResponseStream.html)
+ [ InvokeEndpointWithResponseStream](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_runtime_InvokeEndpointWithBidirectionalStream.html)

**推理容器的要求**

要响应推理请求，您的容器必须满足以下要求：
+ SageMaker 除了支持的`POST`标题外，AI 会删除所有标题`InvokeEndpoint`。 SageMaker AI 可能会添加其他标题。推理容器必须能够安全地忽略这些额外标头。
+ 要接收推理请求，容器必须有一个在端口 8080 上侦听的 Web 服务器，并且必须接受发送到 `/invocations` 和 `/ping` 端点的 `POST` 请求。
+ 客户的模型容器必须在 250 毫秒内接受套接字连接请求。
+ 客户的模型容器必须在 60 秒内响应请求。在响应 `/invocations` 之前，模型本身可有最多 60 秒的处理时间。如果您的模型需要 50 到 60 秒的处理时间，则开发工具包套接字超时应设置为 70 秒。
+ 支持双向流媒体的客户模型容器必须：
  + 默认情况下，支持端口 8080 与/ invocations-bidirectional-stream 的 WebSockets 连接。
  + 让 Web 服务器在端口 8080 上侦听，并且必须接受发送到 /ping 端点的 POST 请求。
  + 除了通过 HTTP 进行容器运行状况检查外，对于发送 Ping Frame，容器还必须使用 Pon WebSocket g Frame per ([RFC6455](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3)) 进行响应。

**Example 调用函数**  
以下示例演示了容器中的代码如何处理推理请求。这些示例处理客户端应用程序使用 InvokeEndpoint 操作发送的请求。  
FastAPI 是一个用于使用 Python APIs 进行构建的网络框架。  

```
from fastapi import FastAPI, status, Request, Response
. . .
app = FastAPI()
. . .
@app.post('/invocations')
async def invocations(request: Request):
    # model() is a hypothetical function that gets the inference output:
    model_resp = await model(Request)

    response = Response(
        content=model_resp,
        status_code=status.HTTP_200_OK,
        media_type="text/plain",
    )
    return response
. . .
```
在此示例中，该`invocations`函数处理 SageMaker AI 向`/invocations`终端节点发送的推理请求。
Flask 是一个框架，用于通过 Python 开发 Web 应用程序。  

```
import flask
. . .
app = flask.Flask(__name__)
. . .
@app.route('/invocations', methods=["POST"])
def invoke(request):
    # model() is a hypothetical function that gets the inference output:
    resp_body = model(request)
    return flask.Response(resp_body, mimetype='text/plain')
```
在此示例中，该`invoke`函数处理 SageMaker AI 向`/invocations`终端节点发送的推理请求。

**Example 用于流式处理请求的调用函数**  
以下示例演示了推理容器中的代码如何处理流式推理请求。这些示例处理客户端应用程序使用 InvokeEndpointWithResponseStream 操作发送的请求。  
当容器处理流式推理请求时，它会在模型生成推理时，以递增形式返回一系列的内容，每个内容都是一部分模型推理。客户端应用程序会在相关响应可用时立即开始接收响应。它们无需等待模型生成完整的响应。您可以实施流式处理以支持快速的交互式体验，例如聊天机器人、虚拟助手和音乐生成器。  
FastAPI 是一个用于使用 Python APIs 进行构建的网络框架。  

```
from starlette.responses import StreamingResponse
from fastapi import FastAPI, status, Request
. . .
app = FastAPI()
. . .
@app.post('/invocations')
async def invocations(request: Request):
    # Streams inference response using HTTP chunked encoding
    async def generate():
        # model() is a hypothetical function that gets the inference output:
        yield await model(Request)
        yield "\n"

    response = StreamingResponse(
        content=generate(),
        status_code=status.HTTP_200_OK,
        media_type="text/plain",
    )
    return response
. . .
```
在此示例中，该`invocations`函数处理 SageMaker AI 向`/invocations`终端节点发送的推理请求。为了流式处理响应，示例使用了 Starlette 框架中的 `StreamingResponse` 类。
Flask 是一个框架，用于通过 Python 开发 Web 应用程序。  

```
import flask
. . .
app = flask.Flask(__name__)
. . .
@app.route('/invocations', methods=["POST"])
def invocations(request):
    # Streams inference response using HTTP chunked encoding

    def generate():
        # model() is a hypothetical function that gets the inference output:
        yield model(request)
        yield "\n"
    return flask.Response(
        flask.stream_with_context(generate()), mimetype='text/plain')
. . .
```
在此示例中，该`invocations`函数处理 SageMaker AI 向`/invocations`终端节点发送的推理请求。为了流式处理响应，示例使用了 Flask 框架中的 `flask.stream_with_context` 函数。

**Example 双向流媒体的调用函数示例**  
以下示例演示了容器中的代码如何处理流式推理请求和响应。这些示例处理客户端应用程序使用 InvokeEndpointWithBidirectionalStream操作发送的流式传输请求。  
具有双向流传输功能的容器可以处理流式推理请求，其中部分在客户端以增量方式生成并流式传输到容器。当模型生成模型时，它会将模型的推断作为一系列部分返回给客户端。客户端应用程序会在相关响应可用时立即开始接收响应。他们无需等待客户端完全生成的请求，也不需要等待模型生成整个响应。您可以实现双向流媒体以支持快速的交互体验，例如聊天机器人、交互式语音 AI 助手和实时翻译，从而获得更实时的体验。  
FastAPI 是一个用于使用 Python APIs 进行构建的网络框架。  

```
import sys
import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
import uvicorn

app = FastAPI()
...
@app.websocket("/invocations-bidirectional-stream")
async def websocket_invoke(websocket: WebSocket):
    """
    WebSocket endpoint with RFC 6455 ping/pong and fragmentation support
    
    Handles:
    - Text messages (JSON) - including fragmented frames
    - Binary messages - including fragmented frames
    - Ping frames (automatically responds with pong)
    - Pong frames (logs receipt)
    - Fragmented frames per RFC 6455 Section 5.4
    """
    await manager.connect(websocket)
    
    # Fragment reassembly buffers per RFC 6455 Section 5.4
    text_fragments = []
    binary_fragments = []
    
    while True:
        # Use receive() to handle all WebSocket frame types
        message = await websocket.receive()
        print(f"Received message: {message}")
        if message["type"] == "websocket.receive":
            if "text" in message:
                # Handle text frames (including fragments)
                text_data = message["text"]
                more_body = message.get("more_body", False)
                
                if more_body:
                    # This is a fragment, accumulate it
                    text_fragments.append(text_data)
                    print(f"Received text fragment: {len(text_data)} chars (more coming)")
                else:
                    # This is the final frame or a complete message
                    if text_fragments:
                        # Reassemble fragmented message
                        text_fragments.append(text_data)
                        complete_text = "".join(text_fragments)
                        text_fragments.clear()
                        print(f"Reassembled fragmented text message: {len(complete_text)} chars total")
                        await handle_text_message(websocket, complete_text)
                    else:
                        # Complete message in single frame
                        await handle_text_message(websocket, text_data)
                
            elif "bytes" in message:
                # Handle binary frames (including fragments)
                binary_data = message["bytes"]
                more_body = message.get("more_body", False)
                
                if more_body:
                    # This is a fragment, accumulate it
                    binary_fragments.append(binary_data)
                    print(f"Received binary fragment: {len(binary_data)} bytes (more coming)")
                else:
                    # This is the final frame or a complete message
                    if binary_fragments:
                        # Reassemble fragmented message
                        binary_fragments.append(binary_data)
                        complete_binary = b"".join(binary_fragments)
                        binary_fragments.clear()
                        print(f"Reassembled fragmented binary message: {len(complete_binary)} bytes total")
                        await handle_binary_message(websocket, complete_binary)
                    else:
                        # Complete message in single frame
                        await handle_binary_message(websocket, binary_data)
                
        elif message["type"] == "websocket.ping":
            # Handle ping frames - RFC 6455 Section 5.5.2
            ping_data = message.get("bytes", b"")
            print(f"Received PING frame with payload: {ping_data}")
            # FastAPI automatically sends pong response
            
        elif message["type"] == "websocket.pong":
            # Handle pong frames
            pong_data = message.get("bytes", b"")
            print(f"Received PONG frame with payload: {pong_data}")
            
        elif message["type"] == "websocket.close":
            # Handle close frames - RFC 6455 Section 5.5.1
            close_code = message.get("code", 1000)
            close_reason = message.get("reason", "")
            print(f"Received CLOSE frame - Code: {close_code}, Reason: '{close_reason}'")
            
            # Send close frame response if not already closing
            try:
                await websocket.close(code=close_code, reason=close_reason)
                print(f"Sent CLOSE frame response - Code: {close_code}")
            except Exception as e:
                print(f"Error sending close frame: {e}")
            break
            
        elif message["type"] == "websocket.disconnect":
            print("Client initiated disconnect")
            break

        else:
            print(f"Received unknown message type: {message['type']}")
            break

                        
async def handle_binary_message(websocket: WebSocket, binary_data: bytes):
    """Handle incoming binary messages (complete or reassembled from fragments)"""
    print(f"Processing complete binary message: {len(binary_data)} bytes")
    
    try:
        # Echo back the binary data
        await websocket.send_bytes(binary_data)
    except Exception as e:
        print(f"Error handling binary message: {e}")

async def handle_text_message(websocket: WebSocket, data: str):
    """Handle incoming text messages"""
    try:
        # Send response back to the same client
        await manager.send_personal_message(data, websocket)
    except Exception as e:
        print(f"Error handling text message: {e}")

def main():
    if len(sys.argv) > 1 and sys.argv[1] == "serve":
        print("Starting server on port 8080...")
        uvicorn.run(app, host="0.0.0.0", port=8080)
    else:
        print("Usage: python app.py serve")
        sys.exit(1)

if __name__ == "__main__":
    main()
```
在此示例中，该`websocket_invoke`函数处理 SageMaker AI 向`/invocations-bidirectional-stream`终端节点发送的推理请求。它显示了如何处理流请求和将响应流回客户端。

## 容器应如何响应运行状况检查 (Ping) 请求
<a name="your-algorithms-inference-algo-ping-requests"></a>

SageMaker 在以下情况下，AI 会启动新的推理容器：
+ 响应 `CreateEndpoint`、`UpdateEndpoint` 和 `UpdateEndpointWeightsAndCapacities` API 调用
+ 安全修补
+ 替换运行状况不佳的实例

容器启动后不久， SageMaker AI 开始定期向`/ping`终端节点发送 GET 请求。

容器上的最简单要求是使用 HTTP 200 状态代码和空白正文进行响应。这向 SageMaker AI 表明容器已准备好接受`/invocations`终端节点的推理请求。

如果在启动后的 8 分钟内，容器没有稳定地响应 200 状态代码，那么容器就未能通过运行状况检查，新实例的启动会失败。这会导致 `CreateEndpoint` 失败，使端点处于失败状态。`UpdateEndpoint` 请求的更新将无法完成，不会应用安全补丁，也不会替换运行状况不佳的实例。

虽然最低限制供容器用来返回静态 200，但容器开发人员可使用此功能执行更深入的检查。`/ping` 尝试的请求超时为 2 秒。

此外，能够处理双向流媒体请求的容器必须使用 Pong Frame（按 WebSocket 协议 [RFC6455](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3)）响应 Ping Frame。如果连续 5 次未收到 Pong Frame，则 SageMaker 人工智能平台将关闭与容器的连接。 SageMaker 人工智能平台还将对装有 Pong Frames 的模型容器中的 Ping Frames 做出响应。

## 支持双向流媒体功能的 Support 容器合约
<a name="your-algorithms-inference-algo-bidi"></a>

如果您想将模型容器托管为支持双向流媒体功能的 SageMaker AI 端点，则模型容器必须支持以下合约：

**1。双向 Docker 标签**

模型容器应有一个 Docker 标签，向 SageMaker AI 平台表明该容器支持双向流媒体功能。

```
com.amazonaws.sagemaker.capabilities.bidirectional-streaming=true
```

**2。用于调用的 S WebSocket upport 连接**

默认情况下，支持双向流媒体的客户模型容器必须支持端口 8080 上的 WebSockets 连接。`/invocations-bidirectional-stream`

在调用 API 时，可以通过传递 X-Amzn-SageMaker-Model-Invocation-Path 标头来覆盖此路径。 InvokeEndpointWithBidirectionalStream 此外，用户可以通过在调用 API 时传递 X-Amzn-SageMaker-Model-Query-String 标头来指定要附加到此路径的查询字符串。 InvokeEndpointWithBidirectionalStream 

**3。请求流处理**

 InvokeEndpointWithBidirectionalStream <Blob>API 输入有效负载以一系列形式流式传入 PayloadParts，这只是二进制块（“Bytes”:）的封装：******

```
{
   "PayloadPart": { 
      "Bytes": <Blob>,
      "DataType": <String: UTF8 | BINARY>,
      "CompletionState": <String: PARTIAL | COMPLETE>
      "P": <String>
   }
}
```

**3.1。数据框**

SageMaker AI 将输入作为 WebSocket 数据框传递 PayloadParts 到模型容器（[RFC6455-Section-5.6](https://datatracker.ietf.org/doc/html/rfc6455#section-5.6)）

1. SageMaker AI 不会检查二进制数据块。

1. 收到输入时 PayloadPart
   + SageMaker AI 只从中创建了一个 WebSocket 数据框`PayloadPart.Bytes`，然后将其传递到模型容器。
   + 如果`PayloadPart.DataType = UTF8`， SageMaker AI 会创建文本数据框
   + 如果`PayloadPart.DataType`不存在或`PayloadPart.DataType = BINARY`， SageMaker AI 会创建二进制数据框

1. 对于以 with 结尾并 PayloadParts 以 `PayloadPart.CompletionState = PARTIAL` with 结尾的序列`PayloadPart.CompletionState = COMPLETE`， SageMaker AI 会将它们转换为 WebSocket 分段的消息 [RFC6455——第 5.4 节](https://datatracker.ietf.org/doc/html/rfc6455#section-5.4)：分段： PayloadPart 
   + 首字母 PayloadPart with `PayloadPart.CompletionState = PARTIAL` 将转换为 WebSocket 数据框，并清除 FIN 位。
   + 后续的 w PayloadParts it `PayloadPart.CompletionState = PARTIAL` h 将转换成带有 FIN 位清除的 WebSocket 延续帧。
   + 最后一个 w PayloadPart it `PayloadPart.CompletionState = COMPLETE` h 将转换为设置了 FIN 位的 WebSocket 延续帧。

1. SageMaker AI 不会对输入中的二进制块进行编码或解码 PayloadPart，而是按原样将字节传递到模型容器。

1. SageMaker AI 不会将多个输入合 PayloadParts 并为一个 BinaryDataFrame。

1. SageMaker AI 不会将一个输入分块 PayloadPart 为多个 BinaryDataFrames输入。

**示例：碎片化消息流**

```
Client sends:
PayloadPart 1: {Bytes: "Hello ", DataType: "UTF8", CompletionState: "PARTIAL"}
PayloadPart 2: {Bytes: "World", DataType: "UTF8", CompletionState: "COMPLETE"}

Container receives:
Frame 1: Text Data Frame with "Hello " (FIN=0)
Frame 2: Continuation Frame with "World" (FIN=1)
```

**3.2。控制框架**

除了数据帧之外， SageMaker AI 还会将控制帧发送到模型容器（[RFC6455-Section-5.5](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5)）：

1. 关闭框架：如果出于任何原因关闭连接， SageMaker AI 可能会将 Close Frame（[RFC6455-Section-5.5.1](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1)）发送到模型容器。

1. Ping Frame： SageMaker AI 每 60 秒发送 Ping Frame（[RFC6455-Section-5.2](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2)）一次，模型容器必须使用 Pong Frame 做出响应。如果连续 5 次未收到 Pong 帧（[RFC6455-Section-5.5.3](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3)），则 AI 将关闭连接。 SageMaker 

1. Pong Frame： SageMaker AI 会用 Pong Frames 回应模型容器中的 Ping Frames。

**4。响应流处理**

输出以一系列 ModelStreamErrors 或的 PayloadParts形式流出 InternalStreamFailures。

```
{   
   "PayloadPart": { 
      "Bytes": <Blob>,
      "DataType": <String: UTF8 | BINARY>,
      "CompletionState": <String: PARTIAL | COMPLETE>,
   },
   "ModelStreamError": {
      "ErrorCode": <String>,
      "Message": <String>
   },
   "InternalStreamFailure": {
      "Message": <String>
   }
}
```

**4.1。数据框**

SageMaker AI 将从模型容器接收的数据帧转换为输出 PayloadParts：

1. 从模型容器接收 WebSocket 文本数据框后， SageMaker AI 会从文本数据框中获取原始字节，然后将其包装成响应 PayloadPart，同时进行设置`PayloadPart.DataType = UTF8`。

1. 从模型容器接收 WebSocket 二进制数据帧后， SageMaker AI 会直接将数据帧中的字节封装成响应 PayloadPart，同时进行设置`PayloadPart.DataType = BINARY`。

1. 对于-S [RFC6455ection-5.4](https://datatracker.ietf.org/doc/html/rfc6455#section-5.4)：分段中定义的碎片消息：
   + FIN 位清除的初始数据框将转换为 wit PayloadPart h `PayloadPart.CompletionState = PARTIAL`。
   + 随后带有 FIN 位清除的延续帧将转换为 w PayloadParts ith `PayloadPart.CompletionState = PARTIAL`。
   + 设置了 FIN 位的最终延续帧将转换为 w PayloadPart ith `PayloadPart.CompletionState = COMPLETE`。

1. SageMaker AI 不会对从模型容器接收的字节进行编码或解码，而是按原样将字节传递到模型容器。

1. SageMaker AI 不会将从模型容器接收到的多个数据帧合并成一个响应 PayloadPart。

1. SageMaker AI 不会将从模型容器接收到的数据帧分块为多个响应 PayloadParts。

**示例：流式传输响应流**

```
Container sends:
Frame 1: Text Data Frame with "Generating" (FIN=0)
Frame 2: Continuation Frame with " response..." (FIN=1)

Client receives:
PayloadPart 1: {Bytes: "Generating", DataType: "UTF8", CompletionState: "PARTIAL"}
PayloadPart 2: {Bytes: " response...", DataType: "UTF8", CompletionState: "COMPLETE"}
```

**4.2。控制框架**

SageMaker AI 对模型容器中的以下控制帧做出响应：

1. 在收到来自模型容器的闭合帧（[RFC6455-Section-5.5.1](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1)）后， SageMaker AI 会将状态代码（[RFC6455-Section-7.4](https://datatracker.ietf.org/doc/html/rfc6455#section-7.4)）和失败消息封装进去 ModelStreamError，然后将其流回最终用户。

1. 从模型容器中收到 Ping Frame（[RFC6455-Section-5.2](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2)）后， SageMaker AI 将使用 Pong Frame 做出回应。

1. Pong Frame（[RFC6455-Section-5.3](https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3)）：如果连续 5 次未收到 Pong Frame，AI 将关闭连接。 SageMaker 

# 为实时推理容器使用私有 Docker 注册表
<a name="your-algorithms-containers-inference-private"></a>

默认情况下， SageMaker Amazon AI 托管允许您使用存储在 Amazon ECR 中的图像来构建容器以进行实时推理。或者，您可以从私有 Docker 注册表中的映像来构建容器用于实时推理。私有注册表必须可以从您账户中的 Amazon VPC 访问。您基于存储在私有 Docker 注册表中的映像创建的模型，必须配置为连接到可以访问私有 Docker 注册表的同一个 VPC。有关连接到 VPC 中模型的更多信息，请参阅[让 SageMaker AI 托管的终端节点访问您的 Amazon VPC 中的资源](host-vpc.md)。

您的 Docker 注册表必须使用来自已知公共证书颁发机构 (CA) 的 TLS 证书进行保护。

**注意**  
您的私有 Docker 注册表必须允许来自您在模型的 VPC 配置中指定的安全组的入站流量，这样 SageMaker AI 托管才能从您的注册表中提取模型映像。  
SageMaker DockerHub 如果您的 VPC 内有通往开放互联网的路径，AI 可以从中提取模型图像。

**Topics**
+ [

## 在 Amazon Elastic Container Registry 以外的私有 Docker 注册表中存储映像
](#your-algorithms-containers-inference-private-registry)
+ [

## 使用来自私有 Docker 注册表的映像进行实时推理
](#your-algorithms-containers-inference-private-use)
+ [

## 允许 SageMaker AI 向私有 Docker 注册表进行身份验证
](#inference-private-docker-authenticate)
+ [

## 创建 Lambda 函数
](#inference-private-docker-lambda)
+ [

## 向 Lambda 授予执行角色权限
](#inference-private-docker-perms)
+ [

## 为 Lambda 创建接口 VPC 端点
](#inference-private-docker-vpc-interface)

## 在 Amazon Elastic Container Registry 以外的私有 Docker 注册表中存储映像
<a name="your-algorithms-containers-inference-private-registry"></a>

要使用私有 Docker 注册表来存储用于 SageMaker 人工智能实时推断的图像，请创建一个可从您的 Amazon VPC 访问的私有注册表。有关创建 Docker 注册表的信息，请参阅 Docker 文档中的[部署注册表服务器](https://docs.docker.com/registry/deploying/)。Docker 注册表必须遵守以下要求：
+ 注册表必须是 [Docker 注册表 HTTP API V2](https://docs.docker.com/registry/spec/api/) 注册表。
+ Docker 注册表必须可以从您在创建模型时，在 `VpcConfig` 参数中指定的相同 VPC 访问。

## 使用来自私有 Docker 注册表的映像进行实时推理
<a name="your-algorithms-containers-inference-private-use"></a>

创建模型并将其部署到 SageMaker AI 托管时，您可以指定它使用私有 Docker 注册表中的镜像来构建推理容器。在您传递给 [create\$1model](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_model) 函数的调用的 `PrimaryContainer` 参数中，在 `ImageConfig` 对象中指定此项。

**将存储在私有 Docker 注册表中的映像用于推理容器**

1. 创建映像配置对象并为 `RepositoryAccessMode` 字段指定 `Vpc` 的值。

   ```
   image_config = {
                       'RepositoryAccessMode': 'Vpc'
                  }
   ```

1. 如果您的私有 Docker 注册表需要身份验证，请添加 `RepositoryAuthConfig` 对象到映像配置对象。在`RepositoryAuthConfig`对象的`RepositoryCredentialsProviderArn`字段中，指定函数的亚马逊资源名称 (ARN)，该 Amazon Lambda 函数提供允许 SageMaker AI 向您的私有 Docker 注册表进行身份验证的凭证。有关如何创建 Lambda 函数以提供身份验证的信息，请参阅[允许 SageMaker AI 向私有 Docker 注册表进行身份验证](#inference-private-docker-authenticate)。

   ```
   image_config = {
                       'RepositoryAccessMode': 'Vpc',
                       'RepositoryAuthConfig': {
                          'RepositoryCredentialsProviderArn': 'arn:aws:lambda:Region:Acct:function:FunctionName'
                        }
                  }
   ```

1. 创建要传递到 `create_model` 的主容器对象，使用您在上一步中创建的映像配置对象。

   在[摘要](https://docs.docker.com/engine/reference/commandline/pull/#pull-an-image-by-digest-immutable-identifier)表中提供您的映像。如果您使用`:latest`标签提供图像，则存在 SageMaker 人工智能提取比预期更新的图像版本的风险。使用摘要表单可确保 SageMaker AI 提取预期的图像版本。

   ```
   primary_container = {
       'ContainerHostname': 'ModelContainer',
       'Image': 'myteam.myorg.com/docker-local/my-inference-image:<IMAGE-TAG>',
       'ImageConfig': image_config
   }
   ```

1. 指定要传递给 `create_model` 的模型名称和执行角色。

   ```
   model_name = 'vpc-model'
   execution_role_arn = 'arn:aws:iam::123456789012:role/SageMakerExecutionRole'
   ```

1. 为您模型的 VPC 配置指定一个或多个安全组和子网。您的私有 Docker 注册表必须允许来自您指定的安全组的入站流量。您指定的子网必须与私有 Docker 注册表位于同一 VPC 中。

   ```
   vpc_config = {
       'SecurityGroupIds': ['sg-0123456789abcdef0'],
       'Subnets': ['subnet-0123456789abcdef0','subnet-0123456789abcdef1']
   }
   ```

1. 获取 Boto3 SageMaker 人工智能客户端。

   ```
   import boto3
   sm = boto3.client('sagemaker')
   ```

1. 通过调用 `create_model` 来创建模型，使用您在之前的步骤中为 `PrimaryContainer` 和 `VpcConfig` 参数指定的值。

   ```
   try:
       resp = sm.create_model(
           ModelName=model_name,
           PrimaryContainer=primary_container,
           ExecutionRoleArn=execution_role_arn,
           VpcConfig=vpc_config,
       )
   except Exception as e:
       print(f'error calling CreateModel operation: {e}')
   else:
       print(resp)
   ```

1. 最后，调用 [create\$1endpoint\$1config](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_endpoint_config) 和 [create\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_endpoint) 以创建托管端点，使用您在之前步骤中创建的模型。

   ```
   endpoint_config_name = 'my-endpoint-config'
   sm.create_endpoint_config(
       EndpointConfigName=endpoint_config_name,
       ProductionVariants=[
           {
               'VariantName': 'MyVariant',
               'ModelName': model_name,
               'InitialInstanceCount': 1,
               'InstanceType': 'ml.t2.medium'
           },
       ],
   )
   
   endpoint_name = 'my-endpoint'
   sm.create_endpoint(
       EndpointName=endpoint_name,
       EndpointConfigName=endpoint_config_name,
   )
   
   sm.describe_endpoint(EndpointName=endpoint_name)
   ```

## 允许 SageMaker AI 向私有 Docker 注册表进行身份验证
<a name="inference-private-docker-authenticate"></a>

[要从需要身份验证的私有 Docker 注册表中提取推理映像，请创建一个提供证书的 Amazon Lambda 函数，并在调用 create\$1model 时提供 Lambda 函数的亚马逊资源名称 (ARN)。](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_model)当 SageMaker AI 运行时`create_model`，它会调用您指定的 Lambda 函数来获取凭证，以便向 Docker 注册表进行身份验证。

## 创建 Lambda 函数
<a name="inference-private-docker-lambda"></a>

创建一个返回以下格式的响应的 Amazon Lambda 函数：

```
def handler(event, context):
   response = {
      "Credentials": {"Username": "username", "Password": "password"}
   }
   return response
```

根据您为私有 Docker 注册表设置的身份验证方式，Lambda 函数返回的凭证可能是以下任一内容：
+ 如果您将私有 Docker 注册表设置为使用基本身份验证，请提供登录凭证以便向注册表进行身份验证。
+ 如果您将私有 Docker 注册表设置为使用持有者令牌身份验证，则登录凭证将发送到您的授权服务器，该服务器将返回一个持有者令牌，然后可用于向私有 Docker 注册表进行身份验证。

## 向 Lambda 授予执行角色权限
<a name="inference-private-docker-perms"></a>

用于调用的执行角色`create_model`必须具有调用 Amazon Lambda 函数的权限。将以下内容添加到您执行角色的权限策略中。

```
{
    "Effect": "Allow",
    "Action": [
        "lambda:InvokeFunction"
    ],
    "Resource": [
        "arn:aws:lambda:*:*:function:*myLambdaFunction*"
    ]
}
```

您*myLambdaFunction*的 Lambda 函数的名称在哪里。有关编辑角色权限策略的信息，请参阅《Amazon Identity and Access Management 用户指南》**中的[修改角色权限策略（控制台）](https://docs.amazonaws.cn/IAM/latest/UserGuide/roles-managingrole-editing-console.html#roles-modify_permissions-policy)。

**注意**  
附加了`AmazonSageMakerFullAccess`托管策略的执行角色有权调用其名称**SageMaker**中包含的任何 Lambda 函数。

## 为 Lambda 创建接口 VPC 端点
<a name="inference-private-docker-vpc-interface"></a>

创建接口端点，以便您的 Amazon VPC 无需通过互联网发送流量，即可与 Amazon Lambda 函数通信。有关如何完成此操作的更多信息，请参阅《Amazon Lambda 开发人员指南》**中的[为 Lambda 配置接口 VPC 端点](https://docs.amazonaws.cn/lambda/latest/dg/configuration-vpc-endpoints.html)。

SageMaker AI 托管通过您的 VPC 向发送请求以`lambda.region.amazonaws.com`调用您的 Lambda 函数。如果您在创建接口端点时选择私有 DNS 名称，则 Amazon Route 53 会将调用路由到 Lambda 接口端点。如果您使用不同的 DNS 提供商，请务必将 `lambda.region.amazonaws.com` 映射到您的 Lambda 接口端点。

# 自定义用于批次转换的推理代码
<a name="your-algorithms-batch-code"></a>

本节介绍了 Amazon A SageMaker I 如何与 Docker 容器交互，该容器运行您自己的推理代码以进行批量转换。使用此信息编写推理代码并创建 Docker 镜像。

**Topics**
+ [

## SageMaker AI 如何运行你的推理图像
](#your-algorithms-batch-code-run-image)
+ [

## SageMaker AI 如何加载你的模型工件
](#your-algorithms-batch-code-load-artifacts)
+ [

## 容器如何使用请求
](#your-algorithms-batch-code-how-containe-serves-requests)
+ [

## 容器应如何响应推理请求
](#your-algorithms-batch-code-how-containers-should-respond-to-inferences)
+ [

## 容器应如何响应运行状况检查 (Ping) 请求
](#your-algorithms-batch-algo-ping-requests)

## SageMaker AI 如何运行你的推理图像
<a name="your-algorithms-batch-code-run-image"></a>

要配置容器以作为可执行文件运行，请使用 Dockerfile 中的 `ENTRYPOINT` 指令。注意以下几点：
+ 对于批量变换， SageMaker AI 会代表你调用模型。 SageMaker AI 按以下方式运行容器：

  ```
  docker run image serve
  ```

  在批量转换时，输入格式必须是可以拆分为较小文件的格式，以便并行处理。这些格式包括 CSV、[JSON、JSO](https://www.json.org/json-en.html) [N Lines [TFRecord](https://www.tensorflow.org/tutorials/load_data/tfrecord)](https://jsonlines.org/)和 [Recordio。](https://mesos.apache.org/documentation/latest/recordio/)

  SageMaker AI 通过在图像名称后指定`serve`参数来覆盖容器中的默认`CMD`语句。`serve` 参数覆盖您使用 Dockerfile 中的 `CMD` 命令提供的参数。

   
+ 建议您使用 `exec` 形式的 `ENTRYPOINT` 指令：

  ```
  ENTRYPOINT ["executable", "param1", "param2"]
  ```

  例如：

  ```
  ENTRYPOINT ["python", "k_means_inference.py"]
  ```

   
+ SageMaker AI 会设置容器中[https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html)和容器[https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateTransformJob.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateTransformJob.html)上指定的环境变量。此外，将填充以下环境变量：
  + 当容器运行批量转换时，`SAGEMAKER_BATCH` 设置为 `true`。
  + `SAGEMAKER_MAX_PAYLOAD_IN_MB` 设置为通过 HTTP 发送到容器的最大负载大小。
  + 在调用时，如果容器每个调用发送一条记录，则将 `SAGEMAKER_BATCH_STRATEGY`设置为 `SINGLE_RECORD`，如果容器获取负载中可以容纳的尽可能多的记录，则设置为 `MULTI_RECORD`。
  + `SAGEMAKER_MAX_CONCURRENT_TRANSFORMS` 设置为可以同时打开的 `/invocations` 请求的最大数量。
**注意**  
最后三个环境变量来自用户发出的 API 调用。如果用户没有为这些变量设置值，则不会传递它们。在这种情况下，使用默认值或算法请求的值（用于响应 `/execution-parameters`）。
+ 如果您为模型推理使用 GPU 设备（通过在 `CreateTransformJob` 请求中指定基于 GPU 的 ML 计算实例），请确保您的容器与 nvidia-docker 兼容。不要将 NVIDIA 驱动程序与映像捆绑。有关 nvidia-docker 的更多信息，请参阅 [NVIDIA/nvidia-docker](https://github.com/NVIDIA/nvidia-docker)。

   
+ 你不能使用`init`初始化器作为 SageMaker AI 容器中的入口点，因为它会被 train 和 serve 参数混淆。

  

## SageMaker AI 如何加载你的模型工件
<a name="your-algorithms-batch-code-load-artifacts"></a>

在 [https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_CreateModel.html) 请求中，容器定义包含 `ModelDataUrl` 参数，它标识在 Amazon S3 中存储模型构件的位置。当您使用 SageMaker AI 进行推断时，它会使用此信息来确定从何处复制模型工件。它将构件复制到 Docker 容器中的 `/opt/ml/model` 目录，以供您的推理代码使用。

`ModelDataUrl` 参数必须指向 tar.gz 文件。否则， SageMaker AI 无法下载该文件。如果您使用 SageMaker AI 训练模型，它会将工件作为单个压缩的 tar 文件保存到 Amazon S3 中。如果您在其他框架中训练模型，则需要将模型工件作为压缩的 tar 文件存储在 Amazon S3 中。 SageMaker 在批处理转换作业开始之前，AI 会解压缩此 tar 文件并将其保存在容器中的`/opt/ml/model`目录中。

## 容器如何使用请求
<a name="your-algorithms-batch-code-how-containe-serves-requests"></a>

容器必须实施一个 Web 服务器，以响应端口 8080 上的调用和 ping 请求。对于批量转换，您可以选择设置算法以实现执行参数请求，从而向 AI 提供动态运行时配置。 SageMaker SageMaker AI 使用以下终端节点：
+ `ping`— 用于定期检查容器的运行状况。 SageMaker 在发送调用请求之前，AI 会等待 HTTP `200` 状态码和空正文以获得 ping 请求成功。当调用请求发出后，您可以使用 ping 请求将模型加载到内存中，以生成推理。
+ （可选）`execution-parameters` – 允许算法在运行期间为作业提供最佳调整参数。该算法根据内存和容器的 CPUs 可用内存为作业选择相应的`MaxConcurrentTransforms``BatchStrategy`、和`MaxPayloadInMB`值。

在调用调用请求之前， SageMaker AI 会尝试调用执行参数请求。创建批处理转换作业时，可以为`MaxConcurrentTransforms``BatchStrategy`、和`MaxPayloadInMB`参数提供值。 SageMaker AI 使用以下优先顺序确定这些参数的值：

1. 在您创建 `CreateTransformJob` 请求时提供的参数值。

1. 当 SageMaker AI 调用执行参数端点时，模型容器返回的值>

1. 默认参数值，如下表所列。    
[\[See the AWS documentation website for more details\]](http://docs.amazonaws.cn/sagemaker/latest/dg/your-algorithms-batch-code.html)

`GET` execution-parameters 请求的响应是一个 JSON 对象，带有 `MaxConcurrentTransforms`、`BatchStrategy` 和 `MaxPayloadInMB` 参数的键值。以下是一个有效响应的示例：

```
{
“MaxConcurrentTransforms”: 8,
“BatchStrategy": "MULTI_RECORD",
"MaxPayloadInMB": 6
}
```

## 容器应如何响应推理请求
<a name="your-algorithms-batch-code-how-containers-should-respond-to-inferences"></a>

为了获得推论，Amazon A SageMaker I 会向推理容器发送一个 POST 请求。POST 请求正文包含来自 Amazon S3 的数据。Amazon SageMaker AI 将请求传递到容器，然后从容器返回推理结果，将响应中的数据保存到 Amazon S3。

要接收推理请求，容器必须有一个在端口 8080 上侦听的 Web 服务器，并且必须接受到针对 `/invocations` 终端节点的 POST 请求。推理请求超时和最大重试次数可通过 `[ModelClientConfig](https://docs.amazonaws.cn/sagemaker/latest/APIReference/API_ModelClientConfig.html)` 配置。

## 容器应如何响应运行状况检查 (Ping) 请求
<a name="your-algorithms-batch-algo-ping-requests"></a>

容器上的最简单要求是使用 HTTP 200 状态代码和空白正文进行响应。这向 SageMaker AI 表明容器已准备好接受`/invocations`终端节点的推理请求。

虽然最低限制供容器用来返回静态 200，但容器开发人员可使用此功能执行更深入的检查。`/ping` 尝试的请求超时为 2 秒。