开发 Amazon IoT TwinMaker 时间序列数据连接器 - Amazon IoT TwinMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

开发 Amazon IoT TwinMaker 时间序列数据连接器

本节介绍如何在 step-by-step流程中开发时间序列数据连接器。此外,我们还提供了基于整个 cookie 出厂示例的时间序列数据连接器示例,其中包括 3D 模型、实体、组件、警报和连接器。Cookie 工厂示例源可在Amazon IoT TwinMaker 示例 GitHub 存储库中找到。

Amazon IoT TwinMaker 时间序列数据连接器先决条件

开发时间序列数据连接器之前,建议您完成以下任务:

注意

有关完全实施的连接器示例,请参阅我们的 cookie 出厂示例实现介绍。

时间序列数据连接器背景

想象一下,您正在与一家工厂合作,那里有一套曲奇搅拌机和一个水箱。您想构建这些物理实体的 Amazon IoT TwinMaker 数字双胞胎,以便您可以通过检查各种时间序列指标来监控它们的运行状态。

您已经在现场设置了传感器,并且已经在将测量数据流式传输至 Timestream 数据库。您希望能够以最小的成本查看和整理 Amazon IoT TwinMaker 中的测量数据。您可以使用时间序列数据连接器完成此任务。下图所示为遥测表示例,该表通过时间序列连接器填充。

遥测表数据示例,包括资产 ID、类型、度量、时间和值。

此屏幕截图中使用的数据集和 Timestream 表可在Amazon IoT TwinMaker 示例 GitHub 存储库中找到。另请参阅已经实施的 cookie 出厂示例连接器,其结果如前面的屏幕截图所示。

时间序列数据连接器数据流

对于数据平面查询,从组件和组件类型定义中 Amazon IoT TwinMaker 获取组件和组件类型的相应属性。 Amazon IoT TwinMaker 将属性与查询中的任何 API 查询参数一起转发给 Amazon Lambda 函数。

Amazon IoT TwinMaker 使用 Lambda 函数访问和解析来自数据源的查询,并返回这些查询的结果。Lambda 函数使用数据平面中的组件和组件类型属性解析初始请求。

Lambda 查询的结果将映射至 API 响应并返回给您。

Amazon IoT TwinMaker 定义数据连接器接口,并使用该接口与 Lambda 函数进行交互。您可通过数据连接器,从 Amazon IoT TwinMaker API 查询数据来源,而无需进行任何数据迁移。下图概述了前几段描述的基本数据流。

API 请求和响应使用 3P 连接器请求和访问数据源的响应。

开发时间序列数据连接器

