本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Docker 容器
本节包括有关如何构建自己的 Docker 容器的信息。如果重复使用第三方构建的 Docker 容器,则存在安全风险:这些容器可能使用您的用户权限执行任意代码。在使用之前,请确保您信任任何第三方容器的作者。
使用以下步骤可设置针对自上次执行分析以来到达的数据的定期数据分析:
-
创建一个 Docker 容器,其中包含您的数据应用程序以及任何必要的库或其他依赖项。
这些区域有: IotAnalytics Jupyter 扩展提供了容器化 API 来协助容器化过程。您还可以运行自己创建的图像,在其中创建或汇编应用程序工具集以执行所需的数据分析或计算。Amazon IoT Analytics允许您通过变量定义容器化应用程序的输入数据源和 Docker 容器输出数据的目的地。(自定义 Docker 容器输入/输出变量包含有关在自定义容器中使用变量的更多信息。)
-
将容器上传到 Amazon ECR 注册表。
-
创建数据存储库以接收和存储来自设备的消息(数据)(
iotanalytics: CreateDatastore
) -
创建发送消息的频道 (
iotanalytics: CreateChannel
)。 -
创建管道以将通道连接到数据存储 (
iotanalytics: CreatePipeline
)。 -
创建一个 IAM 角色,该角色授予将消息数据发送到Amazon IoT Analytics通道 (
iam: CreateRole.
) -
创建使用 SQL 查询将通道连接到消息数据源的 IoT 规则 (
iot: CreateTopicRule
领域topicRulePayload:actions:iotAnalytics
)。当设备通过 MQTT 发送带有相应主题的消息时,该消息将被路由到您的频道。或者,您可以使用iotanalytics: BatchPutMessage
将消息直接从能够使用的设备发送到频道Amazon软件开发工具开发工具Amazon CLI. -
创建一个由时间表触发创建的 SQL 数据集 (
iotanalytics: CreateDataset,
领域actions: queryAction:sqlQuery
)。您还可以指定要应用于消息数据的预筛选器,以帮助将消息限制为自上次执行操作以来到达的消息。(字段
actions:queryAction:filters:deltaTime:timeExpression
给出了用于确定消息时间的表达式。while fieldactions:queryAction:filters:deltaTime:offsetSeconds
指定消息到达时可能出现的延迟。)预过滤器以及触发时间表决定了您的增量窗口。每个新的 SQL 数据集都是使用自上次创建 SQL 数据集以来收到的消息创建的。(第一次创建 SQL 数据集怎么样? 根据计划和预过滤器估算上次创建数据集的时间。)
-
创建另一个由创建第一个数据集触发的数据集 (CreateDataset领域
trigger:dataset
)。对于此数据集,您可以指定容器操作(已归档)actions:containerAction
),它指向您在第一步中创建的 Docker 容器,并提供运行该容器所需的信息。在这里,您还可以指定:-
存储在您的账户中的 docker 容器的 ARN (
image
。) -
角色的 ARN,该角色用于授予访问所需资源所需的系统权限,以便运行容器操作 (
executionRoleArn
)。 -
执行容器操作的资源的配置 (
resourceConfiguration
。) -
用于执行容器操作的计算资源的类型 (
computeType
使用可能的值:ACU_1 [vCPU=4, memory=16GiB] or ACU_2 [vCPU=8, memory=32GiB]
)。 -
用于执行容器操作的资源实例可以使用的持久性存储的大小 (GB)
volumeSizeInGB
)。 -
在执行应用程序的上下文中使用的变量的值(基本上是传递给应用程序的参数)(
variables
)。在执行容器时,将替换这些变量。这使您能够使用创建数据集内容时提供的不同变量(参数)运行同一个容器。这些区域有: IotAnalytics Jupyter 扩展通过自动识别笔记本中的变量并将它们作为容器化过程的一部分提供来简化此过程。您可以选择已识别的变量或添加自己的自定义变量。在运行容器之前,系统会将每个变量的值替换为执行时的当前值。
-
其中一个变量是使用其最新内容作为应用程序输入的数据集的名称(这是您在上一步中创建的数据集的名称)(
datasetContentVersionValue:datasetName
)。
-
使用 SQL 查询和增量窗口生成数据集,使用容器生成应用程序,Amazon IoT Analytics创建按您指定的间隔在 delta 窗口中的数据上运行的预定生产数据集,生成所需的输出并发送通知。
您可以暂停生产数据集应用程序,并在选择时将其恢复。当您恢复生产数据集应用程序时,Amazon IoT Analytics,默认情况下,它会捕获自上次执行以来到达但尚未分析的所有数据。您还可以通过执行一系列连续运行来配置恢复生产数据集的方式(作业窗口长度)。或者,您可以通过仅捕获新到达且符合增量窗口指定大小的数据来恢复生产数据集应用程序。
在创建或定义由创建另一个数据集触发的数据集时,请注意以下限制:
-
SQL 数据集只能触发容器数据集。
-
一个 SQL 数据集最多可以触发 10 个容器数据集。
创建由 SQL 数据集触发的容器数据集时可能会返回以下错误:
-
“Triggering dataset can only be added on a container dataset”(只能在容器数据集中添加触发数据集)
-
“There can only be one triggering dataset”(只能存在一个触发数据集)
如果您尝试定义由两个不同的 SQL 数据集触发的容器数据集,就会出现此错误。
-
“触发数据集<dataset-name>无法由容器数据集触发”
如果您尝试定义另一个由另一个容器数据集触发的容器数据集,则会出现此错误。
-
“<N>数据集已经依赖于<dataset-name>数据集了。”
如果您尝试定义另一个由已经触发 10 个容器数据集的 SQL 数据集触发的容器数据集,则会出现此错误。
-
“Exactly one trigger type should be provided”(应确切提供一种触发器类型)
当您尝试定义由计划触发器和数据集触发器触发的数据集时,就会出现此错误。
自定义 Docker 容器输入/输出变量
本节演示您的自定义 Docker 映像运行的程序如何读取输入变量并上传其输出。
参数文件
输入变量以及要将输出上传到的目的地存储在一个 JSON 文件中,它位于执行 Docker 映像的实例上的 /opt/ml/input/data/iotanalytics/params
中。以下是该文件内容的示例。
{ "Context": { "OutputUris": { "html": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.html", "ipynb": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.ipynb" } }, "Variables": { "source_dataset_name": "mydataset", "source_dataset_version_id": "xxxx", "example_var": "hello world!", "custom_output": "s3://aws-iot-analytics/dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.txt" } }
除了数据集的名称和版本 ID 外,Variables
部分还包含在 iotanalytics:CreateDataset
调用中指定的变量 - 在此示例中,为变量 example_var
给定值 hello world!
。在 custom_output
变量中还提供了自定义输出 URI。该 OutputUris
字段包含容器可将其输出上传到的默认位置 - 在本示例中,同时提供了 ipynb 和 html 输出的默认输出 URI。
输入变量
通过 Docker 映像启动的程序可从 params
文件中读取变量。以下是一个示例程序,它可以打开params
文件,对其进行解析,然后输出其值example_var
变量。
import json with open("/opt/ml/input/data/iotanalytics/params") as param_file: params = json.loads(param_file.read()) example_var = params["Variables"]["example_var"] print(example_var)
上传输出
您的 Docker 镜像启动的程序也可能将其输出存储在 Amazon S3 位置。必须使用” 加载输出bucket-owner-full-control“访问控制列表. 访问列表授予Amazon IoT Analytics对上传输出的服务控制。在此示例中,我们扩展了前一个示例以上传的内容example_var
到由定义的 Amazon Storage S3 位置custom_output
在里面params
文件。
import boto3 import json from urllib.parse import urlparse ACCESS_CONTROL_LIST = "bucket-owner-full-control" with open("/opt/ml/input/data/iotanalytics/params") as param_file: params = json.loads(param_file.read()) example_var = params["Variables"]["example_var"] outputUri = params["Variables"]["custom_output"] # break the S3 path into a bucket and key bucket = urlparse(outputUri).netloc key = urlparse(outputUri).path.lstrip("/") s3_client = boto3.client("s3") s3_client.put_object(Bucket=bucket, Key=key, Body=example_var, ACL=ACCESS_CONTROL_LIST)