导出支持的配置 AWS 云目的地 - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

导出支持的配置 AWS 云目的地

用户定义的 Lambda 函数使用 StreamManagerClient 在 AWS IoT Greengrass Core 开发工具包 与流管理器交互。当 Lambda 功能 创建一个流更新流,它通过了 MessageStreamDefinition 对象,表示流属性,包括导出定义。的 ExportDefinition 对象包含为流定义的导出配置。StreamManager使用这些导出配置来确定从何处以及如何导出流。


            对象模型图 ExportDefinition 属性类型。

您可以在流上定义零个或多个导出配置,包括单个目标类型的多个导出配置。例如,您可以将流导出到 AWS IoT Analytics 渠道和一个 Kinesis 数据流。

注意

StreamManagerClient 还提供了一个目标目标目标,可用于将流导出到 HTTP 服务器。此目标仅用于测试目的。它不稳定,也不支持在生产环境中使用。

您有责任维护这些 AWS 云资源。

AWS IoT Analytics 通道

StreamManager支持自动导出至 AWS IoT Analytics. 可让您对数据执行高级分析,以帮助制定业务决策并改进机器学习模型。AWS IoT Analytics有关更多信息,请参阅 用户指南AWS IoT Analytics 中的什么是 ?AWS IoT Analytics。

在 AWS IoT Greengrass Core 开发工具包,您的 Lambda 函数使用 IoTAnalyticsConfig 以定义此目的地类型的导出配置。有关详细信息,请参阅您目标语言的SDK参考:

Requirements

此出口目的地有以下要求:

  • 中的目标渠道 AWS IoT Analytics 必须与 AWS 账户和 AWS Region(地区)为Greengrass组。

  • Greengrass 组角色 必须允许 iotanalytics:BatchPutMessage 目标信道权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws-cn:iotanalytics:区域:account-id:channel/channel_1_name", "arn:aws-cn:iotanalytics:区域:account-id:channel/channel_2_name" ] } ] }

    您可以授予对资源的粒度或条件性访问权限,例如,使用通配符 * 命名方案。有关更多信息,请参阅 添加和删除 IAM 政策IAM 用户指南.

导出到 AWS IoT Analytics

要创建可导出至的流 AWS IoT Analytics,您的 Lambda 功能 创建流 具有包含一个或多个 IoTAnalyticsConfig 对象。此对象定义导出设置,例如目标通道、批处理大小、批处理间隔和优先级。

当您的 Lambda 功能从设备接收数据, 附加消息 包含目标流的大量数据。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

Amazon Kinesis 数据流

StreamManager支持自动导出至 Amazon Kinesis Data Streams. 常用于聚合大容量数据并将其加载到数据仓库或 map-reduce 集群。Kinesis Data Streams有关更多信息,请参阅 Amazon Kinesis Data Streams 中的什么是 ?Amazon Kinesis Developer Guide。

在 AWS IoT Greengrass Core 开发工具包,您的 Lambda 函数使用 KinesisConfig 以定义此目的地类型的导出配置。有关详细信息,请参阅您目标语言的SDK参考:

Requirements

此出口目的地有以下要求:

  • 目标流 Kinesis Data Streams 必须与 AWS 账户和 AWS Region(地区)为Greengrass组。

  • Greengrass 组角色 必须允许 kinesis:PutRecords 目标数据流的权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws-cn:kinesis:区域:account-id:stream/stream_1_name", "arn:aws-cn:kinesis:区域:account-id:stream/stream_2_name" ] } ] }

    您可以授予对资源的粒度或条件性访问权限,例如,使用通配符 * 命名方案。有关更多信息,请参阅 添加和删除 IAM 政策IAM 用户指南.

导出到 Kinesis Data Streams

要创建可导出至的流 Kinesis Data Streams,您的 Lambda 功能 创建流 具有包含一个或多个 KinesisConfig 对象。此对象定义导出设置,例如目标数据流、批处理大小、批处理间隔和优先级。

当您的 Lambda 功能从设备接收数据, 附加消息 包含目标流的大量数据。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

AWS IoT SiteWise 资产属性

StreamManager支持自动导出至 AWS IoT SiteWise. 可让您从工业设备大规模收集、组织和分析数据。AWS IoT SiteWise有关更多信息,请参阅 用户指南AWS IoT SiteWise 中的什么是 ?AWS IoT SiteWise。

在 AWS IoT Greengrass Core 开发工具包,您的 Lambda 函数使用 IoTSiteWiseConfig 以定义此目的地类型的导出配置。有关详细信息,请参阅您目标语言的SDK参考:

注意

