导出支持的配置 Amazon Web Services 云 目的地 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

您正在查看Amazon IoT Greengrass Version 1.Amazon IoT Greengrass Version 2是最新的主要版本Amazon IoT Greengrass. 有关使用Amazon IoT Greengrass V2,请参阅Amazon IoT Greengrass Version 2开发人员指南.

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

导出支持的配置 Amazon Web Services 云 目的地

用户定义的 Lambda 函数使用StreamManagerClient中的Amazon IoT Greengrass与流管理器交互的核心 SDK。当 Lambda 函数创建流或者更新流,它会传递一个MessageStreamDefinition对象,该对象表示流属性,包括导出定义。这些区域有:ExportDefinition对象包含为流定义的导出配置。流管理器使用这些导出配置来确定流的导出位置和方式。


            导出定义属性类型的对象模型图。

您可以在流上定义零个或多个导出配置,包括单个目标类型的多个导出配置。例如,您可以将流导出到两个Amazon IoT Analytics通道和一个 Kinesis 数据流。

对于失败的导出尝试,流管理器会不断重试将数据导出到 Amazon Web Services 云 在最多五分钟的间隔。重试次数没有最大限制。

注意

StreamManagerClient还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。它不稳定,不支持在生产环境中使用。

您可以回应维护这些 Amazon Web Services 云 资源的费用。

Amazon IoT Analytics 通道

流管理器支持自动导出到Amazon IoT Analytics.Amazon IoT Analytics允许您对数据执行高级分析,以帮助制定业务决策并改进机器学习模型。有关更多信息,请参阅 。是什么Amazon IoT Analytics?中的Amazon IoT Analytics用户指南.

在Amazon IoT Greengrass核心 SDK,您的 Lambda 函数使用IoTAnalyticsConfig定义此目标类型的导出配置。有关更多信息,请参阅适用于您的目标语言的开发工具包参考:

Requirements

此导出目的地具有以下要求:

  • 中的目标通道Amazon IoT Analytics均必须位于同一 Amazon Web Services 账户 和 Amazon Web Services 区域 Greengrass 组。

  • 这些区域有:Greengrass 组角色必须允许iotanalytics:BatchPutMessage访问目标频道的权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    您可以授予对资源的具体或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 。添加和删除 IAM 策略中的IAM 用户指南.

导出到 Amazon IoT Analytics

创建流Amazon IoT Analytics,您的 Lambda 函数创建流,其中包含一个或多个IoTAnalyticsConfig对象。此对象定义导出设置,如目标通道、批处理大小、批处理间隔和优先级。

当您的 Lambda 函数从设备接收数据时,它们附加消息,其中包含到目标流的数据 blob。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

Amazon Kinesis Data Streams

流管理器支持自动将导出到 Amazon Kinesis Data Streams。Kinesis Data Streams 常用于聚合大容量数据并将其加载到数据仓库或 map-reduce 集群中。有关更多信息,请参阅 。什么是 Amazon Kinesis Data Streams?中的Amazon Kinesis 开发人员指南.

在Amazon IoT Greengrass核心 SDK,您的 Lambda 函数使用KinesisConfig定义此目标类型的导出配置。有关更多信息,请参阅适用于您的目标语言的开发工具包参考:

Requirements

此导出目的地具有以下要求:

  • Kinesis 数据流中的目标流必须位于相同的 Amazon Web Services 账户 和 Amazon Web Services 区域 Greengrass 组。

  • 这些区域有:Greengrass 组角色必须允许kinesis:PutRecords目标数据流的权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    您可以授予对资源的具体或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 。添加和删除 IAM 策略中的IAM 用户指南.

导出到 Kinesis Data Streams

若要创建导出到 Kinesis Data Streams,您的 Lambda 函数创建流,其中包含一个或多个KinesisConfig对象。此对象定义导出设置,如目标数据流、批处理大小、批处理间隔和优先级。

当您的 Lambda 函数从设备接收数据时,它们附加消息,其中包含到目标流的数据 blob。然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

流管理器为上传到 Amazon Kinesis 的每条记录生成一个唯一的随机 UUID 作为分区键。

 

Amazon IoT SiteWise资产属性

流管理器支持自动导出到Amazon IoT SiteWise.Amazon IoT SiteWise可让您轻松地从工业设备中大规模收集、组织和分析数据。有关更多信息,请参阅 。是什么Amazon IoT SiteWise?中的Amazon IoT SiteWise用户指南.

