使用Docker容器 - AWS IoT Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

使用Docker容器

此部分包括有关如何构建您自己的Docker容器的信息。如果重复使用第三方构建的 Docker 容器,则存在安全风险:这些容器可能使用您的用户权限执行任意代码。在使用之前,请确保您信任任何第三方容器的作者。

使用以下步骤可设置针对自上次执行分析以来到达的数据的定期数据分析:

  1. 创建一个 Docker 容器,其中包含您的数据应用程序以及任何必要的库或其他依赖项。

    IotAnalytics Jupyter 扩展提供了容器化 API 来协助容器化过程。您还可以运行自己的创建图像,在其中创建或组装应用程序工具集以执行所需的数据分析或计算。 AWS IoT Analytics 允许您通过变量定义容器化应用程序的输入数据源和Docker容器的输出数据的目标。(自定义Docker容器输入/输出变量 包含有关将变量用于自定义容器的更多信息。)

  2. 将容器上传到 Amazon ECR 注册设置。

  3. 创建数据仓库以从设备接收和存储消息(数据)(iotanalytics: CreateDatastore)

  4. 创建发送消息的信道(iotanalytics: CreateChannel)。

  5. 创建管道以将通道连接到数据仓库(iotanalytics: CreatePipeline)。

  6. 创建 IAM 授予将消息数据发送到 AWS IoT Analytics 通道(iam: CreateRole.)

  7. 创建使用SQL查询将通道连接到消息数据源的物联网规则(iot: CreateTopicRule 字段 topicRulePayload:actions:iotAnalytics)。当设备发送包含相应主题签证MQTT的消息时,它将发送到您的信道。或者,您可以使用 iotanalytics: BatchPutMessage 将消息从能够使用的设备直接发送到信道中 AWS SDK或 AWS CLI.

  8. 创建一个通过时间调度触发创建的SQL数据集(iotanalytics: CreateDataset, 字段 actions: queryAction:sqlQuery)。

    您还可以指定要应用于消息数据的预筛选器,以帮助将消息限制为自上次执行操作以来到达的消息。(字段 actions:queryAction:filters:deltaTime:timeExpression 给出一个表达式,用于确定消息的时间。同时字段 actions:queryAction:filters:deltaTime:offsetSeconds 指定消息到达时的可能延迟。)

    预过滤器和触发计划一起确定您的增量窗口。每个新的SQL数据集都是使用自上次创建SQL数据集以来收到的消息创建的。(首次创建SQL数据集时怎么办?根据调度和预筛选估算数据集创建时间。)

  9. 创建另一个数据集,由创建第一个( 创建数据集 字段 trigger:dataset)。对于此数据集,您可以指定容器操作(提交 actions:containerAction)指向并在第一步中提供了运行您创建的Docker容器所需的信息。在这里,您还可以指定:

    • 存储在您帐户中的码头集装箱的ARN(image。)

    • 角色的 ARN,该角色用于授予访问所需资源所需的系统权限,以便运行容器操作 (executionRoleArn)。

    • 执行容器操作(resourceConfiguration。)

    • 用于执行容器操作的计算资源的类型(computeType 具有可能的值: ACU_1 [vCPU=4, memory=16GiB] or ACU_2 [vCPU=8, memory=32GiB])。

    • 用于执行容器操作的资源实例可用的持久存储的大小(GB)(volumeSizeInGB)。

    • 在应用程序执行上下文中使用的变量的值(基本上,参数传递到应用程序)(variables)。

      在执行容器时,将替换这些变量。这使您可以使用在创建数据集内容时提供的不同变量(参数)运行相同的容器。在容器化过程中,IotAnalytics Jupyter 扩展通过在笔记本中自动识别变量并将其标记为可用来简化此过程。您可以选择已识别的变量或添加自己的自定义变量。在运行容器之前,系统会将每个变量的值替换为执行时的当前值。

    • 其中一个变量是最新内容用作应用程序输入的数据集的名称(这是您在上一步中创建的数据集的名称)(datasetContentVersionValue:datasetName)。

