导出支持的 AWS 云目标的 配置 - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

导出支持的 AWS 云目标的 配置

用户定义的 Greengrass 组件使用 Stream Manager 开发工具包StreamManagerClient中的 与流管理器进行交互。当组件创建或更新时,它会传递表示流属性MessageStreamDefinition的对象,包括导出定义。ExportDefinition 对象包含为流定义的导出配置。流管理器使用这些导出配置来确定导出流的位置和方式。


      ExportDefinition 属性类型的对象模型图。

您可以在流上定义零个或多个导出配置,包括单个目标类型的多个导出配置。例如,您可以将一个流导出到两个 AWS AWS IoT Analytics 通道和一个 Kinesis 数据流。

注意

StreamManagerClient 还提供了一个目标目标目标,您可以使用它将流导出到 HTTP 服务器。此目标仅用于测试目的。它不稳定,也支持在生产环境中使用。

您愿意维护这些 AWS 云资源。

AWS IoT Analytics 通道

流管理器支持自动导出到 AWS AWS IoT Analytics 。AWS IoT Analytics您可以对数据进行高级分析,以帮助做出业务决策并改进机器学习模型。有关更多信息,请参阅 AWS IoT Analytics 用户指南》中的“什么是 AWS IoT Analytics?”

在 Stream Manager 开发工具包中,您的 Greengrass 组件使用 IoTAnalyticsConfig 定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

Requirements

此导出目标具有以下要求:

  • AWS AWS IoT Analytics 中的目标通道必须与 Greengrass 核心设备位于同一 AWS 账户和 AWS 区域中。

  • 授权核心设备与 AWS 服务交互 必须允许对目标通道的 iotanalytics:BatchPutMessage 权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    您可以授予对 资源的精细或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 IAM 用户指南 中的添加和删除 IAM 策略

导出到 AWS AWS IoT Analytics

要创建导出到 AWS AWS IoT Analytics 的流,您的 Greengrass 组件创建一个具有包含一个或多个对象的导出定义的流IoTAnalyticsConfig。此对象定义导出设置,例如目标通道、批处理大小、批处理间隔和优先级。

当 Greengrass 组件从设备接收数据时,它们会将包含 Blob 数据的消息附加消息附加到目标流。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级来导出数据。

Amazon Kinesis数据流

流管理器支持自动导出到 Amazon Kinesis Data Streams。Kinesis Data Streams 通常用于聚合大容量数据并将其加载到数据仓库或 MapReduce 集群中。有关更多信息,请参阅 Amazon Kinesis 开发人员指南》中的“什么是 Amazon Kinesis Data Streams?”。

在 Stream Manager 开发工具包中,您的 Greengrass 组件使用 KinesisConfig 定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

Requirements

此导出目标具有以下要求:

  • Kinesis Data Streams 中的目标流必须与 Greengrass 核心设备位于同一 AWS 账户和 AWS 区域中。

  • 授权核心设备与 AWS 服务交互 必须允许对目标数据流的 kinesis:PutRecords 权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    您可以授予对 资源的精细或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 IAM 用户指南 中的添加和删除 IAM 策略

导出到 Kinesis 数据流

要创建导出到 Kinesis Data Streams 的流,您的 Greengrass 组件将创建一个具有包含一个或多个对象的导出定义的流KinesisConfig。此对象定义导出设置,例如目标数据流、批处理大小、批处理间隔和优先级。

当 Greengrass 组件从设备接收数据时,它们会将包含 Blob 数据的消息附加消息附加到目标流。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级来导出数据。

AWS IoT SiteWise 资产属性

流管理器支持自动导出到 AWS IoT IoT SiteWise。AWS IoT SiteWise 允许您从工业设备大规模收集、组织和分析数据。有关更多信息,请参阅 AWS IoT IoT SiteWise 用户指南》中的“什么是 AWS IoT SiteWise?”

在 Stream Manager 开发工具包中,您的 Greengrass 组件使用 IoTSiteWiseConfig 定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

注意

AWS 还提供了 AWS AWS IoT SiteWise 连接器,这是一种可与 OPC-UA 源结合使用的预构建解决方案。有关更多信息,请参阅AWS IoT IoT Greengrass 开发人员指南》中的 AWS IoT SiteWise 连接器