在Amazon IoT Greengrass核心 SDK,您的 Lambda 函数使用IoTSiteWiseConfig定义此目标类型的导出配置。有关更多信息,请参阅适用于您的目标语言的开发工具包参考:

注意

Amazon还提供了IoT SiteWise 连接器,这是一个预构建的解决方案,您可以与 OPC-UA 源一起使用。

Requirements

此导出目的地具有以下要求:

  • 中的目标资产属性Amazon IoT SiteWise均必须位于同一 Amazon Web Services 账户 和 Amazon Web Services 区域 Greengrass 组。

    注意

    有关区域列表Amazon IoT SiteWise支持,请参阅Amazon IoT SiteWise终端节点和配额中的Amazon一般参考.

  • 这些区域有:Greengrass 组角色必须允许iotsitewise:BatchPutAssetPropertyValue目标资产属性的权限。以下示例策略使用iotsitewise:assetHierarchyPath以授予对目标根资产及其子级的访问权限。您可以将Condition,以允许访问您的所有Amazon IoT SiteWise资产或指定单个资产的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    您可以授予对资源的具体或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 。添加和删除 IAM 策略中的IAM 用户指南.

    有关重要的安全性信息,请参阅BatchPutAssetPropertyValue 授权中的Amazon IoT SiteWise用户指南.

导出到 Amazon IoT SiteWise

创建流Amazon IoT SiteWise,您的 Lambda 函数创建流,其中包含一个或多个IoTSiteWiseConfig对象。此对象定义导出设置,如批次大小、批次间隔和优先级。

当 Lambda 函数从设备接收资产属性数据时,它们会将包含数据的消息附加到目标流。消息是 JSON 序列化的PutAssetPropertyValueEntry对象,其中包含一个或多个资源属性的属性值。有关更多信息,请参阅 。附加消息对于 来说为Amazon IoT SiteWise导出目标。

注意

将数据发送给Amazon IoT SiteWise,则数据必须满足BatchPutAssetPropertyValueaction. 有关更多信息,请参阅 。BatchPutAssetPropertyValue中的Amazon IoT SiteWiseAPI 参考.

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

您可以调整流管理器设置和 Lambda 函数逻辑来设计导出策略。例如:

  • 对于近实时导出,请设置较小的批量大小和间隔设置,并在收到数据时将数据追加到流中。

  • 为了优化批处理、缓解带宽限制或最大限度地降低成本,Lambda 函数可以在将数据附加到流之前将单个资产属性收到的时间戳-质量值 (TQV) 数据点合并。一种策略是在一条消息中批处理多达 10 个不同属性资产组合或属性别名的条目,而不是为同一属性发送多个条目。这有助于流管理器保持Amazon IoT SiteWise配额.

 

Amazon S3 对象

流管理器支持将自动导出到 Amazon S3。您可以使用 Amazon S3 存储和检索大量数据。有关更多信息,请参阅 。什么是 Amazon S3?中的Amazon Simple Storage Service 开发人员指南.

在Amazon IoT Greengrass核心 SDK,您的 Lambda 函数使用S3ExportTaskExecutorConfig定义此目标类型的导出配置。有关更多信息,请参阅适用于您的目标语言的开发工具包参考:

Requirements

此导出目的地具有以下要求:

  • 目标 Amazon S3 存储桶必须位于同一个 Amazon Web Services 账户 Greengrass 组。

  • 如果默认容器化对于 Greengrass 组是Greengrass 容器,您必须将流管理器 _ 仅读 _ 目的参数来使用位于/tmp或不在根文件系统上。

  • Lambda 在Greengrass 容器模式将输入文件写入输入文件目录,则必须为该目录创建一个本地卷资源,并将该目录装载到具有写入权限的容器中。这可确保文件写入根文件系统并在容器外部可见。有关更多信息,请参阅使用 Lambda 函数和连接器访问本地资源

  • 这些区域有:Greengrass 组角色必须允许对目标存储桶具有以下权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    您可以授予对资源的具体或条件访问权限,例如,通过使用通配符*命名方案。有关更多信息,请参阅 。添加和删除 IAM 策略中的IAM 用户指南.

导出到 Amazon S3

要创建导出到 Amazon S3 的流,您的 Lambda 函数使用S3ExportTaskExecutorConfig对象配置导出策略。策略定义导出设置,如分段上传阈值和优先级。对于 Amazon S3 导出,流管理器会上传它从核心设备上的本地文件读取的数据。要启动上载,您的 Lambda 函数会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按照追加到流的顺序执行任务。