AWS 还提供 IoT SiteWise 连接器,这是一种预置解决方案,可与OPC-UA源配合使用。

Requirements

此出口目的地有以下要求:

  • 中的目标资产属性 AWS IoT SiteWise 必须与 AWS 账户和 AWS Region(地区)为Greengrass组。

    注意

    对于 AWS 一些地区 AWS IoT SiteWise 支持,请参阅 AWS IoT SiteWise 端点和配额AWS 一般参考.

  • Greengrass 组角色 必须允许 iotsitewise:BatchPutAssetPropertyValue 目标资产属性的权限。以下示例策略使用 iotsitewise:assetHierarchyPath 条件密钥,授予对目标根资产及其子项的访问权限。您可以移除 Condition 政策允许访问您所有 AWS IoT SiteWise 资产或指定 ARNs 个人资产。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    您可以授予对资源的粒度或条件性访问权限,例如,使用通配符 * 命名方案。有关更多信息,请参阅 添加和删除 IAM 政策IAM 用户指南.

    有关重要的安全信息,请参阅 BatchPutAssetPropertyValue 授权AWS IoT SiteWise 用户指南.

导出到 AWS IoT SiteWise

要创建可导出至的流 AWS IoT SiteWise,您的 Lambda 功能 创建流 具有包含一个或多个 IoTSiteWiseConfig 对象。此对象定义导出设置,例如批处理大小、批处理间隔和优先级。

当您的 Lambda 函数从装置接收资产属性数据,它们将包含数据的消息附加到目标流。消息采用JSON序列化 PutAssetPropertyValueEntry 包含一个或多个资产属性的属性值的对象。有关更多信息,请参阅 附加消息 为 AWS IoT SiteWise 导出目的地。

注意

在将数据发送到 AWS IoT SiteWise 时,数据必须满足 BatchPutAssetPropertyValue 操作的要求。有关更多信息,请参阅 AWS IoT SiteWise API 参考 中的 BatchPutAssetPropertyValue

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

您可以调整流管理器设置并 Lambda 函数逻辑来设计您的导出策略。例如:

  • 对于近实时导出,设置低批处理大小和间隔设置,并在收到时将数据附加到流中。

  • 要优化批处理、减轻带宽限制或最大限度降低成本, Lambda 函数可以在将数据附加到流之前,对为单个资产属性接收的时间戳-质量值(TQV)数据点进行池化。一种策略是在一个消息中批量输入最多10个不同的属性-资产组合或属性别名,而不是为同一属性发送多个条目。这有助于流管理器保持在 AWS IoT SiteWise 配额.

 

Amazon S3 对象

StreamManager支持自动导出至 Amazon S3.您可以使用 Amazon S3 存储和检索大量数据。有关更多信息,请参阅 开发人员指南Amazon S3 中的什么是 ?Amazon Simple Storage Service。

在 AWS IoT Greengrass Core 开发工具包,您的 Lambda 函数使用 S3ExportTaskExecutorConfig 以定义此目的地类型的导出配置。有关详细信息,请参阅您目标语言的SDK参考:

Requirements