Requirements

此导出目标具有以下要求:

  • AWS AWS IoT SiteWise 中的目标资产属性必须与 Greengrass 核心设备位于同一 AWS 账户和 AWS 区域中。

    注意

    有关 AWS AWS IoT SiteWise 支持的 AWS 区域的列表,请参阅 AWS 一般参考 中的 AWS AWS IoT SiteWise 终端节点和配额。

  • 授权核心设备与 AWS 服务交互 必须允许将资产属性设为目标iotsitewise:BatchPutAssetPropertyValue的权限。以下示例策略使用 iotsitewise:assetHierarchyPath 条件键授予对目标根资产及其子项的访问权限。您可以从策略中删除 ,以允许访问您的所有 AWS IoT Condition IoT SiteWise 资产或指定单个资产的 ARNs。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    您可以授予对 资源的精细或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 IAM 用户指南 中的添加和删除 IAM 策略

    有关重要的安全信息,请参阅AWS IoT IoT SiteWise 用户指南》中的 BatchPutAssetPropertyValue 授权

导出到 AWS IoT IoT SiteWise

要创建导出到 AWS IoT IoT SiteWise 的流,您的 Greengrass 组件创建一个包含一个或多个对象的导出定义的流IoTSiteWiseConfig。此对象定义导出设置,例如批处理大小、批处理间隔和优先级。

当 Greengrass 组件从设备接收资产属性数据时,它们会将包含该数据的消息附加到目标流。消息是 JSON 序列化PutAssetPropertyValueEntry对象,其中包含一个或多个资产属性的属性值。有关更多信息,请参阅为 AWS IoT IoT SiteWise 导出目标附加消息

注意

在将数据发送到 AWS IoT IoT SiteWise 时,您的数据必须满足 BatchPutAssetPropertyValue 操作的要求。有关更多信息,请参阅AWS IoT IoT SiteWise API 参考》中的BatchPutAssetPropertyValue

然后,流管理器根据流的导出配置中定义的批处理设置和优先级来导出数据。

您可以调整流管理器设置和 Greengrass 组件逻辑以设计导出策略。例如:

  • 对于近乎实时的导出,请设置较低的批处理大小和间隔设置,并在收到数据时将数据附加到流中。

  • 为了优化批处理、缓解带宽限制或最大限度地降低成本,Greengrass 组件可以池化为单个资产属性接收的时间戳-质量-值 (TQV) 数据点,然后再将数据附加到流。一种策略是在一个消息中对多达 10 个不同的属性-资产组合或属性别名的条目进行批处理,而不是为同一属性发送多个条目。这有助于流管理器保持在 AWS IoT IoT SiteWise 配额内。

Amazon S3对象

流管理器支持自动导出到 Amazon S3 。您可以使用 Amazon S3 存储和检索大量数据。有关更多信息,请参阅《Amazon Simple Storage Service 开发人员指南》中的“什么是 Amazon Amazon S3?”

在 Stream Manager 开发工具包中,您的 Greengrass 组件使用 S3ExportTaskExecutorConfig 定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

Requirements

此导出目标具有以下要求:

  • 目标 Amazon S3 存储桶必须与 Greengrass 核心设备位于同一 AWS 账户中。

  • 如果在 Greengrass 容器模式下运行的 Lambda 函数将输入文件写入到输入文件目录,则必须在容器中将目录作为具有写入权限的卷挂载。这可确保文件写入到根文件系统并在容器外部可见。

  • 授权核心设备与 AWS 服务交互 必须允许对目标存储桶的以下权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    您可以授予对 资源的精细或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 IAM 用户指南 中的添加和删除 IAM 策略

导出到 Amazon S3

要创建导出到 Amazon S3 的流,您的 Greengrass 组件将使用 S3ExportTaskExecutorConfig 对象来配置导出策略。该策略定义导出设置,例如分段上传阈值和优先级。对于 Amazon S3 导出,流管理器上传它从核心设备上的本地文件读取的数据。要启动上传,您的 Greengrass 组件会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按任务附加到流的顺序运行任务。

注意