注意

目标存储桶必须已存在于您的 Amazon Web Services 账户 . 如果指定键的对象不存在,流管理器将为您创建该对象。

下图显示了该高级工作流程。


                    Amazon S3 导出流管理器工作流程图。

流管理器使用分段上传阈值属性分段大小下限设置和输入文件的大小,以确定如何上载数据。分段上传阈值必须大于或等于最小段大小。如果要并行上传数据流,您可以创建多个流。

指定目标 Amazon S3 对象的密钥可以包含有效Java DateTimeFormatter中的字符串!{timestamp:value}占位符。您可以使用这些时间戳占位符根据输入文件数据上传的时间对 Amazon S3 中的数据进行分区。例如,以下键名称将解析为类似的值my-key/2020/12/31/data.txt.

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要监视流的导出状态,请首先创建状态流,然后配置导出流以使用它。有关更多信息,请参阅监控导出任务

管理输入数据

您可以编写 IoT 应用程序用于管理输入数据的生命周期的代码。下面的示例工作流说明了可通过何种方式使用 Lambda 函数来管理此数据。

  1. 本地进程从设备或外围设备接收数据,然后将数据写入核心设备的目录中的文件。这些是流管理器的输入文件。

    注意

    要确定是否必须配置对输入文件目录的访问权限,请参阅流管理器 _ 仅读 _ 目的参数。

    流管理器在中运行的进程继承默认访问身份,用于组。流管理器必须有权访问输入文件。您可以使用chmod(1)命令更改文件的权限。

  2. Lambda 函数扫描目录和追加导出任务添加到目标流。该任务是一个 JSON 序列化S3ExportTaskDefinition对象,该对象指定输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选用户元数据。

  3. 流管理器读取输入文件并按附加任务的顺序将数据导出到 Amazon S3。目标存储桶必须已存在于您的 Amazon Web Services 账户 . 如果指定键的对象不存在,流管理器将为您创建该对象。

  4. Lambda 函数读取消息来监视导出状态。导出任务完成后,Lambda 函数可以删除相应的输入文件。有关更多信息,请参阅监控导出任务

监控导出任务

您可以编写 IoT 应用程序用于监控 Amazon S3 导出状态的代码。Lambda 函数必须创建状态流,然后将导出流配置为将状态更新写入状态流。单个状态流可以从导出到 Amazon S3 的多个流接收状态更新。

首先,创建流以用作状态流。您可以配置流的大小和保留策略,以控制状态消息的生命周期。例如:

  • SetPersistenceMemory(如果不想存储状态消息)。

  • SetStrategyOnFullOverwriteOldestData,以便不会丢失新的状态消息。

然后,创建或更新导出流以使用状态流。具体来说,设置流的S3ExportTaskExecutorConfig导出配置。这会告诉流管理器将有关导出任务的状态消息写入状态流。在StatusConfig对象中,指定状态流的名称和详细程度级别。以下支持的值范围从最不详细(ERROR)更改为最详细(TRACE)。默认为 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

 

以下示例工作流显示了 Lambda 函数如何使用状态流监视导出状态。

  1. 如上一个工作流中所述,Lambda 函数追加导出任务添加到配置为将有关导出任务的状态消息写入状态流的流。追加操作返回表示任务 ID 的序列号。

  2. Lambda 函数读取消息,然后根据流名称和任务 ID 或根据消息上下文中的导出任务属性筛选消息。例如,Lambda 函数可以按导出任务的输入文件 URL 进行筛选,该 URL 由S3ExportTaskDefinition对象在消息上下文中。

    以下状态代码表示导出任务已达到完成状态:

    • Success. 已成功完成上传。

    • Failure. 流管理器遇到错误,例如,指定的存储桶不存在。解决此问题后,您可以再次将导出任务附加到流中。

    • Canceled. 由于删除了流或导出定义,或任务的生存时间 (TTL) 期间过期,任务已中止。

    注意

    该任务的状态也可能为InProgress或者Warning. 当事件返回不影响任务执行的错误时,流管理器会发出警告。例如,清理中止的部分上载失败将返回警告。

  3. 导出任务完成后,Lambda 函数可以删除相应的输入文件。

下面的示例说明 Lambda 函数如何读取和处理状态消息。

Python
import time from greengrasssdk.stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from greengrasssdk.stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:读取消息|StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:readMessages|StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require('aws-greengrass-core-sdk').StreamManager; const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:readMessages|StatusMessage