Amazon IoT Analytics 不再向新客户提供。的现有客户 Amazon IoT Analytics 可以继续照常使用该服务。了解更多
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Docker 容器。
本节包含有关如何构建您自己的 Docker 容器的信息。如果重复使用第三方构建的 Docker 容器,则存在安全风险:这些容器可能使用您的用户权限执行任意代码。在使用之前,请确保您信任任何第三方容器的作者。
使用以下步骤可设置针对自上次执行分析以来到达的数据的定期数据分析:
-
创建一个 Docker 容器,其中包含您的数据应用程序以及任何必要的库或其他依赖项。
IotAnalytics Jupyter 扩展提供了容器化 API 来协助容器化过程。您还可以运行自己创建内容的映像,其中您创建或组装了应用程序工具集,以执行所需的数据分析或计算。通过 Amazon IoT Analytics,您借助变量来定义容器化应用程序的输入数据源和 Docker 容器输出数据的目标。(自定义 Docker 容器输入/输出变量包含有关将变量与自定义容器结合使用的更多信息。)
-
将容器上传到 Amazon ECR 注册表。
-
创建数据存储,以接收并存储来自设备的消息(数据)(
iotanalytics: CreateDatastore
) -
创建消息发送通道(
iotanalytics: CreateChannel
)。 -
创建用于将通道连接至数据存储的管道 (
iotanalytics: CreatePipeline
)。 -
创建一个 IAM 角色来授予向 Amazon IoT Analytics 通道发送数据的权限(
iam: CreateRole.
) -
创建一个 IoT 规则,以使用 SQL 查询将通道连接到消息数据源(
iot: CreateTopicRule
字段topicRulePayload:actions:iotAnalytics
)。在设备通过 MQTT 发送具有相应主题的消息时,它将路由到您的通道。或者,您可以使用iotanalytics: BatchPutMessage
从能够使用 Amazon 开发工具包或 Amazon CLI 的设备直接向通道中发送消息。 -
创建一个 SQL 数据集,其创建由时间计划(
iotanalytics: CreateDataset,
字段actions: queryAction:sqlQuery
)触发。您还可以指定要应用于消息数据的预筛选器,以帮助将消息限制为自上次执行操作以来到达的消息。(字段
actions:queryAction:filters:deltaTime:timeExpression
提供了可用于确定消息时间的表达式,而字段actions:queryAction:filters:deltaTime:offsetSeconds
则指定了消息到达的可能延迟。)预筛选器和触发器计划共同确定了您的增量时段。每个新的 SQL 数据集是使用在上次创建 SQL 数据集后收到的消息创建的。(首次创建 SQL 数据集是如何实现的?根据计划和预筛选器估算上次创建数据集的时间。)
-
创建由第一个 (CreateDataset 字段
trigger:dataset
) 的创建所触发的另一个数据集。对于此数据集,您需指定一个指向您在第一步中创建的 Docker 容器并提供运行该 Docker 容器所需信息的“容器操作”(字段actions:containerAction
)。在这里,您还可以指定:-
您的账户中存储的 Docker 容器的 ARN (
image
)。 -
角色的 ARN,该角色用于授予访问所需资源所需的系统权限,以便运行容器操作 (
executionRoleArn
)。 -
执行容器操作的资源的配置 (
resourceConfiguration
)。 -
用于执行容器操作的计算资源的类型(
computeType
,可能的值为:ACU_1 [vCPU=4, memory=16GiB] or ACU_2 [vCPU=8, memory=32GiB]
)。 -
用于执行容器操作的资源实例可以使用的持久性存储的大小(以 GB 为单位)(
volumeSizeInGB
)。 -
在应用程序的执行上下文中使用的变量值(基本上是传递给应用程序的参数)(
variables
)。在执行容器时,将替换这些变量。这样,您就可以使用在创建数据集内容时提供的不同变量(参数)来运行相同的容器。在容器化过程中,IotAnalytics Jupyter 扩展通过在笔记本中自动识别变量并将其标记为可用来简化此过程。您可以选择已识别的变量或添加自己的自定义变量。在运行容器之前,系统会将每个变量的值替换为执行时的当前值。
-
其中的一个变量是数据集的名称,数据集的最新内容将作为应用程序输入(这是您在上一步中创建的数据集的名称)(
datasetContentVersionValue:datasetName
)。
-
通过可生成数据集的 SQL 查询和增量窗口,以及应用程序的容器,Amazon IoT Analytics 会创建一个计划的生产数据集,它以您指定的时间间隔对增量窗口中的数据运行,会生成所需的输出并发送通知。
您可以随时暂停您的生产数据集应用程序,然后再恢复。当您恢复生产数据集应用程序时,Amazon IoT Analytics 在默认情况下将同步自上次执行以来已到达但尚未分析的所有数据。您还可通过执行一系列连续运行,来配置恢复生产数据集作业窗口长度的方式。或者,您也可以通过只捕获适合增量时段的指定大小的新到达数据来恢复生产数据集应用程序。
在创建或定义由其他数据集的创建所触发的数据集时,请注意以下限制:
-
只有容器数据集可由 SQL 数据集触发。
-
一个 SQL 数据集最多可以触发 10 个容器数据集。
在创建由 SQL 数据集触发的容器数据集时,可能会返回以下错误:
-
“Triggering dataset can only be added on a container dataset”(只能在容器数据集中添加触发数据集)
-
“There can only be one triggering dataset”(只能存在一个触发数据集)
如果您尝试定义由两个不同 SQL 数据集触发的容器数据集,则会发生此错误。
-
“The triggering data set <dataset-name> cannot be triggered by a container dataset”(触发数据集 <数据集名称> 不能被容器数据集触发)
如果您尝试定义由其他容器数据集触发的容器数据集,则会发生此错误。
-
“<N> datasets are already dependent on <dataset-name> dataset”(已经有 <N> 个数据集依赖于 <数据集名称> 数据集)
如果您尝试定义的另一个容器数据集是由已触发 10 个容器数据集的 SQL 数据集触发的,则会出现该错误。
-
“Exactly one trigger type should be provided”(应确切提供一种触发器类型)
如果您尝试定义同时由计划触发器和数据集触发器触发的数据集,则会发生此错误。
自定义 Docker 容器的输入/输出变量
本节演示您的自定义 Docker 映像运行的程序如何读取输入变量并上传其输出。
参数文件
输入变量以及要将输出上传到的目的地存储在一个 JSON 文件中,它位于执行 Docker 映像的实例上的 /opt/ml/input/data/iotanalytics/params
中。下面是该文件的内容示例。
{ "Context": { "OutputUris": { "html": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.html", "ipynb": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.ipynb" } }, "Variables": { "source_dataset_name": "mydataset", "source_dataset_version_id": "xxxx", "example_var": "hello world!", "custom_output": "s3://aws-iot-analytics/dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.txt" } }
除了数据集的名称和版本 ID 外,Variables
部分还包含在 iotanalytics:CreateDataset
调用中指定的变量 - 在此示例中,为变量 example_var
给定值 hello world!
。在 custom_output
变量中还提供了自定义输出 URI。该 OutputUris
字段包含容器可将其输出上传到的默认位置 - 在本示例中,同时提供了 ipynb 和 html 输出的默认输出 URI。
输入变量
通过 Docker 映像启动的程序可从 params
文件中读取变量。下面的示例程序将打开 params
文件、对文件进行分析并打印 example_var
变量的值。
import json with open("/opt/ml/input/data/iotanalytics/params") as param_file: params = json.loads(param_file.read()) example_var = params["Variables"]["example_var"] print(example_var)
上传输出
通过 Docker 映像启动的程序还可以将其输出存储在 Amazon S3 位置。必须使用“bucket-owner-full-control”访问控制列表加载输出。该访问列表将向 Amazon IoT Analytics 服务授予对已上传输出的控制。在本示例中,我们对上一示例进行扩展,将 example_var
的内容上传至 params
文件中由 custom_output
定义的 Amazon S3 位置。
import boto3 import json from urllib.parse import urlparse ACCESS_CONTROL_LIST = "bucket-owner-full-control" with open("/opt/ml/input/data/iotanalytics/params") as param_file: params = json.loads(param_file.read()) example_var = params["Variables"]["example_var"] outputUri = params["Variables"]["custom_output"] # break the S3 path into a bucket and key bucket = urlparse(outputUri).netloc key = urlparse(outputUri).path.lstrip("/") s3_client = boto3.client("s3") s3_client.put_object(Bucket=bucket, Key=key, Body=example_var, ACL=ACCESS_CONTROL_LIST)