教程:通过 MQTT 与本地IoT 设备交互 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:通过 MQTT 与本地IoT 设备交互

完成本教程后,您可以将核心设备配置为与本地 IoT 设备进行交互,该设备名为Client 设备,它们通过 MQTT 连接到核心设备。在本教程中,将配置Amazon IoT要使用的东西云发现以连接到核心设备。配置云发现时,客户端设备可以向Amazon IoT Greengrass云服务来发现核心设备。来自的响应Amazon IoT Greengrass包括您配置客户端设备以发现的核心设备的连接信息和证书。然后,客户端设备可以使用此信息连接到可通过 MQTT 进行通信的可用核心设备。

在本教程中,您将执行以下操作:

  1. 如有必要,请查看并更新核心设备的权限。

  2. 将客户端设备与核心设备关联,这样他们就可以使用云发现来发现核心设备。

  3. 将 Greengrass 组件部署到核心设备以启用客户端设备支持。

  4. 将客户端设备连接到核心设备并测试与Amazon IoT Core云服务。

  5. 开发可与客户端设备通信的自定义 Greengrass 组件。

  6. 开发与客户端设备交互的自定义组件Amazon IoT设备影子.

本教程使用单核设备和单个客户端设备。您也可以按照教程来连接和测试多个客户端设备。

您预计需要花费 30-60 分钟来完成本教程。

先决条件

要完成本教程,您需要:

  • 一个 Amazon Web Services 账户。如果没有,请参阅设置Amazon Web Services 账户

  • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的Amazon Identity and Access Management具有管理员权限的 (IAM) 用户。

  • 一款Greengrass 核心设备。有关如何设置核心服务的更多信息,请参阅设置Amazon IoT Greengrass核心设备.

    • 核心设备必须运行 Greengrass nucleus v2.6.0 或更高版本。此版本支持本地发布/订阅通信中的通配符,以及对客户端设备影子的支持。

      注意

      客户端设备支持需要 Greengrass nucleus v2.2.0 或更高版本。但是,本教程将探讨更新的功能,例如在本地发布/订阅中支持 MQTT 通配符以及对客户端设备影子的支持。这些功能需要 Greengrass Nucleus v2.6.0 或更高版本。

    • 要进行连接,核心设备必须与客户端设备位于同一网络中。

    • (可选)要完成开发自定义 Greengrass 组件的模块,核心设备必须运行 Greengrass CLI。有关更多信息,请参阅 安装 Greengrass CL

  • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的Amazon IoT在本教程中作为客户端设备进行连接的东西。有关更多信息,请参阅 。CreateAmazon IoT资源中的Amazon IoT Core开发人员指南.

    • 客户端设备的Amazon IoT策略必须允许greengrass:Discover权限。有关更多信息,请参阅 最低Amazon IoT客户端设备的策略

    • 客户端设备必须与核心设备位于同一网络中。

    • 客户端设备必须运行Python 3.

    • 客户端设备必须运行饭桶.

第 1 步:审核和更新核心设备Amazon IoT政策

要支持客户端设备,核心设备的Amazon IoT策略必须提供以下权限:

  • greengrass:PutCertificateAuthorities

  • greengrass:VerifyClientDeviceIdentity

  • greengrass:VerifyClientDeviceIoTCertificateAssociation

  • greengrass:GetConnectivityInfo

  • greengrass:UpdateConnectivityInfo—(可选)需要此权限才能使用IP 探测器组件,它将核心设备的网络连接信息报告给Amazon IoT Greengrass云服务。

有关这些权限的更多信息,Amazon IoT核心设备策略,请参阅数据层面操作的 Amazon IoT 策略最低Amazon IoT支持客户端设备的策略.

在本节中,您将查看Amazon IoT策略并添加缺少的所有必需权限。如果您将Amazon IoT Greengrass用于配置资源的核心软件安装程序,您的核心设备有一个Amazon IoT允许访问所有访问的策略Amazon IoT Greengrass动作 (greengrass:*)。在此情况下,您必须更新Amazon IoT只有在计划将卷影管理器组件配置为与同步设备影子时才使用策略Amazon IoT Core. 否则,您可以跳过此部分。

查看和更新核心设备的Amazon IoT政策

  1. Amazon IoT Greengrass控制台导航菜单,选择核心设备.

  2. 在存储库的核心设备页面上,选择要更新的核心设备。

  3. 在核心设备详细信息页面上,选择指向核心设备的Thing. 此链接将打开事物详细信息页面Amazon IoT控制台。

  4. 在事物详细信息页中,选择证书.

  5. 证书选项卡上,选择事物的活动证书。

  6. 在证书详细信息页中,选择策略.

  7. 策略选项卡上,选择Amazon IoT要审查和更新的政策。您可以将所需权限添加到附加到核心设备的活动证书的任何策略中。

    注意

    如果您将Amazon IoT Greengrass用于配置资源的核心软件安装程序,你有两个Amazon IoT政策。建议您选择名为的策略GreengrassV2IoTThingPolicy,如果它存在。默认情况下,使用快速安装程序创建的核心设备使用此策略名称。如果您向此策略添加权限,则还会将这些权限授予使用此策略的其他核心设备。

  8. 在策略概述中,选择编辑活动版本.

  9. 查看策略以获取所需权限,并添加任何缺少的必需权限。

  10. 要将新策略版本设置为活动版本,请在策略版本状态,SELECT将编辑的版本设置为此策略的活动版本.

  11. 选择另存为新版本.

第 2 步:启用客户端设备支持

要使客户端设备使用云发现连接到核心设备,您必须关联这些设备。将客户端设备与核心设备关联时,可以使该客户端设备检索核心设备的 IP 地址和证书以用于连接。

使客户端设备能够安全地连接到核心设备并与Greengrass组件进行通信Amazon IoT Core,则将以下 Greengrass 组件部署到核心设备:

  • Clifiguration (aws.greengrass.clientdevices.Auth)

    部署客户端设备身份验证组件以对客户端设备进行身份验证并授权客户端设备操作。这个组件允许你的Amazon IoT连接到核心设备的东西。

    这个组件需要一些配置才能使用它。您必须指定客户端设备组以及每个组有权执行的操作,例如通过 MQTT 进行连接和通信。有关更多信息,请参阅 。客户端设备身份验证组件配置.

  • MQTT 3.1.1 经纪商(Moquette) (aws.greengrass.clientdevices.mqtt.Moquette)

    部署 Moquette MQTT 代理组件以运行轻量级 MQTT 代理。Moquette MQTT 代理符合 MQTT 3.1.1,包括对 QoS 0、QoS 1、QoS 2、保留消息、遗嘱消息和持久订阅的本地支持。

    您无需配置此组件即可使用它。但是,您可以配置此组件操作 MQTT 代理的端口。默认情况下,它使用端口 8883。

  • MQTT大桥 (aws.greengrass.clientdevices.mqtt.Bridge)

    (可选)部署 MQTT 桥组件以在客户端设备(本地 MQTT)、本地发布/订阅和Amazon IoT CoreMQTT. 将此组件配置为与同步客户端设备Amazon IoT Core并与来自 Greengrass 组件的客户端设备进行交互。

    此组件需要配置才能使用。您必须指定此组件中继消息的主题映射。有关更多信息,请参阅 。MQTT 桥接组件配置.

  • IP 探测器 (aws.greengrass.clientdevices.IPDetector)

    (可选)部署 IP 检测器组件以将核心设备的 MQTT 代理端点自动报告给Amazon IoT Greengrass云服务。如果网络设置复杂,例如路由器将 MQTT 代理端口转发到核心设备,则无法使用此组件。

    您无需配置此组件即可使用它。