目标存储桶必须已存在于您的 AWS 账户中。如果指定键的对象不存在,流管理器将为您创建该对象。

流管理器使用分段上传阈值属性、最小段大小设置和输入文件的大小来确定如何上传数据。分段上传阈值必须大于或等于最小分段大小。如果要并行上传数据,您可以创建多个流。

指定目标 Amazon S3 对象的键可以在占位符中包含有效的 Java DateTimeFormatter!{timestamp:value} 字符串。您可以使用这些时间戳占位符根据上传输入文件数据的时间在 Amazon S3 中对数据进行分区。例如,以下键名解析为值,例如 my-key/2020/12/31/data.txt

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要监控流的导出状态,请先创建一个状态流,然后将导出流配置为使用它。有关更多信息,请参阅监控导出任务

管理输入数据

您可以编写 IoT 应用程序用于管理输入数据生命周期的代码。以下示例工作流程显示如何使用 Greengrass 组件来管理此数据。

  1. 本地进程从设备或外围设备接收数据,然后将数据写入核心设备上的目录中的文件。这些是流管理器的输入文件。

  2. Greengrass 组件扫描 目录,并在创建新文件时将导出任务Amazon S3 导出目标附加到目标流。该任务是一个 JSON 序列化S3ExportTaskDefinition对象,用于指定输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选的用户元数据。

  3. 流管理器读取输入文件,并按附加任务的顺序将数据导出到 Amazon S3 。目标存储桶必须已存在于您的 AWS 账户中。如果指定键的对象不存在,流管理器将为您创建该对象。

  4. Greengrass 组件从状态流中读取消息以监控导出状态。导出任务完成后,Greengrass 组件可以删除相应的输入文件。有关更多信息,请参阅监控导出任务

监控导出任务

您可以编写 IoT 应用程序用来监控 Amazon S3 导出状态的代码。您的 Greengrass 组件必须创建状态流,然后将导出流配置为将状态更新写入到状态流。单个状态流可以从导出到 Amazon S3 的多个流接收状态更新。

首先,创建要用作状态流的流。您可以为流配置大小和保留策略,以控制状态消息的生命周期。例如:

  • Persistence 如果您不想存储状态消息,请Memory设置为 。

  • StrategyOnFull 设置为 OverwriteOldestData ,以便新状态消息不会丢失。

然后,创建或更新导出流以使用状态流。具体来说,设置流的S3ExportTaskExecutorConfig导出配置的状态配置属性。此设置指示流管理器将有关导出任务的状态消息写入状态流。在 StatusConfig 对象中,指定状态流的名称和详细程度级别。以下支持的值的范围从最不详细 (ERROR) 到最详细 (TRACE)。默认值为 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

以下示例工作流程显示了 Greengrass 组件如何使用状态流监控导出状态。

  1. 如上一个工作流程中所述,Greengrass 组件将导出任务Amazon S3 导出目标附加到 流,该流配置为将有关将任务导出到 状态流的状态消息写入 。附加操作返回表示任务 ID 的序列号。

  2. Greengrass 组件按顺序从状态流中读取消息,然后根据流名称和任务 ID 或根据消息上下文中的导出任务属性筛选消息。例如,Greengrass 组件可以按导出任务的输入文件 URL 进行筛选,该文件 URL 由消息上下文中的 S3ExportTaskDefinition 对象表示。

    以下状态代码指示导出任务已进入已完成状态:

    • Success。上传已成功完成。

    • Failure。流管理器遇到错误,例如,指定的存储桶不存在。解决该问题后,您可以再次将导出任务附加到流。

    • Canceled。由于已删除流或导出定义或任务的生存时间 (TTL) 期间已到期,任务已停止。

    注意

    该任务可能还具有 InProgress 或 状态Warning。当事件返回不影响任务执行的错误时,流管理器会发出警告。例如,未清理部分上传会返回 警告。

  3. 导出任务完成后,Greengrass 组件可以删除相应的输入文件。

以下示例显示了 Greengrass 组件如何读取和处理状态消息。

Python
import time from stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:read_messages | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考readMessages | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require(*'aws-greengrass-stream-manager-sdk'*); const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考readMessages | StatusMessage