以下程序概述了一种开发模型,该模型以增量方式构建功能性时间序列数据连接器。基本步骤如下所示:

  1. 创建有效的基础组件类型

    在组件类型中,您可以定义在组件间共享的通用属性。要了解有关组件类型定义的更多信息,请参阅使用和创建组件类型

    Amazon IoT TwinMaker 使用实体-组件建模模式,因此每个组件都连接到一个实体。我们建议您将每个物理项目建模为一个实体,并使用不同的组件类型对不同的数据源进行建模。

    以下示例介绍了带一项目属性的 Timestream 模板组件类型:

    {"componentTypeId": "com.example.timestream-telemetry", "workspaceId": "MyWorkspace", "functions": { "dataReader": { "implementedBy": { "lambda": { "arn": "lambdaArn" } } } }, "propertyDefinitions": { "telemetryType": { "dataType": { "type": "STRING" }, "isExternalId": false, "isStoredExternally": false, "isTimeSeries": false, "isRequiredInEntity": true }, "telemetryId": { "dataType": { "type": "STRING" }, "isExternalId": true, "isStoredExternally": false, "isTimeSeries": false, "isRequiredInEntity": true }, "Temperature": { "dataType": { "type": "DOUBLE" }, "isExternalId": false, "isTimeSeries": true, "isStoredExternally": true, "isRequiredInEntity": false } } }

    此组件类型的关键元素如下:

    • telemetryId属性标识相应数据源中物理项目的唯一密钥。数据连接器使用此属性作为筛选条件,仅查询与给定项目关联的值。此外,如果您将 telemetryId 属性值纳入数据平面 API 响应,则客户端会获取该 ID,必要时可以执行反向查找。

    • lambdaArn 字段标识组件类型所用的 Lambda 函数。

    • isRequiredInEntity 标志强制创建 ID。这个标志是必需的,这样在创建组件时,项目的 ID 也会被实例化。

    • TelemetryId作为外部 ID 添加到组件类型中,以便可以在 Timestream 表中识别该项目。

  2. 按组件类型创建组件

    要使用您创建的组件类型,必须创建组件并将其附加至数据检索目标实体。以下步骤详细介绍了此组件创建流程:

    1. 导航到 Amazon IoT TwinMaker 控制台

    2. 选择并打开您在其中创建组件类型的同一个工作区。

    3. 导航至实体页面。

    4. 创建新实体,或从表格中选择现有实体。

    5. 选择要使用的实体后,选择 添加组件以打开添加组件页面。

    6. 创建组件名称,对于组件类型,选择您要通过模板创建的组件类型。创建有效的基本组件类型

  3. 通过组建类型调用 Lambda 连接器

    Lambda 连接器需要访问数据来源,并根据输入生成查询语句并将其转发至数据来源。以下示例显示了执行此操作的 JSON 请求模板。

    { "workspaceId": "MyWorkspace", "entityId": "MyEntity", "componentName": "TelemetryData", "selectedProperties": ["Temperature"], "startTime": "2022-08-25T00:00:00Z", "endTime": "2022-08-25T00:00:05Z", "maxResults": 3, "orderByTime": "ASCENDING", "properties": { "telemetryType": { "definition": { "dataType": { "type": "STRING" }, "isExternalId": false, "isFinal": false, "isImported": false, "isInherited": false, "isRequiredInEntity": false, "isStoredExternally": false, "isTimeSeries": false }, "value": { "stringValue": "Mixer" } }, "telemetryId": { "definition": { "dataType": { "type": "STRING" }, "isExternalId": true, "isFinal": true, "isImported": false, "isInherited": false, "isRequiredInEntity": true, "isStoredExternally": false, "isTimeSeries": false }, "value": { "stringValue": "item_A001" } }, "Temperature": { "definition": { "dataType": { "type": "DOUBLE", }, "isExternalId": false, "isFinal": false, "isImported": true, "isInherited": false, "isRequiredInEntity": false, "isStoredExternally": false, "isTimeSeries": true } } } }

    请求关键要素:

    • selectedProperties 是一种列表,您可填充想要 Timestream 测量的属性。

    • startDateTimestartTimeEndDateTimeendTime 字段指定请求的时间范围。这决定了返回测量值的样本范围。

    • entityId 是您要从中查询数据的实体名称。

    • componentName 是您要从中查询数据的组件名称。

    • 使用 orderByTime 字段来整理结果的显示顺序。

    在前面的示例请求中,我们希望在给定时间窗口内按所选时间顺序获得给定项目的一系列样本。响应语句可概括如下:

    { "propertyValues": [ { "entityPropertyReference": { "entityId": "MyEntity", "componentName": "TelemetryData", "propertyName": "Temperature" }, "values": [ { "time": "2022-08-25T00:00:00Z", "value": { "doubleValue": 588.168 } }, { "time": "2022-08-25T00:00:01Z", "value": { "doubleValue": 592.4224 } }, { "time": "2022-08-25T00:00:02Z", "value": { "doubleValue": 594.9383 } } ] } ], "nextToken": "..." }
  4. 将您的组件类型,使其包含两种属性

    以下 JSON 模板显示了包含两个属性的有效组件类型:

    { "componentTypeId": "com.example.timestream-telemetry", "workspaceId": "MyWorkspace", "functions": { "dataReader": { "implementedBy": { "lambda": { "arn": "lambdaArn" } } } }, "propertyDefinitions": { "telemetryType": { "dataType": { "type": "STRING" }, "isExternalId": false, "isStoredExternally": false, "isTimeSeries": false, "isRequiredInEntity": true }, "telemetryId": { "dataType": { "type": "STRING" }, "isExternalId": true, "isStoredExternally": false, "isTimeSeries": false, "isRequiredInEntity": true }, "Temperature": { "dataType": { "type": "DOUBLE" }, "isExternalId": false, "isTimeSeries": true, "isStoredExternally": true, "isRequiredInEntity": false }, "RPM": { "dataType": { "type": "DOUBLE" }, "isExternalId": false, "isTimeSeries": true, "isStoredExternally": true, "isRequiredInEntity": false } } }
  5. 更新 Lambda 连接器,以处理第二属性

    Amazon IoT TwinMaker 数据平面 API 支持在单个请求中查询多个属性,并通过提供列表来 Amazon IoT TwinMaker 跟踪对连接器的单个请求selectedProperties

    以下 JSON 请求所示为修改后的模板,该模板现在支持对两个属性的请求。

    { "workspaceId": "MyWorkspace", "entityId": "MyEntity", "componentName": "TelemetryData", "selectedProperties": ["Temperature", "RPM"], "startTime": "2022-08-25T00:00:00Z", "endTime": "2022-08-25T00:00:05Z", "maxResults": 3, "orderByTime": "ASCENDING", "properties": { "telemetryType": { "definition": { "dataType": { "type": "STRING" }, "isExternalId": false, "isFinal": false, "isImported": false, "isInherited": false, "isRequiredInEntity": false, "isStoredExternally": false, "isTimeSeries": false }, "value": { "stringValue": "Mixer" } }, "telemetryId": { "definition": { "dataType": { "type": "STRING" }, "isExternalId": true, "isFinal": true, "isImported": false, "isInherited": false, "isRequiredInEntity": true, "isStoredExternally": false, "isTimeSeries": false }, "value": { "stringValue": "item_A001" } }, "Temperature": { "definition": { "dataType": { "type": "DOUBLE" }, "isExternalId": false, "isFinal": false, "isImported": true, "isInherited": false, "isRequiredInEntity": false, "isStoredExternally": false, "isTimeSeries": true } }, "RPM": { "definition": { "dataType": { "type": "DOUBLE" }, "isExternalId": false, "isFinal": false, "isImported": true, "isInherited": false, "isRequiredInEntity": false, "isStoredExternally": false, "isTimeSeries": true } } } }

    同样,还按下例更新相应的响应:

    { "propertyValues": [ { "entityPropertyReference": { "entityId": "MyEntity", "componentName": "TelemetryData", "propertyName": "Temperature" }, "values": [ { "time": "2022-08-25T00:00:00Z", "value": { "doubleValue": 588.168 } }, { "time": "2022-08-25T00:00:01Z", "value": { "doubleValue": 592.4224 } }, { "time": "2022-08-25T00:00:02Z", "value": { "doubleValue": 594.9383 } } ] }, { "entityPropertyReference": { "entityId": "MyEntity", "componentName": "TelemetryData", "propertyName": "RPM" }, "values": [ { "time": "2022-08-25T00:00:00Z", "value": { "doubleValue": 59 } }, { "time": "2022-08-25T00:00:01Z", "value": { "doubleValue": 60 } }, { "time": "2022-08-25T00:00:02Z", "value": { "doubleValue": 60 } } ] } ], "nextToken": "..." }
    注意

    就本例分页而言,请求中的页面大小适用于所有属性。这意味着查询中有五个属性,页面大小为 100,如果源中有足够的数据点,则每个属性应有 100 个数据点,总共有 500 个数据点。

    有关实现示例,请参阅上的 Snowflake 连接器示例。 GitHub