在本部分中,您将使用Amazon IoT Greengrass控制台以关联客户端设备并将客户端设备组件部署到核心设备。

启用客户端设备支持

  1. 导航到 Amazon IoT Greengrass 控制台

  2. 在左侧导航菜单中,选择核心设备.

  3. 在存储库的核心设备页面上,选择要在其中启用客户端设备支持的核心设备。

  4. 在核心设备详细信息页面上,选择Client 设备选项卡。

  5. 在存储库的Client 设备选项卡上,选择配置云发现.

    这些区域有:配置核心设备发现页面随即打开。在此页面上,您可以将客户端设备与核心设备关联并部署客户端设备组件。此页面在中为您选择核心设备第 1 步:选择目标核心设备.

    注意

    您还可以使用此页面为事物组配置核心设备发现。如果选择此选项,则可以将客户端设备组件部署到事物组中的所有核心设备。但是,如果选择此选项,则必须在稍后创建部署后手动将客户端设备与每个核心设备关联。在本教程中,您将配置单核设备。

  6. In步骤 2: 关联客户端设备,关联客户端设备的Amazon IoT核心设备上的东西。这使客户端设备能够使用云发现来检索核心设备的连接信息和证书。执行以下操作:

    1. 选择关联客户端设备.

    2. 将客户端设备与核心设备关联modal,输入Amazon IoT要关联的东西。

    3. 选择 Add(添加)。

    4. 选择 Associate

  7. In步骤 3: 配置和部署 Greengrass 组件中,部署组件以启用客户端设备支持。如果目标核心设备具有以前的部署,则此页面将修改该部署。否则,此页面将为核心设备创建新的部署。要配置和部署客户端设备组件,请执行以下操作:

    1. 核心设备必须运行Greengrass 核v2.6.0 或更高版本完成本教程。如果核心设备运行的是早期版本,请执行以下操作:

      1. 选中该框以部署aws.greengrass.Nucleus组件。

      2. 对于aws.greengrass.Nucleus组件,选择编辑配置.

      3. 适用于组件版本,选择 Version 2.0 或更高版本。

      4. 选择 Confirm(确认)。

      注意

      如果你从较早的次要版本升级 Greengrass 核,并且核心设备运行Amazon-提供的组件依赖于原子核的,您还必须更新Amazon-为较新版本提供的组件。在本教程稍后复习部署时,您可以配置这些组件的版本。有关更多信息,请参阅 更新Amazon IoT Greengrass核心软件 (OTA)

    2. 对于aws.greengrass.clientdevices.Auth组件,选择编辑配置.

    3. 编辑配置modal 对于客户端设备身份验证组件,配置授权策略,允许客户端设备在核心设备上发布和订阅 MQTT 代理。执行以下操作:

      1. UNDER配置,在要合并的配置code block,输入以下配置,其中包含客户端设备授权策略. 每个设备组授权策略都指定了一组操作以及客户端设备可以对其执行这些操作的资源。

        • 此策略允许名称以开头的客户端设备MyClientDevice就所有 MQTT 主题进行联系和交流。ReplaceMyClientDevice*NameAmazon IoT东西作为客户端设备进行连接。此外,您还可以使用*与客户端设备名称匹配的通配符。这些区域有:*通配符必须位于名称末端。

          如果您有第二台客户端设备要连接,请将MyOtherClientDevice*使用该客户端设备的名称,或者使用与该客户端设备名称匹配的通配符模式。否则,您可以删除或保留选择规则的这一部分,该部分允许名称匹配的客户端设备MyOtherClientDevice*进行连接和通信。

        • 此策略使用OR运算符也允许名称以开头的客户端设备MyOtherClientDevice就所有 MQTT 主题进行联系和交流。您可以在选择规则中删除此子句,也可以修改它以匹配要连接的客户端设备。

        • 此策略允许客户端设备发布和订阅所有 MQTT 主题。要遵循最佳安全实践,请限制mqtt:publishmqtt:subscribe操作仅限于客户端设备用于通信的最小主题集。

        { "deviceGroups": { "formatVersion": "2021-03-05", "definitions": { "MyDeviceGroup": { "selectionRule": "thingName: MyClientDevice* OR thingName: MyOtherClientDevice*", "policyName": "MyClientDevicePolicy" } }, "policies": { "MyClientDevicePolicy": { "AllowConnect": { "statementDescription": "Allow client devices to connect.", "operations": [ "mqtt:connect" ], "resources": [ "*" ] }, "AllowPublish": { "statementDescription": "Allow client devices to publish to all topics.", "operations": [ "mqtt:publish" ], "resources": [ "*" ] }, "AllowSubscribe": { "statementDescription": "Allow client devices to subscribe to all topics.", "operations": [ "mqtt:subscribe" ], "resources": [ "*" ] } } } } }

        有关更多信息,请参阅 。客户端设备身份验证组件配置.

      2. 选择 Confirm(确认)。

    4. 对于aws.greengrass.clientdevices.mqtt.Bridge组件,选择编辑配置.

    5. 编辑配置modal 对于 MQTT 桥接组件,配置主题映射,将 MQTT 消息从客户端设备中继到Amazon IoT Core. 执行以下操作:

      1. UNDER配置,在要合并的配置代码块,输入以下配置。此配置指定在clients/+/hello/world主题过滤器从客户端设备到Amazon IoT Core云服务。例如,此主题筛选器与clients/MyClientDevice1/hello/world主题。

        { "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" } } }

        有关更多信息,请参阅 。MQTT 桥接组件配置.

      2. 选择 Confirm(确认)。

  8. 选择审核和部署以查看此页面为您创建的部署。

  9. 如果您之前未设置Greengrass 服务角色在此区域中,控制台打开一个模组,为您设置服务角色。客户端设备身份验证组件使用此服务角色来验证客户端设备的身份,而 IP 检测器组件使用此服务角色来管理核心设备连接信息。选择 Grant permissions(授予权限)。

  10. 在存储库的审核页面上,选择部署以开始部署到核心设备。

  11. 要验证部署是否成功,请检查部署状态,并检查核心设备上的日志。要查看核心设备上的部署状态,您可以选择目标在部署中概述. 有关更多信息,请参阅下列内容:

