

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon IoT Device SDK 与 Greengrass 原子核、其他组件进行通信，以及 Amazon IoT Core
<a name="interprocess-communication"></a>

在核心设备上运行的组件可以使用中的 Amazon IoT Greengrass 核心进程间通信 (IPC) 库与 Amazon IoT Greengrass 核心和其他 Gre Amazon IoT Device SDK engrass 组件进行通信。要开发和运行使用 IPC 的自定义组件，必须使用连接 Amazon IoT Device SDK 到 C Amazon IoT Greengrass ore IPC 服务并执行 IPC 操作。

IPC 接口支持两种类型的操作：
+ **请求/响应**

  组件向 IPC 服务发送请求并接收包含请求结果的响应。
+ **订阅**

  组件向 IPC 服务发送订阅请求，并期望响应中包含事件消息流。组件提供订阅处理程序，用于处理事件消息、错误和流关闭。 Amazon IoT Device SDK 包括一个处理程序接口，其中包含每个 IPC 操作的正确响应和事件类型。有关更多信息，请参阅 [订阅 IPC 事件流](#ipc-subscribe-operations)。

**Topics**
+ [IPC 客户端版本](#ipc-client-versions)
+ [SDKs 支持进程间通信](#ipc-requirements)
+ [Connect 到 C Amazon IoT Greengrass ore IPC 服务](#ipc-service-connect)
+ [授权组件执行 IPC 操作](#ipc-authorization-policies)
+ [订阅 IPC 事件流](#ipc-subscribe-operations)
+ [IPC 最佳实践](#ipc-best-practices)
+ [发布/订阅本地消息](ipc-publish-subscribe.md)
+ [发布/订阅 Amazon IoT Core MQTT 消息](ipc-iot-core-mqtt.md)
+ [与组件生命周期交互](ipc-component-lifecycle.md)
+ [与组件配置交互](ipc-component-configuration.md)
+ [检索密钥值](ipc-secret-manager.md)
+ [与本地影子交互](ipc-local-shadows.md)
+ [管理本地部署和组件](ipc-local-deployments-components.md)
+ [对客户端设备进行身份验证和授权](ipc-client-device-auth.md)

## IPC 客户端版本
<a name="ipc-client-versions"></a>

在更高版本的 Java 和 Python 中 SDKs， Amazon IoT Greengrass 提供了 IPC 客户端的改进版本，名为 IPC 客户端 V2。IPC 客户端 V2：
+ 减少使用 IPC 操作所需的代码量，并有助于避免 IPC 客户端 V1 可能出现的常见错误。
+ 在独立的线程中调用订阅处理程序回调，因此您现在可以在订阅处理程序回调中运行阻止代码，包括其他 IPC 函数调用。IPC 客户端 V1 使用相同的线程与 IPC 服务器通信并调用订阅处理程序回调。
+ 允许您使用 Lambda 表达式（Java）或函数（Python）调用订阅操作。IPC 客户端 V1 要求您定义订阅处理程序类。
+ 提供每个 IPC 操作的同步和异步版本。IPC 客户端 V1 仅提供每个操作的异步版本。

我们建议使用 IPC 客户端 V2 以利用这些改进。但是，本文档和一些在线内容中的许多示例仅演示如何使用 IPC 客户端 V1。您可以使用以下示例和教程来查看使用 IPC 客户端 V2 的示例组件：
+ [PublishToTopic例子](ipc-publish-subscribe.md#ipc-operation-publishtotopic-examples)
+ [SubscribeToTopic例子](ipc-publish-subscribe.md#ipc-operation-subscribetotopic-examples)
+ [教程：开发可延迟组件更新的 Greengrass 组件](defer-component-updates-tutorial.md)
+ [教程：通过 MQTT 与本地 IoT 设备进行交互](client-devices-tutorial.md)

目前， Amazon IoT Device SDK 适用于 C\$1\$1 的 v2 仅支持 IPC 客户端 V1。

## SDKs 支持进程间通信
<a name="ipc-requirements"></a>

C Amazon IoT Greengrass ore IPC 库包含在以下 Amazon IoT Device SDK 版本中。


| SDK | 最低版本 | 用法 | 
| --- | --- | --- | 
|  [Amazon IoT Device SDK 适用于 Java v2](https://github.com/aws/aws-iot-device-sdk-java-v2)  |  v1.6.0  |  请参阅 [用 Amazon IoT Device SDK 于 Java v2（IPC 客户端 V2）](#ipc-java-v2)。  | 
|  [Amazon IoT Device SDK 适用于 Python v2](https://github.com/aws/aws-iot-device-sdk-python-v2)  |  v1.9.0  |  请参阅 [用 Amazon IoT Device SDK 于 Python v2（IPC 客户端 V2）](#ipc-python-v2)。  | 
|  [Amazon IoT Device SDK 适用于 C\$1\$1 v2](https://github.com/aws/aws-iot-device-sdk-cpp-v2)  |  v1.17.0  |  请参阅 [用 Amazon IoT Device SDK 于 C\$1\$1 v2](#ipc-cpp)。  | 
|  [Amazon IoT Device SDK 适用于 JavaScript v2](https://github.com/aws/aws-iot-device-sdk-js-v2)  |  v1.12.0  |  请参阅 [用 Amazon IoT Device SDK 于 JavaScript v2（IPC 客户端 V1）](#ipc-nodejs)。  | 

## Connect 到 C Amazon IoT Greengrass ore IPC 服务
<a name="ipc-service-connect"></a>

要在自定义组件中使用进程间通信，必须创建与 C Amazon IoT Greengrass ore 软件运行的 IPC 服务器套接字的连接。完成以下任务，下载并使用您选择 Amazon IoT Device SDK 的语言。

### 用 Amazon IoT Device SDK 于 Java v2（IPC 客户端 V2）
<a name="ipc-java-v2"></a>

**要使用 Amazon IoT Device SDK 适用于 Java v2（IPC 客户端 V2）**

1. 下载[适用于 Java v2 的Amazon IoT Device SDK](https://github.com/aws/aws-iot-device-sdk-java-v2)（v1.6.0 或更高版本）

1. <a name="use-ipc-java-component-install-step"></a>执行以下某种操作以在组件中运行自定义代码：
   + 将您的组件构建为包含的 JAR 文件 Amazon IoT Device SDK，然后在组件配方中运行此 JAR 文件。
   + 将 Amazon IoT Device SDK JAR 定义为组件工件，并在组件配方中运行应用程序时将该构件添加到类路径中。

1. 使用以下代码创建 IPC 客户端。

   ```
   try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
       // Use client.
   } catch (Exception e) {
       LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
       System.exit(1);
   }
   ```

### 用 Amazon IoT Device SDK 于 Python v2（IPC 客户端 V2）
<a name="ipc-python-v2"></a>

**要使用 Amazon IoT Device SDK 适用于 Python v2（IPC 客户端 V2）**

1. 下载[适用于 Python 的Amazon IoT Device SDK](https://github.com/aws/aws-iot-device-sdk-python-v2)（v1.9.0 或更高版本）。

1. <a name="use-ipc-python-component-install-step"></a>将 SDK 的[安装步骤](https://github.com/aws/aws-iot-device-sdk-python-v2#installation)添加到组件配方中的安装生命周期。

1. 创建与 Amazon IoT Greengrass 核心 IPC 服务的连接。使用以下代码创建 IPC 客户端。

   ```
   from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
   
   try:
       ipc_client = GreengrassCoreIPCClientV2()
       # Use IPC client.
   except Exception:
       print('Exception occurred when using IPC.', file=sys.stderr)
       traceback.print_exc()
       exit(1)
   ```

### 用 Amazon IoT Device SDK 于 C\$1\$1 v2
<a name="ipc-cpp"></a>

<a name="iot-device-sdk-cpp-v2-build-requirements-intro"></a>要构建 C\$1\$1 版 Amazon IoT Device SDK v2，设备必须具有以下工具：<a name="iot-device-sdk-cpp-v2-build-requirements"></a>
+ C\$1\$1 11 或更高版本
+ CMake 3.1 或更高版本
+ 以下编译器之一：
  + GCC 4.8 或更高版本
  + Clang 3.9 或更高版本
  + MSVC 2015 或更高版本

**要使用 Amazon IoT Device SDK 适用于 C\$1\$1 v2 的**

1. 下载[适用于 C\$1\$1 v2 的Amazon IoT Device SDK](https://github.com/aws/aws-iot-device-sdk-cpp-v2)（v1.17.0 或更高版本）。

1. 按照[自述文件中的安装说明](https://github.com/aws/aws-iot-device-sdk-cpp-v2#Installation)从源代码构建 C\$1\$1 v2 版。 Amazon IoT Device SDK 

1. 在 C\$1\$1 构建工具中，链接您在上一步中构建的 Greengrass IPC 库 `AWS::GreengrassIpc-cpp`。以下`CMakeLists.txt`示例将 Greengrass IPC 库链接到您用来构建的项目。 CMake

   ```
   cmake_minimum_required(VERSION 3.1)
   project (greengrassv2_pubsub_subscriber)
   
   file(GLOB MAIN_SRC
           "*.h"
           "*.cpp"
           )
   add_executable(${PROJECT_NAME} ${MAIN_SRC})
   
   set_target_properties(${PROJECT_NAME} PROPERTIES
           LINKER_LANGUAGE CXX
           CXX_STANDARD 11)
   find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
   find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
   find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
   target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
   ```

1. 在组件代码中，创建与 C Amazon IoT Greengrass ore IPC 服务的连接以创建 IPC 客户端 () `Aws::Greengrass::GreengrassCoreIpcClient`。您必须定义 IPC 连接生命周期处理程序，用于处理 IPC 连接、断开连接和错误事件。以下示例创建了 IPC 客户端和 IPC 连接生命周期处理程序，在 IPC 客户端连接、断开连接和遇到错误时进行打印。

   ```
   #include <iostream>
   
   #include <aws/crt/Api.h>
   #include <aws/greengrass/GreengrassCoreIpcClient.h>
   
   using namespace Aws::Crt;
   using namespace Aws::Greengrass;
   
   class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
       void OnConnectCallback() override {
           std::cout << "OnConnectCallback" << std::endl;
       }
   
       void OnDisconnectCallback(RpcError error) override {
           std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
           exit(-1);
       }
   
       bool OnErrorCallback(RpcError error) override {
           std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
           return true;
       }
   };
   
   int main() {
       // Create the IPC client.
       ApiHandle apiHandle(g_allocator);
       Io::EventLoopGroup eventLoopGroup(1);
       Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
       Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
       IpcClientLifecycleHandler ipcLifecycleHandler;
       GreengrassCoreIpcClient ipcClient(bootstrap);
       auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
       if (!connectionStatus) {
           std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
           exit(-1);
       }
       
       // Use the IPC client to create an operation request.
       
       // Activate the operation request.
       auto activate = operation.Activate(request, nullptr);
       activate.wait();
   
       // Wait for Greengrass Core to respond to the request.
       auto responseFuture = operation.GetResult();
       if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
           std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
           exit(-1);
       }
   
       // Check the result of the request.
       auto response = responseFuture.get();
       if (response) {
           std::cout << "Successfully published to topic: " << topic << std::endl;
       } else {
           // An error occurred.
           std::cout << "Failed to publish to topic: " << topic << std::endl;
           auto errorType = response.GetResultType();
           if (errorType == OPERATION_ERROR) {
               auto *error = response.GetOperationError();
               std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
           } else {
               std::cout << "RPC error: " << response.GetRpcError() << std::endl;
           }
           exit(-1);
       }
       
       return 0;
   }
   ```

1. 要在组件中运行自定义代码，请将代码构建为二进制构件，然后在组件配方中运行二进制构件。将构件的`Execute`权限设置为`OWNER`，以使 Amazon IoT Greengrass 核心软件能够运行二进制工件。

   组件配方的 `Manifests` 部分可能类似于以下示例。

------
#### [ JSON ]

   ```
   {
     ...
     "Manifests": [
       {
         "Lifecycle": {
           "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
             "Permission": {
               "Execute": "OWNER"
             }
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ...
   Manifests:
     - Lifecycle:
         Run: {artifacts:path}/greengrassv2_pubsub_subscriber
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
           Permission:
             Execute: OWNER
   ```

------

### 用 Amazon IoT Device SDK 于 JavaScript v2（IPC 客户端 V1）
<a name="ipc-nodejs"></a>

<a name="iot-device-sdk-nodejs-build-requirements-intro"></a>要编译 Amazon IoT Device SDK 适用于 JavaScript v2 的，以便与 NodeJS 一起使用，设备必须具有以下工具：<a name="iot-device-sdk-nodejs-build-requirements"></a>
+ NodeJS 10.0 或更高版本
  + 运行 `node -v` 以检查 Node 版本。
+ CMake 3.1 或更高版本

**要使用 Amazon IoT Device SDK 适用于 JavaScript v2 的（IPC 客户端 V1）**

1. 下载[Amazon IoT Device SDK 适用于 JavaScript v2（v](https://github.com/aws/aws-iot-device-sdk-js-v2) 1.12.10 或更高版本）的。

1. 按照[自述文件中的安装说明](https://github.com/aws/aws-iot-device-sdk-js-v2/tree/v1.12.1#installation)从源代码构建 JavaScript v2 版。 Amazon IoT Device SDK 

1. 创建与 Amazon IoT Greengrass 核心 IPC 服务的连接。完成以下步骤，以创建 IPC 客户端并建立连接。

1. 使用以下代码创建 IPC 客户端。

   ```
   import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
   
   let client = greengrascoreipc.createClient();
   ```

1. 使用以下代码，建立从组件到 Greengrass Nucleus 的连接。

   ```
   await client.connect();
   ```

## 授权组件执行 IPC 操作
<a name="ipc-authorization-policies"></a>

要允许您的自定义组件使用某些 IPC 操作，您必须定义*授权策略*，允许组件对某些资源执行操作。每个授权策略都定义了策略允许的操作列表和资源列表。例如， publish/subscribe 消息 IPC 服务定义了主题资源的发布和订阅操作。您可以使用 `*` 通配符以允许访问所有操作或所有资源。

您可以使用 `accessControl` 配置参数定义授权策略。您可以在组件配方中或在部署组件时设置该参数。`accessControl` 对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID，该 ID 在所有组件中必须唯一。

**提示**  
要创建唯一的策略 IDs，您可以组合组件名称、IPC 服务名称和计数器。例如，名为的组件`com.example.HelloWorld`可以定义两个 publish/subscribe 授权策略，如下所示 IDs：  
`com.example.HelloWorld:pubsub:1`
`com.example.HelloWorld:pubsub:2`

授权策略使用以下格式。此对象是 `accessControl` 配置参数。

------
#### [ JSON ]

```
{
  "IPC service identifier": {
    "policyId": {
      "policyDescription": "description",
      "operations": [
        "operation1",
        "operation2"
      ],
      "resources": [
        "resource1",
        "resource2"
      ]
    }
  }
}
```

------
#### [ YAML ]

```
IPC service identifier:
  policyId:
    policyDescription: description
    operations:
      - operation1
      - operation2
    resources:
      - resource1
      - resource2
```

------

### 授权策略中的通配符
<a name="ipc-authorization-policy-wildcards"></a>

您可以在 IPC 授权策略的 `resources` 元素中使用 `*` 通配符，以允许访问单个授权策略中的多个资源。
+ 在所有版本的 [Greengrass Nucleus](greengrass-nucleus-component.md) 中，您可以指定单个 `*` 作为资源，以允许访问所有资源。
+ 在 [Greengrass Nucleus](greengrass-nucleus-component.md) v2.6.0 及更高版本中，您可以在资源中指定 `*` 字符，以匹配任何字符组合。例如，您可以指定 `factory/1/devices/Thermostat*/status`，以允许访问工厂中所有恒温器设备的状态主题，其中每台设备的名称都以 `Thermostat` 开头。

在为 Amazon IoT Core MQTT IPC 服务定义授权策略时，也可以使用 MQTT 通配符（`+`和`#`）来匹配多个资源。有关更多信息，请参阅 [MQTT IPC 授权策略中的 Amazon IoT Core MQTT 通配符](ipc-iot-core-mqtt.md#ipc-iot-core-mqtt-authorization-mqtt-wildcards)。

### 授权策略中的配方变量
<a name="ipc-authorization-policy-recipe-variables"></a>

[如果你使用 [Greengrass](greengrass-nucleus-component.md) nucleus v2.6.0 或更高版本，并且将 Greeng [interpolateComponentConfiguration](greengrass-nucleus-component.md#greengrass-nucleus-component-configuration-interpolate-component-configuration)rass nucleus 的配置选项设置为，则可以在授权策略中使用配方变量。`true``{iot:thingName}`](component-recipe-reference.md#recipe-variables)当您需要包含核心设备名称的授权策略（例如针对 MQTT 主题或设备影子）时，您可以使用此配方变量为一组核心设备配置单个授权策略。例如，您可以允许组件访问以下适用于影子 IPC 操作的资源。

```
$aws/things/{iot:thingName}/shadow/
```

### 授权策略中的特殊字符
<a name="ipc-authorization-policy-special-characters"></a>

要在授权策略中指定文字 `*` 或 `?` 字符，您必须使用转义序列。以下转义序列指示 Amazon IoT Greengrass Core 软件使用字面值而不是字符的特殊含义。例如，`*` 字符是与任意字符组合匹配的[通配符](#ipc-authorization-policy-wildcards)。


| 文字字符 | 转义序列 | 注意 | 
| --- | --- | --- | 
|  `*`  |  `${*}`  |  | 
|  `?`  |  `${?}`  |  Amazon IoT Greengrass 目前不支持与任何单个字符匹配的`?`通配符。  | 
|  `$`  |  `${$}`  |  使用此转义序列可匹配包含 `${` 的资源。例如，要匹配名为 `${resourceName}` 的资源，您必须指定 `${$}{resourceName}`。否则，要匹配包含 `$` 的资源，您可以使用文字 `$`，例如允许访问以 `$aws` 开头的主题。  | 

### 授权策略示例
<a name="ipc-authorization-policy-examples"></a>

您可以参考以下授权策略示例，帮助您为组件配置授权策略。

**Example 具有授权策略的组件配方示例**  
以下示例组件配方包括可定义授权策略的 `accessControl` 对象。此策略授权 `com.example.HelloWorld` 组件发布到 `test/topic` 主题。  

```
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.HelloWorld",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that publishes messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.HelloWorld:pubsub:1": {
            "policyDescription": "Allows access to publish to test/topic.",
            "operations": [
              "aws.greengrass#PublishToTopic"
            ],
            "resources": [
              "test/topic"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "Run": "java -jar {artifacts:path}/HelloWorld.jar"
      }
    }
  ]
}
```

```
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        "com.example.HelloWorld:pubsub:1":
          policyDescription: Allows access to publish to test/topic.
          operations:
            - "aws.greengrass#PublishToTopic"
          resources:
            - "test/topic"
Manifests:
  - Lifecycle:
      Run: |-
        java -jar {artifacts:path}/HelloWorld.jar
```

**Example 使用授权策略的组件配置更新示例**  
部署中的以下配置更新示例指定使用可定义授权策略的 `accessControl` 对象来配置组件。此策略授权 `com.example.HelloWorld` 组件发布到 `test/topic` 主题。    
**要合并的配置**  

```
{
  "accessControl": {
    "aws.greengrass.ipc.pubsub": {
      "com.example.HelloWorld:pubsub:1": {
        "policyDescription": "Allows access to publish to test/topic.",
        "operations": [
          "aws.greengrass#PublishToTopic"
        ],
        "resources": [
          "test/topic"
        ]
      }
    }
  }
}
```
以下命令会创建对核心设备的部署。  

```
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
```
`hello-world-deployment.json` 文件包含以下 JSON 文档。  

```
{
  "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
  "deploymentName": "Deployment for MyGreengrassCore",
  "components": {
    "com.example.HelloWorld": {
      "componentVersion": "1.0.0",
      "configurationUpdate": {
        "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
      }
    }
  }
}
```
以下 [Greengrass CLI](greengrass-cli-component.md) 命令会在核心设备上创建本地部署。  

```
sudo greengrass-cli deployment create \
  --recipeDir recipes \
  --artifactDir artifacts \
  --merge "com.example.HelloWorld=1.0.0" \
  --update-config hello-world-configuration.json
```
`hello-world-configuration.json` 文件包含以下 JSON 文档。  

```
{
  "com.example.HelloWorld": {
    "MERGE": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.HelloWorld:pubsub:1": {
            "policyDescription": "Allows access to publish to test/topic.",
            "operations": [
              "aws.greengrass#PublishToTopic"
            ],
            "resources": [
              "test/topic"
            ]
          }
        }
      }
    }
  }
}
```

## 订阅 IPC 事件流
<a name="ipc-subscribe-operations"></a>

您可以使用 IPC 操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作，请定义*订阅处理程序*并创建对 IPC 服务的请求。然后，每次核心设备将事件消息流式传输到您的组件时，IPC 客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此，请在您用于打开订阅的订阅操作对象上调用 `closeStream()`（Java）、`close()`（Python）或 `Close()`（C\$1\$1）。

C Amazon IoT Greengrass ore IPC 服务支持以下订阅操作：
+ [SubscribeToTopic](ipc-publish-subscribe.md#ipc-operation-subscribetotopic)
+ [SubscribeToIoTCore](ipc-iot-core-mqtt.md#ipc-operation-subscribetoiotcore)
+ [SubscribeToComponentUpdates](ipc-component-lifecycle.md#ipc-operation-subscribetocomponentupdates)
+ [SubscribeToConfigurationUpdate](ipc-component-configuration.md#ipc-operation-subscribetoconfigurationupdate)
+ [SubscribeToValidateConfigurationUpdates](ipc-component-configuration.md#ipc-operation-subscribetovalidateconfigurationupdates)

**Topics**
+ [定义订阅处理程序](#ipc-define-subscription-handlers)
+ [订阅处理程序示例](#ipc-subscription-handler-examples)

### 定义订阅处理程序
<a name="ipc-define-subscription-handlers"></a>

要定义订阅处理程序，请定义回调函数，处理事件消息、错误和流关闭。如果您使用 IPC 客户端 V1，则必须在类中定义这些函数。如果您使用 IPC 客户端 V2（在 Java 和 Python 的更高版本中可用） SDKs，则无需创建订阅处理程序类即可定义这些函数。

------
#### [ Java ]

如果您使用 IPC 客户端 V1，则必须实现通用`software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>`接口。 *StreamEventType*是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。

如果您使用 IPC 客户端 V2，则可以在订阅处理程序类之外定义这些函数，也可以使用 [Lambda 表达式](https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)。

`void onStreamEvent(StreamEventType event)`  
IPC 客户端在收到事件消息（例如 MQTT 消息或组件更新通知）时调用的回调。

`boolean onStreamError(Throwable error)`  
IPC 客户端在流错误发生时调用的回调。  
<a name="ipc-subscription-handler-on-stream-error-return-value"></a>如果出现错误，则返回 True 以关闭订阅流，或返回 False 以保持流处于打开状态。

`void onStreamClosed()`  
IPC 客户端在流关闭时调用的回调。

------
#### [ Python ]

如果您使用 IPC 客户端 V1，则必须扩展与订阅操作对应的流响应处理程序类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序类。 *StreamEventType*是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。

如果您使用 IPC 客户端 V2，则可以在订阅处理程序类之外定义这些函数，也可以使用 [Lambda 表达式](https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions)。

`def on_stream_event(self, event: StreamEventType) -> None`  
IPC 客户端在收到事件消息（例如 MQTT 消息或组件更新通知）时调用的回调。

`def on_stream_error(self, error: Exception) -> bool`  
IPC 客户端在流错误发生时调用的回调。  
<a name="ipc-subscription-handler-on-stream-error-return-value"></a>如果出现错误，则返回 True 以关闭订阅流，或返回 False 以保持流处于打开状态。

`def on_stream_closed(self) -> None`  
IPC 客户端在流关闭时调用的回调。

------
#### [ C\$1\$1 (IPC client V1) ]

实施从与订阅操作对应的流响应处理程序类派生的类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序基类。 *StreamEventType*是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。

`void OnStreamEvent(StreamEventType *event)`  
IPC 客户端在收到事件消息（例如 MQTT 消息或组件更新通知）时调用的回调。

`bool OnStreamError(OperationError *error)`  
IPC 客户端在流错误发生时调用的回调。  
<a name="ipc-subscription-handler-on-stream-error-return-value"></a>如果出现错误，则返回 True 以关闭订阅流，或返回 False 以保持流处于打开状态。

`void OnStreamClosed()`  
IPC 客户端在流关闭时调用的回调。

------
#### [ JavaScript ]

实施从与订阅操作对应的流响应处理程序类派生的类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序基类。 *StreamEventType*是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。

`on(event: 'ended', listener: StreamingOperationEndedListener)`  
IPC 客户端在流关闭时调用的回调。

`on(event: 'streamError', listener: StreamingRpcErrorListener)`  
IPC 客户端在流错误发生时调用的回调。  
<a name="ipc-subscription-handler-on-stream-error-return-value"></a>如果出现错误，则返回 True 以关闭订阅流，或返回 False 以保持流处于打开状态。

`on(event: 'message', listener: (message: InboundMessageType) => void)`  
IPC 客户端在收到事件消息（例如 MQTT 消息或组件更新通知）时调用的回调。

------

### 订阅处理程序示例
<a name="ipc-subscription-handler-examples"></a>

以下示例演示如何使用 [SubscribeToTopic](ipc-publish-subscribe.md#ipc-operation-subscribetotopic) 操作和订阅处理程序来订阅本地发布/订阅消息。

------
#### [ Java (IPC client V2) ]

**Example 示例：订阅本地 publish/subscribe 消息**  <a name="ipc-operation-subscribetotopic-example-java-v2"></a>

```
package com.aws.greengrass.docs.samples.ipc;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class SubscribeToTopicV2 {

    public static void main(String[] args) {
        String topic = args[0];
        try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
            SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
            GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
                    SubscribeToTopicResponseHandler> response =
                    ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
                            Optional.of(SubscribeToTopicV2::onStreamError),
                            Optional.of(SubscribeToTopicV2::onStreamClosed));
            SubscribeToTopicResponseHandler responseHandler = response.getHandler();
            System.out.println("Successfully subscribed to topic: " + topic);

            // Keep the main thread alive, or the process will exit.
            try {
                while (true) {
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                System.out.println("Subscribe interrupted.");
            }

            // To stop subscribing, close the stream.
            responseHandler.closeStream();
        } catch (Exception e) {
            if (e.getCause() instanceof UnauthorizedError) {
                System.err.println("Unauthorized error while publishing to topic: " + topic);
            } else {
                System.err.println("Exception occurred when using IPC.");
            }
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
        try {
            BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
            String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
            String topic = binaryMessage.getContext().getTopic();
            System.out.printf("Received new message on topic %s: %s%n", topic, message);
        } catch (Exception e) {
            System.err.println("Exception occurred while processing subscription response " +
                    "message.");
            e.printStackTrace();
        }
    }

    public static boolean onStreamError(Throwable error) {
        System.err.println("Received a stream error.");
        error.printStackTrace();
        return false; // Return true to close stream, false to keep stream open.
    }

    public static void onStreamClosed() {
        System.out.println("Subscribe to topic stream closed.");
    }
}
```

------
#### [ Python (IPC client V2) ]

**Example 示例：订阅本地 publish/subscribe 消息**  <a name="ipc-operation-subscribetotopic-example-python-v2"></a>

```
import sys
import time
import traceback

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    SubscriptionResponseMessage,
    UnauthorizedError
)


def main():
    args = sys.argv[1:]
    topic = args[0]

    try:
        ipc_client = GreengrassCoreIPCClientV2()
        # Subscription operations return a tuple with the response and the operation.
        _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
                                                     on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
        print('Successfully subscribed to topic: ' + topic)

        # Keep the main thread alive, or the process will exit.
        try:
            while True:
                time.sleep(10)
        except InterruptedError:
            print('Subscribe interrupted.')

        # To stop subscribing, close the stream.
        operation.close()
    except UnauthorizedError:
        print('Unauthorized error while subscribing to topic: ' +
              topic, file=sys.stderr)
        traceback.print_exc()
        exit(1)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        exit(1)


def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except:
        traceback.print_exc()


def on_stream_error(error: Exception) -> bool:
    print('Received a stream error.', file=sys.stderr)
    traceback.print_exc()
    return False  # Return True to close stream, False to keep stream open.


def on_stream_closed() -> None:
    print('Subscribe to topic stream closed.')


if __name__ == '__main__':
    main()
```

------
#### [ C\$1\$1 (IPC client V1) ]

**Example 示例：订阅本地 publish/subscribe 消息**  <a name="ipc-operation-subscribetotopic-example-cpp"></a>

```
#include <iostream>

#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
    public:
        virtual ~SubscribeResponseHandler() {}

    private:
        void OnStreamEvent(SubscriptionResponseMessage *response) override {
            auto jsonMessage = response->GetJsonMessage();
            if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
                auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
                // Handle JSON message.
            } else {
                auto binaryMessage = response->GetBinaryMessage();
                if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
                    auto messageBytes = binaryMessage.value().GetMessage().value();
                    std::string messageString(messageBytes.begin(), messageBytes.end());
                    // Handle binary message.
                }
            }
        }

        bool OnStreamError(OperationError *error) override {
            // Handle error.
            return false; // Return true to close stream, false to keep stream open.
        }

        void OnStreamClosed() override {
            // Handle close.
        }
};

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    String topic("my/topic");
    int timeout = 10;

    SubscribeToTopicRequest request;
    request.SetTopic(topic);

    //SubscribeResponseHandler streamHandler;
    auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
    auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            (void)error;
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
        exit(-1);
    }

    // Keep the main thread alive, or the process will exit.
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }

    operation->Close();
    return 0;
}
```

------
#### [ JavaScript ]

**Example 示例：订阅本地 publish/subscribe 消息**  <a name="ipc-operation-subscribetotopic-example-nodejs"></a>

```
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
 
class SubscribeToTopic {
    private ipcClient : greengrasscoreipc.Client
    private readonly topic : string;
 
    constructor() {
        // define your own constructor, e.g.
        this.topic = "<define_your_topic>";
        this.subscribeToTopic().then(r => console.log("Started workflow"));
    }
 
    private async subscribeToTopic() {
        try {
            this.ipcClient = await getIpcClient();
 
            const subscribeToTopicRequest : SubscribeToTopicRequest = {
                topic: this.topic,
            }
 
            const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
 
            streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
                // parse the message depending on your use cases, e.g.
                if(message.binaryMessage && message.binaryMessage.message) {
                    const receivedMessage = message.binaryMessage?.message.toString();
                }
            });
 
            streamingOperation.on("streamError", (error : RpcError) => {
                // define your own error handling logic
            })
 
            streamingOperation.on("ended", () => {
                // define your own logic
            })
 
            await streamingOperation.activate();
 
            // Keep the main thread alive, or the process will exit.
            await new Promise((resolve) => setTimeout(resolve, 10000))
        } catch (e) {
            // parse the error depending on your use cases
            throw e
        }
    }
}
 
export async function getIpcClient(){
    try {
        const ipcClient = greengrasscoreipc.createClient();
        await ipcClient.connect()
            .catch(error => {
                // parse the error depending on your use cases
                throw error;
            });
        return ipcClient
    } catch (err) {
        // parse the error depending on your use cases
        throw err
    }
}
 
// starting point
const subscribeToTopic = new SubscribeToTopic();
```

------
#### [ Rust ]

**Example 示例：订阅本地 publish/subscribe 消息**  

```
use gg_sdk::{Sdk, SubscribeToTopicPayload};
use std::{thread, time::Duration};

fn main() {
    let sdk = Sdk::init();
    sdk.connect().expect("Failed to establish IPC connection");

    let topic = "my/topic";

    let callback = |topic: &str, payload: SubscribeToTopicPayload| match payload
    {
        SubscribeToTopicPayload::Binary(message) => {
            let message = String::from_utf8_lossy(message);
            println!("Received new message on topic {topic}: {message}");
        }
        SubscribeToTopicPayload::Json(_) => {
            println!("Received new message on topic {topic}: (JSON message)");
        }
    };

    let _sub = sdk
        .subscribe_to_topic(topic, &callback)
        .expect("Failed to subscribe to topic");

    println!("Successfully subscribed to topic: {topic}");

    // Keep the main thread alive, or the process will exit.
    loop {
        thread::sleep(Duration::from_secs(10));
    }
}
```

------
#### [ C ]

**Example 示例：订阅本地 publish/subscribe 消息**  

```
#include <assert.h>
#include <gg/error.h>
#include <gg/ipc/client.h>
#include <gg/object.h>
#include <gg/sdk.h>
#include <gg/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

static void on_subscription_response(
    void *ctx, GgBuffer topic, GgObject payload, GgIpcSubscriptionHandle handle
) {
    (void) ctx;
    (void) handle;

    if (gg_obj_type(payload) == GG_TYPE_BUF) {
        GgBuffer message = gg_obj_into_buf(payload);
        printf(
            "Received new message on topic %.*s: %.*s\n",
            (int) topic.len,
            topic.data,
            (int) message.len,
            message.data
        );
    } else {
        assert(gg_obj_type(payload) == GG_TYPE_MAP);
        printf(
            "Received new message on topic %.*s: (JSON message)\n",
            (int) topic.len,
            topic.data
        );
    }
}

int main(void) {
    gg_sdk_init();

    GgError err = ggipc_connect();
    if (err != GG_ERR_OK) {
        fprintf(stderr, "Failed to establish IPC connection.\n");
        exit(-1);
    }

    GgBuffer topic = GG_STR("my/topic");

    GgIpcSubscriptionHandle handle;
    err = ggipc_subscribe_to_topic(
        topic, on_subscription_response, NULL, &handle
    );
    if (err != GG_ERR_OK) {
        fprintf(
            stderr,
            "Failed to subscribe to topic: %.*s\n",
            (int) topic.len,
            topic.data
        );
        exit(-1);
    }

    printf(
        "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data
    );

    // Keep the main thread alive, or the process will exit.
    while (1) {
        sleep(10);
    }

    // To stop subscribing, close the stream.
    ggipc_close_subscription(handle);
}
```

------
#### [ C\$1\$1 (Component SDK) ]

**Example 示例：订阅本地 publish/subscribe 消息**  

```
#include <gg/ipc/client.hpp>
#include <gg/object.hpp>
#include <unistd.h>
#include <cassert>
#include <iostream>

class ResponseHandler : public gg::ipc::LocalTopicCallback {
    void operator()(
        std::string_view topic,
        gg::Object payload,
        gg::ipc::Subscription &handle
    ) override {
        (void) handle;
        if (payload.index() == GG_TYPE_BUF) {
            std::cout << "Received new message on topic " << topic << ": "
                      << get<gg::Buffer>(payload) << "\n";
        } else {
            assert(payload.index() == GG_TYPE_MAP);
            std::cout << "Received new message on topic " << topic
                      << ": (JSON message)\n";
        }
    }
};

int main() {
    auto &client = gg::ipc::Client::get();

    auto error = client.connect();
    if (error) {
        std::cerr << "Failed to establish IPC connection.\n";
        exit(-1);
    }

    std::string_view topic = "my/topic";

    static ResponseHandler handler;
    error = client.subscribe_to_topic(topic, handler);
    if (error) {
        std::cerr << "Failed to subscribe to topic: " << topic << "\n";
        exit(-1);
    }

    std::cout << "Successfully subscribed to topic: " << topic << "\n";

    // Keep the main thread alive, or the process will exit.
    while (1) {
        sleep(10);
    }
}
```

------

## IPC 最佳实践
<a name="ipc-best-practices"></a>

在自定义组件中使用 IPC 的最佳实践与在 IPC 客户端 V1 和 IPC 客户端 V2 之间有所不同。请遵循适用于您所使用的 IPC 客户端版本的最佳实践。

------
#### [ IPC client V2 ]

IPC 客户端 V2 在单独的线程中运行回调函数，因此与 IPC 客户端 V1 相比，在使用 IPC 和编写订阅处理程序函数时，您需要遵循的准则较少。
+ <a name="ipc-best-practice-reuse-one-client"></a>**重复使用一个 IPC 客户端**

  创建 IPC 客户端后，请将其保持打开状态并重复用于所有 IPC 操作。创建多个客户端会消耗额外的资源，并可能导致资源泄漏。
+ **处理异常**

  IPC 客户端 V2 记录订阅处理程序函数中未捕获的异常。您应该捕获处理程序函数中的异常，以处理代码中发生的错误。

------
#### [ IPC client V1 ]

IPC 客户端 V1 使用单线程，与 IPC 服务器通信并调用订阅处理程序。在编写订阅处理程序函数时，您必须考虑这种同步行为。
+ <a name="ipc-best-practice-reuse-one-client"></a>**重复使用一个 IPC 客户端**

  创建 IPC 客户端后，请将其保持打开状态并重复用于所有 IPC 操作。创建多个客户端会消耗额外的资源，并可能导致资源泄漏。
+ **异步运行阻止代码**

  当线程被阻止时，IPC 客户端 V1 无法发送新请求或处理新的事件消息。您应该在从处理程序函数运行的独立线程中运行阻止代码。阻塞代码包括`sleep`调用、持续运行的循环以及需要一段时间才能完成的同步 I/O 请求。
+ **异步发送新的 IPC 请求**

  IPC 客户端 V1 无法从订阅处理程序函数中发送新请求，因为如果您等待响应，该请求会阻止处理程序函数。您应该在从处理程序函数运行的独立线程中发送 IPC 请求。
+ **处理异常**

  IPC 客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常，则订阅关闭，并且该异常不会出现在您的组件日志中。您应该捕获处理程序函数中的异常，以确保订阅保持打开状态并记录代码中发生的错误。

------