改进您的数据连接器

处理异常

Lambda 连接器可以安全抛出异常。在数据层面 API 调用中, Amazon IoT TwinMaker 服务等待 Lambda 函数返回响应。如果连接器实现引发异常,则将异常类型 Amazon IoT TwinMaker 转换为ConnectorFailure,让 API 客户端意识到连接器内部发生了问题。

处理分页

在示例中,Timestream 提供了实用函数,可以帮助本地支持分页。但是,对于其他部分查询接口(例如 SQL),可能需要额外操作才能实现高效分页算法。Snowflake 连接器示例可以处理 SQL 接口中的分页。

当 Amazon IoT TwinMaker 通过连接器响应接口返回新令牌时,该令牌在返回到 API 客户端之前会被加密。当令牌包含在另一个请求中时,先对其进行 Amazon IoT TwinMaker 解密,然后再将其转发到 Lambda 连接器。我们建议您避免向令牌中添加敏感信息。

测试您的连接器

尽管在将连接器连接至组件类型之后,您仍然可以更新实施,但我们强烈建议您在集成 Amazon IoT TwinMaker之前验证 Lambda 连接器。

有多种方法可以测试您的 Lambda 连接器:您可以在 Lambda 控制台中测试 Lambda 连接器,也可以在 Amazon CDK中本地测试。

有关测试 Lambda 函数的更多信息,请参阅测试 Lambda 函数和本地测试应用程序。 Amazon CDK

安全性

有关 Timestream 最佳安全方法文档,请参阅 Timestream 安全措施

有关 SQL 注入防护的示例,请参阅 Amazon IoT TwinMaker 示例 GitHub 存储库中的以下 Python 脚本。

创建 Amazon IoT TwinMaker 资源

实现 Lambda 函数后,您可以通过Amazon IoT TwinMaker 控制台或 Amazon IoT TwinMaker API 创建诸如组件类型、实体和组件之类的资源。

注意

如果您按照 GitHub 示例中的设置说明进行操作,则所有 Amazon IoT TwinMaker 资源都将自动可用。您可以在Amazon IoT TwinMaker GitHub 样本中查看组件类型定义。一旦任何组件使用了此组件类型,就无法更新该组件类型的属性定义和函数。

集成测试

我们建议与进行集成测试 Amazon IoT TwinMaker ,以验证数据平面查询是否有效 end-to-end。您可以通过 GetPropertyValueHistoryAPI 或在Amazon IoT TwinMaker 控制台中轻松执行此操作

TwinMaker 组件信息控制台页面显示组件的名称、类型、状态等。

在 Amazon IoT TwinMaker 控制台中,转到组件详细信息,然后在 “测试” 下,您会看到组件中的所有属性都列在那里。控制台的测试区域允许您测试时间序列属性和 non-time-series 属性。对于时间序列属性,您也可以使用 GetPropertyValueHistoryAPI,对于 non-time-series 属性,也可以使用 GetPropertyValueAPI。如果您的 Lambda 连接器支持多属性查询,则可以选择多种属性。

TwinMaker 组件信息控制台页面中显示组件测试的一部分。

接下来做什么

现在,您可以设置 Amazon IoT TwinMaker Grafana 控制面板以查看指标。您还可以在示例 GitHub 存储库中浏览其他数据连接器Amazon IoT TwinMaker 示例,以查看它们是否适合您的用例。