第 3 步:Connect 客户端设备

客户端设备可以使用Amazon IoT Device SDK发现、连接核心设备并与之通信。在本节中,您将安装Amazon IoT Device SDKv2然后运行 Greengrass 发现示例应用程序Amazon IoT Device SDK.

注意

这些区域有:Amazon IoT Device SDK还提供有其他编程语言版本。本教程使用Amazon IoT Device SDKv2 for Python,但你可以根据自己的用例探索其他 SDK。有关更多信息,请参阅 。Amazon IoT设备开发工具包中的Amazon IoT Core开发人员指南.

将客户端设备连接到核心设备

  1. 下载并安装Amazon IoT Device SDKv2转到Amazon IoT东西作为客户端设备进行连接。

    在客户端设备上,执行以下操作:

    1. 克隆的Amazon IoT Device SDKv2 让 Python 存储库下载它。

      git clone https://github.com/aws/aws-iot-device-sdk-python-v2.git
    2. 安装Amazon IoT Device SDKpython 的 v2。

      python3 -m pip install --user ./aws-iot-device-sdk-python-v2
  2. 切换到样本文件夹Amazon IoT Device SDKpython 的 v2。

    cd aws-iot-device-sdk-python-v2/samples
  3. 运行示例 Greengrass 发现应用程序。此应用程序需要参数来指定客户端设备事物名称、要使用的 MQTT 主题和消息,以及对连接进行身份验证和保护的证书。以下示例将向发送 “Hello World” 消息clients/MyClientDevice1/hello/world主题。

    注意

    本主题与您配置 MQTT 网桥以将消息中继到的主题相匹配Amazon IoT Core早些时候。

    • ReplaceMyClientDevice1使用客户端设备的事物名称。

    • Replace~/certs/AmazonRootca1.pem使用客户端设备上的 Amazon 根 CA 证书的路径。

    • Replace~/certs/device.pem.crt带有指向客户端设备上的设备证书的路径。

    • Replace~/certs/private.pem.key使用客户端设备上的私有密钥匙文件路径。

    • Replaceus-east-1用Amazon您的客户端设备和核心设备所在的区域。

    python3 basic_discovery.py \ --thing_name MyClientDevice1 \ --topic 'clients/MyClientDevice1/hello/world' \ --message 'Hello World!' \ --ca_file ~/certs/AmazonRootCA1.pem \ --cert ~/certs/device.pem.crt \ --key ~/certs/private.pem.key \ --region us-east-1 \ --verbosity Warn

    发现示例应用程序发送 10 次消息并断开连接。它还订阅了它发布消息的同一主题。如果输出表明应用程序收到了关于该主题的 MQTT 消息,则客户端设备可以成功与核心设备通信。

    Performing greengrass discovery... awsiot.greengrass_discovery.DiscoverResponse(gg_groups=[awsiot.greengrass_discovery.GGGroup(gg_group_id='greengrassV2-coreDevice-MyGreengrassCore', cores=[awsiot.greengrass_discovery.GGCore(thing_arn='arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore', connectivity=[awsiot.greengrass_discovery.ConnectivityInfo(id='203.0.113.0', host_address='203.0.113.0', metadata='', port=8883)])], certificate_authorities=['-----BEGIN CERTIFICATE-----\nMIICiT...EXAMPLE=\n-----END CERTIFICATE-----\n'])]) Trying core arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore at host 203.0.113.0 port 8883 Connected! Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 0} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 0}' Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 1} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 1}' ... Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 9} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 9}'

    相反,如果应用程序输出错误,请参阅排除 Greengrass 发现问题.

    您还可以查看核心设备上的 Greengrass 日志,以验证客户端设备是否成功连接并发送消息。有关更多信息,请参阅 显示器Amazon IoT Greengrass圆木

  4. 验证 MQTT 网桥是否将来自客户端设备的消息中继到Amazon IoT Core. 您可以在 MQTT 测试客户端中使用 MQTT 测试客户端Amazon IoT Core控制台订阅 MQTT 主题筛选条件。执行以下操作:

    1. 导航到 Amazon IoT 控制台

    2. 在左侧导航菜单的下测试,选择MQTT 测试客户端.

    3. 在存储库的订阅主题选项卡,主题筛选条件, 输入clients/+/hello/world订阅来自核心服务的客户端设备消息。

    4. 选择 Subscribe

    5. 再次在客户端设备上运行发布/订阅应用程序。

      MQTT 测试客户端显示您从客户端设备发送的与该主题筛选器匹配的主题的消息。

第 4 步:开发与客户端设备通信的组件

您可以开发与客户端设备通信的 Greengrass 组件。组件使用进程间通信 (IPC)本地发布/订阅界面在核心设备上进行通信。要与客户端设备交互,请将 MQTT 桥组件配置为在客户端设备和本地发布/订阅接口之间中继消息。

在本节中,您将更新 MQTT 桥组件以将消息从客户端设备中继到本地发布/订阅界面。然后,开发一个订阅这些消息并在收到消息时打印这些消息的组件。