通过SQL查询和增量窗口生成数据集,以及应用程序容器, AWS IoT Analytics 创建计划生产数据集,该数据集按您从增量窗口指定的数据时间间隔运行,从而生成所需的输出和发送通知。

您可以暂停生产数据集应用程序,并在选择执行此操作时恢复。当您恢复生产数据集应用程序时, AWS IoT Analytics默认情况下,会捕获自上次执行以来到达的所有数据,但尚未分析。您还可以通过执行一系列连续运行来配置恢复生产数据集作业窗口长度的方式。或者,您可以通过仅捕获符合增量窗口指定大小的新到达数据来恢复生产数据集应用程序。

请注意,在创建或定义由创建另一个数据集触发的数据集时,存在以下限制:

  • 只有容器数据集才能由SQL数据集触发。

  • SQL数据集最多只能触发10个容器数据集。

创建由SQL数据集触发的容器数据集时,可能会返回以下错误:

  • “Triggering dataset can only be added on a container dataset”(只能在容器数据集中添加触发数据集)

  • “There can only be one triggering dataset”(只能存在一个触发数据集)

    如果您尝试定义由两个不同的SQL数据集触发的容器数据集,则会出现此错误。

  • "容器数据集无法触发触发触发数据集<dataset-name>"

    如果您尝试定义由另一个容器数据集触发的另一个容器数据集,则会出现此错误。

  • "<N>数据集已依赖于<dataset-name>数据集。"

    如果您尝试定义另一个由已触发10个容器数据集的SQL数据集触发的容器数据集,则会出现此错误。

  • “Exactly one trigger type should be provided”(应确切提供一种触发器类型)

    发生此错误时,您尝试定义一个数据集,该数据集由调度触发器和数据集触发器触发。

自定义Docker容器输入/输出变量

本节演示您的自定义 Docker 映像运行的程序如何读取输入变量并上传其输出。

参数文件

输入变量以及要将输出上传到的目的地存储在一个 JSON 文件中,它位于执行 Docker 映像的实例上的 /opt/ml/input/data/iotanalytics/params 中。以下是该文件的内容示例。

{ "Context": { "OutputUris": { "html": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.html", "ipynb": "s3://aws-iot-analytics-dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.ipynb" } }, "Variables": { "source_dataset_name": "mydataset", "source_dataset_version_id": "xxxx", "example_var": "hello world!", "custom_output": "s3://aws-iot-analytics/dataset-xxxxxxx/notebook/results/iotanalytics-xxxxxxx/output.txt" } }

除了数据集的名称和版本ID之外, Variables 部分包含中指定的变量 iotanalytics:CreateDataset invocation--inthisexample,avariable example_var 被赋予了 hello world!。还提供自定义输出URI custom_output 变量。该 OutputUris 字段包含容器可将其输出上传到的默认位置 - 在本示例中,同时提供了 ipynb 和 html 输出的默认输出 URI。

输入变量

通过 Docker 映像启动的程序可从 params 文件中读取变量。以下是一个示例程序,可打开 params 文件,解析文件,并打印 example_var 变量。

import json with open("/opt/ml/input/data/iotanalytics/params") as param_file: params = json.loads(param_file.read()) example_var = params["Variables"]["example_var"] print(example_var)

正在上传输出

Docker镜像启动的程序也可能将其输出存储在 Amazon S3 位置。必须使用“bucket-owner-full-control”访问控制列表加载输出。访问列表授予 AWS IoT Analytics 对已上传输出的服务控制。在本示例中,我们扩展了前一项以上传 example_var 到 Amazon S3 位置由 custom_outputparams 文件。

import boto3 import json from urllib.parse import urlparse ACCESS_CONTROL_LIST = "bucket-owner-full-control" with open("/opt/ml/input/data/iotanalytics/params") as param_file: params = json.loads(param_file.read()) example_var = params["Variables"]["example_var"] outputUri = params["Variables"]["custom_output"] # break the S3 path into a bucket and key bucket = urlparse(outputUri).netloc key = urlparse(outputUri).path.lstrip("/") s3_client = boto3.client("s3") s3_client.put_object(Bucket=bucket, Key=key, Body=example_var, ACL=ACCESS_CONTROL_LIST)