此出口目的地有以下要求:

  • 目标 Amazon S3 桶必须 AWS 帐户为Greengrass组。

  • 如果 默认容器化 对于Greengrass小组 绿草容器,您必须设置 迈瑞经理 参数,使用下面的输入文件目录 /tmp 或不在根文件系统上。

  • 如果 Lambda 函数在 绿草容器 模式将输入文件写入输入文件目录,您必须为目录创建本地卷资源,并将目录安装到具有写入权限的容器。这可确保将文件写入根文件系统,并在容器之外可见。有关更多信息,请参阅使用 Lambda 函数和连接器访问本地资源

  • Greengrass 组角色 必须允许对目标存储桶具有以下权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:UploadPart", "s3:InitiateMultipartUpload", "s3:CompleteMultipartUpload", "s3:AbortMultipartUpload", "s3:ListMultipartUploads" ], "Resource": [ "arn:aws-cn:s3:::bucket-1-name/*", "arn:aws-cn:s3:::bucket-2-name/*" ] } ] }

    您可以授予对资源的粒度或条件性访问权限,例如,使用通配符 * 命名方案。有关更多信息,请参阅 添加和删除 IAM 政策IAM 用户指南.

导出到 Amazon S3

要创建可导出至的流 Amazon S3,您的 Lambda 函数使用 S3ExportTaskExecutorConfig 对象以配置导出策略。该策略定义导出设置,例如多部分上传阈值和优先级。对于 Amazon S3 导出,流管理器会上传从核心设备上的本地文件读取的数据。要启动上传,请 Lambda 函数将导出任务附加到目标流。导出任务包含有关输入文件和目标的信息 Amazon S3 对象。流管理器按任务附加到流的顺序执行任务。

注意

目标存储桶必须已存在于您的 AWS 账户中。如果指定键的对象不存在,流管理器会为您创建该对象。

此高级工作流程如下图所示。


                    流管理器工作流示意图 Amazon S3 导出。

StreamManager使用多部分上传阈值属性, 最小零件尺寸 设置和输入文件的大小,以确定如何上传数据。多部件上传阈值必须大于或等于最小部件大小。如果要并行上传数据,可以创建多个流。

指定目标的键 Amazon S3 对象可以包括有效的 爪哇 DateTimeFormatter 字符串 !{timestamp:value} 占位符。您可以使用这些时间戳占位符来在 Amazon S3 以上传输入文件数据的时间为准。例如,以下密钥名称解析为值,例如 my-key/2020/12/31/data.txt.

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要监控流的导出状态,请先创建状态流,然后将导出流配置为使用它。有关更多信息,请参阅监控导出任务

管理输入数据

您可以编写代码 IoT 应用程序用于管理输入数据的生命周期。以下工作流程示例显示了如何使用 Lambda 函数来管理此数据。

  1. 本地进程从设备或外围设备接收数据,然后将数据写入核心设备的目录中的文件。这些是流管理器的输入文件。

    注意

    要确定是否必须配置对输入文件目录的访问,请参阅 迈瑞经理 参数。

    流管理器在中运行的进程继承 默认访问身份 为组。流管理器必须具有访问输入文件的权限。您可以使用 chmod(1) 命令更改文件的权限(如有必要)。

  2. 甲 Lambda 功能扫描目录并 附加导出任务 到目标流。任务是JSON序列化的 S3ExportTaskDefinition 指定输入文件的URL的对象,目标 Amazon S3 bucket和key,以及可选用户元数据。

  3. StreamManager读取输入文件并将数据导出到 Amazon S3 按附加任务的顺序排序。目标存储桶必须已存在于您的 AWS 账户中。如果指定键的对象不存在,流管理器会为您创建该对象。

  4. 的 Lambda 功能 读取消息 从状态流监控导出状态。在导出任务完成后, Lambda 函数可以删除相应的输入文件。有关更多信息,请参阅监控导出任务

监控导出任务

您可以编写代码 IoT 应用程序用于监控的状态 Amazon S3 导出。您的 Lambda 函数必须创建状态流,然后配置导出流以将状态更新写入到状态流。单个状态流可以从导出至的多个流接收状态更新 Amazon S3.

首先 创建流 用作状态流。您可以配置流的大小和保留策略,以控制状态消息的寿命。例如:

  • 设置 PersistenceMemory 如果您不想存储状态消息。

  • 设置 StrategyOnFullOverwriteOldestData 这样新的状态消息不会丢失。

然后,创建或更新导出流以使用状态流。具体来说,设置流的状态配置属性 S3ExportTaskExecutorConfig 导出配置。这告诉流管理器将有关导出任务的状态消息写入到状态流。在 StatusConfig 对象,指定状态流的名称和的级别。以下支持的值范围从最少(ERROR)到大多数(TRACE)。默认值为 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

 

以下工作流程示例展示了 Lambda 功能可能使用状态流来监控导出状态。

  1. 如之前的工作流程所述, Lambda 功能 附加导出任务 到被配置为将有关导出任务的状态消息写入到状态流的流。附加操作返回表示任务ID的序列号。

  2. 甲 Lambda 功能 读取消息 然后根据流名称和任务ID或从消息上下文中导出任务属性过滤消息。例如, Lambda 函数可以通过导出任务的输入文件URL进行筛选,该URL由 S3ExportTaskDefinition 对象。

    以下状态代码表示导出任务已达到已完成状态:

    • Success。上传已成功完成。

    • Failure。StreamManager遇到错误,例如,指定的bucket不存在。解决问题后,您可以再次将导出任务附加到流中。

    • Canceled。由于流或导出定义已删除,或任务的生存时间(TTL)期间已过期,任务已中止。

    注意

    任务的状态也可能是 InProgressWarning。当事件返回不影响任务执行的错误时,StreamManager会发出警告。例如,清理中止的部分上传失败将返回警告。

  3. 在导出任务完成后, Lambda 函数可以删除相应的输入文件。

以下示例显示了 Lambda 功能可能读取和处理状态消息。

Python
import time from greengrasssdk.stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from greengrasssdk.stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 已读_消息 |内测 StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: readMessages |内测 StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require('aws-greengrass-core-sdk').StreamManager; const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: readMessages |内测 StatusMessage