开发与客户端设备通信的组件

  1. 修改对核心设备的部署,并配置 MQTT 桥接组件以将消息从客户端设备中继到本地发布/订阅。执行以下操作:

    1. 导航到 Amazon IoT Greengrass 控制台

    2. 在左侧导航菜单中,选择核心设备.

    3. 在存储库的核心设备页面上,选择在本教程中使用的核心设备。

    4. 在核心设备详细信息页面上,选择Client 设备选项卡。

    5. 在存储库的Client 设备选项卡上,选择配置云发现.

      这些区域有:配置核心设备发现页面随即打开。在此页面上,您可以更改或配置将哪些客户端设备组件部署到核心设备。

    6. In步骤 3,用于aws.greengrass.clientdevices.mqtt.Bridge组件,选择编辑配置.

    7. 编辑配置modal 对于 MQTT 桥接组件,配置主题映射,将 MQTT 消息从客户端设备中继到本地发布/订阅界面。执行以下操作:

      1. UNDER配置,在要合并的配置代码块,输入以下配置。此配置指定在匹配的主题上中继 MQTT 消息clients/+/hello/world主题过滤器从客户端设备到Amazon IoT Core云服务和当地的 Greengrass 发布/订阅经纪商。

        { "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" }, "HelloWorldPubsubMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "Pubsub" } } }

        有关更多信息,请参阅 。MQTT 桥接组件配置.

      2. 选择 Confirm(确认)。

    8. 选择审核和部署以查看此页面为您创建的部署。

    9. 在存储库的审核页面上,选择部署以开始部署到核心设备。

    10. 要验证部署是否成功,请检查部署状态,并检查核心设备上的日志。要查看核心设备上的部署状态,您可以选择目标在部署中概述. 有关更多信息,请参阅下列内容:

  2. 开发和部署一个订阅来自客户端设备的 Hello World 消息的 Greengrass 组件。执行以下操作:

    1. 为核心设备上的配方和工件创建文件夹。

      Linux or Unix
      mkdir recipes mkdir -p artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0
      Windows Command Prompt (CMD)
      mkdir recipes mkdir artifacts\com.example.clientdevices.MyHelloWorldSubscriber\1.0.0
      PowerShell
      mkdir recipes mkdir artifacts\com.example.clientdevices.MyHelloWorldSubscriber\1.0.0
      重要

      您必须使用以下格式作为项目文件夹路径。包括您在处方中指定的组件名称和版本。

      artifacts/componentName/componentVersion/
    2. 使用文本编辑器创建包含以下内容的组件配方。此配方指定安装Amazon IoT Device SDKv2 for Python,然后运行订阅主题并打印消息的脚本。

      例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。

      nano recipes/com.example.clientdevices.MyHelloWorldSubscriber-1.0.0.json

      复制以下配方到文件中。

      { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.clientdevices.MyHelloWorldSubscriber", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to Hello World messages from client devices.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.clientdevices.MyHelloWorldSubscriber:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/hello_world_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "Install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/hello_world_subscriber.py" } } ] }
    3. 使用文本编辑器创建名为的 Python 脚本工件hello_world_subscriber.py包含下列内容。此应用程序使用发布/订阅 IPC 服务来订阅clients/+/hello/world主题和打印它收到的消息。

      例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。

      nano artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/hello_world_subscriber.py

      复制以下 Python 代码复制到文件中。

      import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 CLIENT_DEVICE_HELLO_WORLD_TOPIC = 'clients/+/hello/world' TIMEOUT = 10 def on_hello_world_message(event): try: message = str(event.binary_message.message, 'utf-8') print('Received new message: %s' % message) except: traceback.print_exc() try: ipc_client = GreengrassCoreIPCClientV2() # SubscribeToTopic returns a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic( topic=CLIENT_DEVICE_HELLO_WORLD_TOPIC, on_stream_event=on_hello_world_message) print('Successfully subscribed to topic: %s' % CLIENT_DEVICE_HELLO_WORLD_TOPIC) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') operation.close() except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
      注意

      此组件使用 IPC 客户端 V2 在Amazon IoT Device SDKv2以与Amazon IoT Greengrass核心软件。与原始 IPC 客户端相比,IPC 客户端 V2 减少了在自定义组件中使用 IPC 所需编写的代码量。

    4. 使用 Greengrass CLI 部署组件。

      Linux or Unix
      sudo /greengrass/v2/bin/greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.clientdevices.MyHelloWorldSubscriber=1.0.0"
      Windows Command Prompt (CMD)
      C:\greengrass\v2/bin/greengrass-cli deployment create ^ --recipeDir recipes ^ --artifactDir artifacts ^ --merge "com.example.clientdevices.MyHelloWorldSubscriber=1.0.0"
      PowerShell
      C:\greengrass\v2/bin/greengrass-cli deployment create ` --recipeDir recipes ` --artifactDir artifacts ` --merge "com.example.clientdevices.MyHelloWorldSubscriber=1.0.0"
  3. 查看组件日志以验证组件是否成功安装并订阅了主题。

    Linux or Unix
    sudo tail -f /greengrass/v2/logs/com.example.clientdevices.MyHelloWorldSubscriber.log
    PowerShell
    gc C:\greengrass\v2/logs/com.example.clientdevices.MyHelloWorldSubscriber.log -Tail 10 -Wait

    您可以保持日志馈送处于打开状态,以验证核心设备是否收到消息。

  4. 在客户端设备上,再次运行示例 Greengrass 发现应用程序,将消息发送到核心设备。

    python3 basic_discovery.py \ --thing_name MyClientDevice1 \ --topic 'clients/MyClientDevice1/hello/world' \ --message 'Hello World!' \ --ca_file ~/certs/AmazonRootCA1.pem \ --cert ~/certs/device.pem.crt \ --key ~/certs/private.pem.key \ --region us-east-1 \ --verbosity Warn
  5. 再次查看组件日志以验证组件是否接收并打印来自客户端设备的消息。

    Linux or Unix
    sudo tail -f /greengrass/v2/logs/com.example.clientdevices.MyHelloWorldSubscriber.log
    PowerShell
    gc C:\greengrass\v2/logs/com.example.clientdevices.MyHelloWorldSubscriber.log -Tail 10 -Wait

第 5 步:开发与客户端设备影子交互的组件

你可以开发与客户端设备交互的 Greengrass 组件Amazon IoT设备影子. 一个影子是一个 JSON 文档,用于存储当前或所需状态信息Amazon IoT东西,例如客户端设备。自定义组件可以访问客户端设备的影子以管理其状态,即使客户端设备未连接到Amazon IoT. EACHAmazon IoT事物有一个未命名的阴影,你也可以为每个事物创建多个命名阴影。

在本部分中,您将部署影子管理器组件来管理核心设备上的阴影。您还可以更新 MQTT 桥组件,以便在客户端设备和影子管理器组件之间中继影子消息。然后,开发一个更新客户端设备影子的组件,并在客户端设备上运行一个示例应用程序来响应来自该组件的影子更新。此组件代表智能灯光管理应用程序,其中核心设备管理作为客户端设备连接到其的智能灯的颜色状态。

开发与客户端设备影子交互的组件

  1. 修改对核心设备的部署以部署影子管理器组件,并将 MQTT 桥组件配置为在客户端设备和本地发布/订阅(其中影子管理器进行通信)之间中继影子消息。执行以下操作:

    1. 导航到 Amazon IoT Greengrass 控制台

    2. 在左侧导航菜单中,选择核心设备.

    3. 在存储库的核心设备页面上,选择在本教程中使用的核心设备。

    4. 在核心设备详细信息页面上,选择Client 设备选项卡。

    5. 在存储库的Client 设备选项卡上,选择配置云发现.

      这些区域有:配置核心设备发现页面随即打开。在此页面上,您可以更改或配置将哪些客户端设备组件部署到核心设备。

    6. In步骤 3,用于aws.greengrass.clientdevices.mqtt.Bridge组件,选择编辑配置.

    7. 编辑配置modal 对于 MQTT 桥接组件,配置一个主题映射,将 MQTT 消息传递到设备影子主题在客户端设备和本地发布/订阅界面之间。您还要确认部署指定了兼容的 MQTT 桥版本。客户端设备影子支持需要 MQTT bridge v2.2.0 或更高版本。执行以下操作:

      1. 适用于组件版本,选择 Version 2.0 或更高版本。

      2. UNDER配置,在要合并的配置代码块,输入以下配置。此配置指定在影子主题上中继 MQTT 消息。

        { "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" }, "HelloWorldPubsubMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "Pubsub" }, "ShadowsLocalMqttToPubsub": { "topic": "$aws/things/+/shadow/#", "source": "LocalMqtt", "target": "Pubsub" }, "ShadowsPubsubToLocalMqtt": { "topic": "$aws/things/+/shadow/#", "source": "Pubsub", "target": "LocalMqtt" } } }

        有关更多信息,请参阅 。MQTT 桥接组件配置.

      3. 选择 Confirm(确认)。

    8. In步骤 3,选择aws.greengrass.ShadowManager组件来部署它。

    9. 选择审核和部署以查看此页面为您创建的部署。

    10. 在存储库的审核页面上,选择部署以开始部署到核心设备。

    11. 要验证部署是否成功,请检查部署状态,并检查核心设备上的日志。要查看核心设备上的部署状态,您可以选择目标在部署中概述. 有关更多信息,请参阅下列内容:

  2. 开发和部署管理智能灯客户端设备的 Greengrass 组件。执行以下操作:

    1. 在核心设备上创建组件工件的文件夹。

      Linux or Unix
      mkdir -p artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0
      Windows Command Prompt (CMD)
      mkdir artifacts\com.example.clientdevices.MyHelloWorldSubscriber\1.0.0
      PowerShell
      mkdir artifacts\com.example.clientdevices.MyHelloWorldSubscriber\1.0.0
      重要

      您必须使用以下格式作为项目文件夹路径。包括您在处方中指定的组件名称和版本。

      artifacts/componentName/componentVersion/
    2. 使用文本编辑器创建包含以下内容的组件配方。此配方指定安装Amazon IoT Device SDKv2 for Python,并运行一个脚本与智能灯光客户端设备的阴影进行交互以管理其颜色。

      例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。

      nano recipes/com.example.clientdevices.MySmartLightManager-1.0.0.json

      复制以下配方到文件中。

      { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.clientdevices.MySmartLightManager", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that interacts with smart light client devices.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.Nucleus": { "VersionRequirement": "^2.6.0" }, "aws.greengrass.ShadowManager": { "VersionRequirement": "^2.2.0" }, "aws.greengrass.clientdevices.mqtt.Bridge": { "VersionRequirement": "^2.2.0" } }, "ComponentConfiguration": { "DefaultConfiguration": { "smartLightDeviceNames": [], "accessControl": { "aws.greengrass.ShadowManager": { "com.example.clientdevices.MySmartLightManager:shadow:1": { "policyDescription": "Allows access to client devices' unnamed shadows", "operations": [ "aws.greengrass#GetThingShadow", "aws.greengrass#UpdateThingShadow" ], "resources": [ "$aws/things/MyClientDevice*/shadow" ] } }, "aws.greengrass.ipc.pubsub": { "com.example.clientdevices.MySmartLightManager:pubsub:1": { "policyDescription": "Allows access to client devices' unnamed shadow updates", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "$aws/things/+/shadow/update/accepted" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/smart_light_manager.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "Install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/smart_light_manager.py" } } ] }
    3. 使用文本编辑器创建名为的 Python 脚本工件smart_light_manager.py包含下列内容。此应用程序使用影子 IPC 服务获取和更新客户端设备影子,使用本地发布/订阅 IPC 服务来接收报告的影子更新。

      例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。

      nano artifacts/com.example.clientdevices.MySmartLightManager/1.0.0/smart_light_manager.py

      复制以下 Python 代码复制到文件中。

      import json import random import sys import time import traceback from uuid import uuid4 from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ResourceNotFoundError SHADOW_COLOR_PROPERTY = 'color' CONFIGURATION_CLIENT_DEVICE_NAMES = 'smartLightDeviceNames' COLORS = ['red', 'orange', 'yellow', 'green', 'blue', 'purple'] SHADOW_UPDATE_TOPIC = '$aws/things/+/shadow/update/accepted' SET_COLOR_INTERVAL = 15 class SmartLightDevice(): def __init__(self, client_device_name: str, reported_color: str = None): self.name = client_device_name self.reported_color = reported_color self.desired_color = None class SmartLightDeviceManager(): def __init__(self, ipc_client: GreengrassCoreIPCClientV2): self.ipc_client = ipc_client self.devices = {} self.client_tokens = set() self.shadow_update_accepted_subscription_operation = None self.client_device_names_configuration_subscription_operation = None self.update_smart_light_device_list() def update_smart_light_device_list(self): # Update the device list from the component configuration. response = self.ipc_client.get_configuration( key_path=[CONFIGURATION_CLIENT_DEVICE_NAMES]) # Identify the difference between the configuration and the currently tracked devices. current_device_names = self.devices.keys() updated_device_names = response.value[CONFIGURATION_CLIENT_DEVICE_NAMES] added_device_names = set(updated_device_names) - set(current_device_names) removed_device_names = set(current_device_names) - set(updated_device_names) # Stop tracking any smart light devices that are no longer in the configuration. for name in removed_device_names: print('Removing %s from smart light device manager' % name) self.devices.pop(name) # Start tracking any new smart light devices that are in the configuration. for name in added_device_names: print('Adding %s to smart light device manager' % name) device = SmartLightDevice(name) device.reported_color = self.get_device_reported_color(device) self.devices[name] = device print('Current color for %s is %s' % (name, device.reported_color)) def get_device_reported_color(self, smart_light_device): try: response = self.ipc_client.get_thing_shadow( thing_name=smart_light_device.name, shadow_name='') shadow = json.loads(str(response.payload, 'utf-8')) if 'reported' in shadow['state']: return shadow['state']['reported'].get(SHADOW_COLOR_PROPERTY) return None except ResourceNotFoundError: return None def request_device_color_change(self, smart_light_device, color): # Generate and track a client token for the request. client_token = str(uuid4()) self.client_tokens.add(client_token) # Create a shadow payload, which must be a blob. payload_json = { 'state': { 'desired': { SHADOW_COLOR_PROPERTY: color } }, 'clientToken': client_token } payload = bytes(json.dumps(payload_json), 'utf-8') self.ipc_client.update_thing_shadow( thing_name=smart_light_device.name, shadow_name='', payload=payload) smart_light_device.desired_color = color def subscribe_to_shadow_update_accepted_events(self): if self.shadow_update_accepted_subscription_operation == None: # SubscribeToTopic returns a tuple with the response and the operation. _, self.shadow_update_accepted_subscription_operation = self.ipc_client.subscribe_to_topic( topic=SHADOW_UPDATE_TOPIC, on_stream_event=self.on_shadow_update_accepted_event) print('Successfully subscribed to shadow update accepted topic') def close_shadow_update_accepted_subscription(self): if self.shadow_update_accepted_subscription_operation is not None: self.shadow_update_accepted_subscription_operation.close() def on_shadow_update_accepted_event(self, event): try: message = str(event.binary_message.message, 'utf-8') accepted_payload = json.loads(message) # Check for reported states from smart light devices and ignore desired states from components. if 'reported' in accepted_payload['state']: # Process this update only if it uses a client token created by this component. client_token = accepted_payload.get('clientToken') if client_token is not None and client_token in self.client_tokens: self.client_tokens.remove(client_token) shadow_state = accepted_payload['state']['reported'] if SHADOW_COLOR_PROPERTY in shadow_state: reported_color = shadow_state[SHADOW_COLOR_PROPERTY] topic = event.binary_message.context.topic client_device_name = topic.split('/')[2] if client_device_name in self.devices: # Set the reported color for the smart light device. self.devices[client_device_name].reported_color = reported_color print( 'Received shadow update confirmation from client device: %s' % client_device_name) else: print("Shadow update doesn't specify color") except: traceback.print_exc() def subscribe_to_client_device_name_configuration_updates(self): if self.client_device_names_configuration_subscription_operation == None: # SubscribeToConfigurationUpdate returns a tuple with the response and the operation. _, self.client_device_names_configuration_subscription_operation = self.ipc_client.subscribe_to_configuration_update( key_path=[CONFIGURATION_CLIENT_DEVICE_NAMES], on_stream_event=self.on_client_device_names_configuration_update_event) print( 'Successfully subscribed to configuration updates for smart light device names') def close_client_device_names_configuration_subscription(self): if self.client_device_names_configuration_subscription_operation is not None: self.client_device_names_configuration_subscription_operation.close() def on_client_device_names_configuration_update_event(self, event): try: if CONFIGURATION_CLIENT_DEVICE_NAMES in event.configuration_update_event.key_path: print('Received configuration update for list of client devices') self.update_smart_light_device_list() except: traceback.print_exc() def choose_random_color(): return random.choice(COLORS) def main(): try: # Create an IPC client and a smart light device manager. ipc_client = GreengrassCoreIPCClientV2() smart_light_manager = SmartLightDeviceManager(ipc_client) smart_light_manager.subscribe_to_shadow_update_accepted_events() smart_light_manager.subscribe_to_client_device_name_configuration_updates() try: # Keep the main thread alive, or the process will exit. while True: # Set each smart light device to a random color at a regular interval. for device_name in smart_light_manager.devices: device = smart_light_manager.devices[device_name] desired_color = choose_random_color() print('Chose random color (%s) for %s' % (desired_color, device_name)) if desired_color == device.desired_color: print('Desired color for %s is already %s' % (device_name, desired_color)) elif desired_color == device.reported_color: print('Reported color for %s is already %s' % (device_name, desired_color)) else: smart_light_manager.request_device_color_change( device, desired_color) print('Requested color change for %s to %s' % (device_name, desired_color)) time.sleep(SET_COLOR_INTERVAL) except InterruptedError: print('Application interrupted') smart_light_manager.close_shadow_update_accepted_subscription() smart_light_manager.close_client_device_names_configuration_subscription() except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) if __name__ == '__main__': main()

      此 Python 应用程序执行以下操作:

      • 读取组件的配置以获取要管理的智能灯客户端设备列表。

      • 使用订阅配置更新通知SubscribeToConfigurationUpdateIPC 操作。这些区域有:Amazon IoT Greengrass每次组件的配置更改时,核心软件都会发送通知。当组件收到配置更新通知时,它会更新其管理的智能灯客户端设备列表。

      • 获取每个智能灯光客户端设备的阴影以获取其初始颜色状态。

      • 每隔 15 秒将每台智能灯客户端设备的颜色设置为随机颜色。该组件更新客户端设备的事物影子以更改其颜色。此操作通过 MQTT 向客户端设备发送影子增量事件。

      • 在本地发布/订阅界面上使用订阅卷影更新已接受消息SubscribeToTopicIPC 操作。此组件接收这些消息以跟踪每个智能灯客户端设备的颜色。当智能灯客户端设备收到影子更新时,它会发送 MQTT 消息以确认它已收到更新。MQTT 桥将此消息转发到本地发布/订阅界面。

    4. 使用 Greengrass CLI 部署组件。部署此组件时,需要指定客户端设备列表,smartLightDeviceNames,它管理着谁的影子。ReplaceMyClientDevice1使用客户端设备的事物名称。

      Linux or Unix
      sudo /greengrass/v2/bin/greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.clientdevices.MySmartLightManager=1.0.0" \ --update-config '{ "com.example.clientdevices.MySmartLightManager": { "MERGE": { "smartLightDeviceNames": [ "MyClientDevice1" ] } } }'
      Windows Command Prompt (CMD)
      C:\greengrass\v2/bin/greengrass-cli deployment create ^ --recipeDir recipes ^ --artifactDir artifacts ^ --merge "com.example.clientdevices.MySmartLightManager=1.0.0" ^ --update-config '{"com.example.clientdevices.MySmartLightManager":{"MERGE":{"smartLightDeviceNames":["MyClientDevice1"]}}}'
      PowerShell
      C:\greengrass\v2/bin/greengrass-cli deployment create ` --recipeDir recipes ` --artifactDir artifacts ` --merge "com.example.clientdevices.MySmartLightManager=1.0.0" ` --update-config '{ "com.example.clientdevices.MySmartLightManager": { "MERGE": { "smartLightDeviceNames": [ "MyClientDevice1" ] } } }'
  3. 查看组件日志以验证组件是否成功安装和运行。

    Linux or Unix
    sudo tail -f /greengrass/v2/logs/com.example.clientdevices.MySmartLightManager.log
    PowerShell
    gc C:\greengrass\v2/logs/com.example.clientdevices.MySmartLightManager.log -Tail 10 -Wait

    该组件发送请求以更改智能灯客户端设备的颜色。影子管理器接收请求并设置影子的desired状态。但是,智能灯光客户端设备尚未运行,因此影子已经reportedstate 不会发生变化。组件的日志包括以下消息。

    2022-07-07T03:49:24.908Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Chose random color (blue) for MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.912Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Requested color change for MyClientDevice1 to blue. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING}

    您可以保持日志馈送处于打开状态,以查看组件何时打印消息。

  4. 下载并运行使用 Greengrass 发现功能并订阅设备影子更新的示例应用程序。在客户端设备上,执行以下操作:

    1. 切换到样本文件夹Amazon IoT Device SDKpython 的 v2。此示例应用程序使用 samples 文件夹中的命令行解析模块。

      cd aws-iot-device-sdk-python-v2/samples
    2. 使用文本编辑器创建名为的 Python 脚本basic_discovery_shadow.py包含下列内容。此应用程序使用 Greengrass 发现和影子在客户端设备和核心设备之间保持属性同步。

      例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。

      nano basic_discovery_shadow.py

      复制以下 Python 代码复制到文件中。

      # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0. from awscrt import io from awscrt import mqtt from awsiot import iotshadow from awsiot.greengrass_discovery import DiscoveryClient from awsiot import mqtt_connection_builder from concurrent.futures import Future import sys import threading import traceback from uuid import uuid4 # Parse arguments import command_line_utils; cmdUtils = command_line_utils.CommandLineUtils("Basic Discovery - Greengrass discovery example with device shadows.") cmdUtils.add_common_mqtt_commands() cmdUtils.add_common_topic_message_commands() cmdUtils.add_common_logging_commands() cmdUtils.register_command("key", "<path>", "Path to your key in PEM format.", True, str) cmdUtils.register_command("cert", "<path>", "Path to your client certificate in PEM format.", True, str) cmdUtils.remove_command("endpoint") cmdUtils.register_command("thing_name", "<str>", "The name assigned to your IoT Thing", required=True) cmdUtils.register_command("region", "<str>", "The region to connect through.", required=True) cmdUtils.register_command("shadow_property", "<str>", "The name of the shadow property you want to change (optional, default='color'", default="color") # Needs to be called so the command utils parse the commands cmdUtils.get_args() # Using globals to simplify sample code is_sample_done = threading.Event() mqtt_connection = None shadow_thing_name = cmdUtils.get_command_required("thing_name") shadow_property = cmdUtils.get_command("shadow_property") SHADOW_VALUE_DEFAULT = "off" class LockedData: def __init__(self): self.lock = threading.Lock() self.shadow_value = None self.disconnect_called = False self.request_tokens = set() locked_data = LockedData() def on_connection_interupted(connection, error, **kwargs): print('connection interrupted with error {}'.format(error)) def on_connection_resumed(connection, return_code, session_present, **kwargs): print('connection resumed with return code {}, session present {}'.format(return_code, session_present)) # Try IoT endpoints until we find one that works def try_iot_endpoints(): for gg_group in discover_response.gg_groups: for gg_core in gg_group.cores: for connectivity_info in gg_core.connectivity: try: print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port)) mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=connectivity_info.host_address, port=connectivity_info.port, cert_filepath=cmdUtils.get_command_required("cert"), pri_key_filepath=cmdUtils.get_command_required("key"), ca_bytes=gg_group.certificate_authorities[0].encode('utf-8'), on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed, client_id=cmdUtils.get_command_required("thing_name"), clean_session=False, keep_alive_secs=30) connect_future = mqtt_connection.connect() connect_future.result() print('Connected!') return mqtt_connection except Exception as e: print('Connection failed with exception {}'.format(e)) continue exit('All connection attempts failed') # Function for gracefully quitting this sample def exit(msg_or_exception): if isinstance(msg_or_exception, Exception): print("Exiting sample due to exception.") traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) else: print("Exiting sample:", msg_or_exception) with locked_data.lock: if not locked_data.disconnect_called: print("Disconnecting...") locked_data.disconnect_called = True future = mqtt_connection.disconnect() future.add_done_callback(on_disconnected) def on_disconnected(disconnect_future): # type: (Future) -> None print("Disconnected.") # Signal that sample is finished is_sample_done.set() def on_get_shadow_accepted(response): # type: (iotshadow.GetShadowResponse) -> None try: with locked_data.lock: # check that this is a response to a request from this session try: locked_data.request_tokens.remove(response.client_token) except KeyError: return print("Finished getting initial shadow state.") if locked_data.shadow_value is not None: print(" Ignoring initial query because a delta event has already been received.") return if response.state: if response.state.delta: value = response.state.delta.get(shadow_property) if value: print(" Shadow contains delta value '{}'.".format(value)) change_shadow_value(value) return if response.state.reported: value = response.state.reported.get(shadow_property) if value: print(" Shadow contains reported value '{}'.".format(value)) set_local_value_due_to_initial_query(response.state.reported[shadow_property]) return print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property)) change_shadow_value(SHADOW_VALUE_DEFAULT) return except Exception as e: exit(e) def on_get_shadow_rejected(error): # type: (iotshadow.ErrorResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(error.client_token) except KeyError: return if error.code == 404: print("Thing has no shadow document. Creating with defaults...") change_shadow_value(SHADOW_VALUE_DEFAULT) else: exit("Get request was rejected. code:{} message:'{}'".format( error.code, error.message)) except Exception as e: exit(e) def on_shadow_delta_updated(delta): # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None try: print("Received shadow delta event.") if delta.state and (shadow_property in delta.state): value = delta.state[shadow_property] if value is None: print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property)) change_shadow_value(SHADOW_VALUE_DEFAULT) return else: print(" Delta reports that desired value is '{}'. Changing local value...".format(value)) if (delta.client_token is not None): print (" ClientToken is: " + delta.client_token) change_shadow_value(value, delta.client_token) else: print(" Delta did not report a change in '{}'".format(shadow_property)) except Exception as e: exit(e) def on_publish_update_shadow(future): #type: (Future) -> None try: future.result() print("Update request published.") except Exception as e: print("Failed to publish update request.") exit(e) def on_update_shadow_accepted(response): # type: (iotshadow.UpdateShadowResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(response.client_token) except KeyError: return try: if response.state.reported != None: if shadow_property in response.state.reported: print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore else: print ("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore else: print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None except: exit("Updated shadow is missing the target property") except Exception as e: exit(e) def on_update_shadow_rejected(error): # type: (iotshadow.ErrorResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(error.client_token) except KeyError: return exit("Update request was rejected. code:{} message:'{}'".format( error.code, error.message)) except Exception as e: exit(e) def set_local_value_due_to_initial_query(reported_value): with locked_data.lock: locked_data.shadow_value = reported_value def change_shadow_value(value, token=None): with locked_data.lock: if locked_data.shadow_value == value: print("Local value is already '{}'.".format(value)) return print("Changed local shadow value to '{}'.".format(value)) locked_data.shadow_value = value print("Updating reported shadow value to '{}'...".format(value)) reuse_token = token is not None # use a unique token so we can correlate this "request" message to # any "response" messages received on the /accepted and /rejected topics if not reuse_token: token = str(uuid4()) # if the value is "clear shadow" then send a UpdateShadowRequest with None # for both reported and desired to clear the shadow document completely. if value == "clear_shadow": tmp_state = iotshadow.ShadowState(reported=None, desired=None, reported_is_nullable=True, desired_is_nullable=True) request = iotshadow.UpdateShadowRequest( thing_name=shadow_thing_name, state=tmp_state, client_token=token, ) # Otherwise, send a normal update request else: # if the value is "none" then set it to a Python none object to # clear the individual shadow property if value == "none": value = None request = iotshadow.UpdateShadowRequest( thing_name=shadow_thing_name, state=iotshadow.ShadowState( reported={ shadow_property: value } ), client_token=token, ) future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE) if not reuse_token: locked_data.request_tokens.add(token) future.add_done_callback(on_publish_update_shadow) if __name__ == '__main__': tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(cmdUtils.get_command_required("cert"), cmdUtils.get_command_required("key")) if cmdUtils.get_command(cmdUtils.m_cmd_ca_file): tls_options.override_default_trust_store_from_path(None, cmdUtils.get_command(cmdUtils.m_cmd_ca_file)) tls_context = io.ClientTlsContext(tls_options) socket_options = io.SocketOptions() print('Performing greengrass discovery...') discovery_client = DiscoveryClient(io.ClientBootstrap.get_or_create_static_default(), socket_options, tls_context, cmdUtils.get_command_required("region")) resp_future = discovery_client.discover(cmdUtils.get_command_required("thing_name")) discover_response = resp_future.result() print(discover_response) if cmdUtils.get_command("print_discover_resp_only"): exit(0) mqtt_connection = try_iot_endpoints() shadow_client = iotshadow.IotShadowClient(mqtt_connection) try: # Subscribe to necessary topics. # Note that is **is** important to wait for "accepted/rejected" subscriptions # to succeed before publishing the corresponding "request". print("Subscribing to Update responses...") update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted( request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_update_shadow_accepted) update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected( request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_update_shadow_rejected) # Wait for subscriptions to succeed update_accepted_subscribed_future.result() update_rejected_subscribed_future.result() print("Subscribing to Get responses...") get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted( request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_get_shadow_accepted) get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected( request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_get_shadow_rejected) # Wait for subscriptions to succeed get_accepted_subscribed_future.result() get_rejected_subscribed_future.result() print("Subscribing to Delta events...") delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events( request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_shadow_delta_updated) # Wait for subscription to succeed delta_subscribed_future.result() # The rest of the sample runs asynchronously. # Issue request for shadow's current state. # The response will be received by the on_get_accepted() callback print("Requesting current shadow state...") with locked_data.lock: # use a unique token so we can correlate this "request" message to # any "response" messages received on the /accepted and /rejected topics token = str(uuid4()) publish_get_future = shadow_client.publish_get_shadow( request=iotshadow.GetShadowRequest(thing_name=shadow_thing_name, client_token=token), qos=mqtt.QoS.AT_LEAST_ONCE) locked_data.request_tokens.add(token) # Ensure that publish succeeds publish_get_future.result() except Exception as e: exit(e) # Wait for the sample to finish (user types 'quit', or an error occurs) is_sample_done.wait()

      此 Python 应用程序执行以下操作:

      • 使用 Greengrass 发现功能来发现并连接到核心设备。

      • 从核心设备请求影子文档以获取属性的初始状态。

      • 订阅影子增量事件,核心设备在属性发生时发送这些事件desired值不同于其reported值。当应用程序收到 shadow delta 事件时,它会更改属性的值,并向核心设备发送更新以将新值设置为其reported值。

      该应用程序结合了 Greengrass 的发现和来自Amazon IoT Device SDKv2

    3. 运行示例应用程序。此应用程序需要参数来指定客户端设备事物名称、要使用的影子属性以及对连接进行身份验证和保护的证书。

      • ReplaceMyClientDevice1使用客户端设备的事物名称。

      • Replace~/certs/AmazonRootca1.pem使用客户端设备上的 Amazon 根 CA 证书的路径。

      • Replace~/certs/device.pem.crt带有指向客户端设备上的设备证书的路径。

      • Replace~/certs/private.pem.key使用客户端设备上的私有密钥匙文件路径。

      • Replaceus-east-1用Amazon您的客户端设备和核心设备所在的区域。

      python3 basic_discovery_shadow.py \ --thing_name MyClientDevice1 \ --shadow_property color \ --ca_file ~/certs/AmazonRootCA1.pem \ --cert ~/certs/device.pem.crt \ --key ~/certs/private.pem.key \ --region us-east-1 \ --verbosity Warn

      示例应用程序订阅影子主题并等待从核心设备接收影子增量事件。如果输出表明应用程序接收并响应 shadow delta 事件,则客户端设备可以成功地与其在核心设备上的影子进行交互。

      Performing greengrass discovery... awsiot.greengrass_discovery.DiscoverResponse(gg_groups=[awsiot.greengrass_discovery.GGGroup(gg_group_id='greengrassV2-coreDevice-MyGreengrassCore', cores=[awsiot.greengrass_discovery.GGCore(thing_arn='arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore', connectivity=[awsiot.greengrass_discovery.ConnectivityInfo(id='203.0.113.0', host_address='203.0.113.0', metadata='', port=8883)])], certificate_authorities=['-----BEGIN CERTIFICATE-----\nMIICiT...EXAMPLE=\n-----END CERTIFICATE-----\n'])]) Trying core arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore at host 203.0.113.0 port 8883 Connected! Subscribing to Update responses... Subscribing to Get responses... Subscribing to Delta events... Requesting current shadow state... Received shadow delta event. Delta reports that desired value is 'purple'. Changing local value... ClientToken is: 3dce4d3f-e336-41ac-aa4f-7882725f0033 Changed local shadow value to 'purple'. Updating reported shadow value to 'purple'... Update request published.

      相反,如果应用程序输出错误,请参阅排除 Greengrass 发现问题.

      您还可以查看核心设备上的 Greengrass 日志,以验证客户端设备是否成功连接并发送消息。有关更多信息,请参阅 显示器Amazon IoT Greengrass圆木

  5. 再次查看组件日志以验证组件是否收到来自 smart light 客户端设备的影子更新确认。

    Linux or Unix
    sudo tail -f /greengrass/v2/logs/com.example.clientdevices.MySmartLightManager.log
    PowerShell
    gc C:\greengrass\v2/logs/com.example.clientdevices.MySmartLightManager.log -Tail 10 -Wait

    该组件记录消息以确认智能灯客户端设备更改了颜色。

    2022-07-07T03:49:24.908Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Chose random color (blue) for MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.912Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Requested color change for MyClientDevice1 to blue. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.959Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Received shadow update confirmation from client device: MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING}
注意

客户端设备的影子在核心设备和客户端设备之间同步。但是,核心设备不会将客户端设备的影子与Amazon IoT Core. 你可以将影子同步到Amazon IoT Core例如,查看或修改队列中所有设备的状态。有关如何配置阴影管理器组件以将阴影管理器组件配置为与同步阴影的更多信息Amazon IoT Core,请参阅将本地设备影子同步Amazon IoT Core.

您已完成本教程。客户端设备连接到核心设备,将 MQTT 消息发送到Amazon IoT Core和 Greengrass 组件,并从核心设备接收影子更新。有关本教程中涵盖主题的更多信息,请参阅以下内容: