导出支持的 Amazon Web Services 云 目标的配置 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon IoT Greengrass Version 1 2023 年 6 月 30 日进入延长寿命阶段。有关更多信息,请参阅 Amazon IoT Greengrass V1 维护策略。在此日期之后,将 Amazon IoT Greengrass V1 不会发布提供功能、增强功能、错误修复或安全补丁的更新。在上面运行的设备 Amazon IoT Greengrass V1 不会中断,将继续运行并连接到云端。我们强烈建议您迁移到 Amazon IoT Greengrass Version 2,这样可以添加重要的新功能支持其他平台

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

导出支持的 Amazon Web Services 云 目标的配置

用户定义的 Lambda 函数使用 Amazon IoT Greengrass 核心开发工具包中的 StreamManagerClient 与流管理器交互。当 Lambda 函数创建流更新流时,它会传递一个表示流属性的 MessageStreamDefinition 对象,包括导出定义。ExportDefinition 对象包含为流定义的导出配置。流管理器使用这些导出配置来确定将流导出到何处以及如何导出。


            ExportDefinition 属性类型的对象模型图。

您可以为一个流定义零个或多个导出配置,包括针对单个目标类型的多个导出配置。例如,您可以将一个流导出到两个 Amazon IoT Analytics 通道和一个 Kinesis 数据流。

对于失败的导出尝试,流管理器会持续重试将数据导出到 Amazon Web Services 云,间隔不超过五分钟。重试次数没有最大限制。

注意

StreamManagerClient 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定,或不支持在生产环境中使用。

由您来负责维护这些 Amazon Web Services 云 资源。

Amazon IoT Analytics 通道

流管理器支持自动导出到 Amazon IoT Analytics。Amazon IoT Analytics 允许您对数据进行高级分析,以帮助做出业务决策和改进机器学习模型。有关更多信息,请参阅 Amazon IoT Analytics 用户指南中的什么是 Amazon IoT Analytics?

在 Amazon IoT Greengrass 核心软件开发工具包中,您的 Lambda 函数使用 IoTAnalyticsConfig 来定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

要求

此导出目的地具有以下要求:

  • Amazon IoT Analytics 中的目标通道必须与 Greengrass 组处于相同的 Amazon Web Services 账户 和 Amazon Web Services 区域 中。

  • Greengrass 组角色 必须允许对目标通道的 iotanalytics:BatchPutMessage 权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    您可以授予对资源的具体或条件访问权限(例如,通过使用通配符 * 命名方案)。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

导出到 Amazon IoT Analytics

要创建导出到 Amazon IoT Analytics 的流,您的 Lambda 函数会创建一个包含一个或多个 IoTAnalyticsConfig 对象的导出定义的流。此对象定义导出设置,例如目标频道、批次大小、批次间隔和优先级。

当您的 Lambda 函数从设备接收数据时,它们会将包含大量数据的消息附加到目标流。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

Amazon Kinesis data streams

流管理器支持自动导出到 Amazon Kinesis Data Streams。Kinesis Data Streams 通常用于聚合大量数据并将其加载到数据仓库或 map-reduce 集群中。有关更多信息,请参阅 Amazon Kinesis 开发人员指南中的什么是 Amazon Kinesis Data Streams?

在 Amazon IoT Greengrass 核心软件开发工具包中,您的 Lambda 函数使用 KinesisConfig 来定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

要求

此导出目的地具有以下要求:

  • Kinesis Data Streams 中的目标流必须与 Greengrass 组处于相同的 Amazon Web Services 账户 和 Amazon Web Services 区域 中。

  • Greengrass 组角色 必须允许对数据流的 kinesis:PutRecords 权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    您可以授予对资源的具体或条件访问权限(例如,通过使用通配符 * 命名方案)。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

导出到 Kinesis Data Streams

要创建导出到 Kinesis Data Streams 的流,您的 Lambda 函数会创建一个包含一个或多个 KinesisConfig 对象的导出定义的流。此对象定义导出设置,例如目标数据流、批次大小、批次间隔和优先级。

当您的 Lambda 函数从设备接收数据时,它们会将包含大量数据的消息附加到目标流。然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

流管理器会为上传到 Amazon Kinesis 的每条记录生成一个唯一的随机 UUID 作为分区键。

 

Amazon IoT SiteWise 资产属性

流管理器支持自动导出到 Amazon IoT SiteWise。Amazon IoT SiteWise 可让您能够大规模收集、组织和分析来自工业设备的数据。有关更多信息,请参阅 Amazon IoT SiteWise 用户指南中的什么是 Amazon IoT SiteWise?

在 Amazon IoT Greengrass 核心软件开发工具包中,您的 Lambda 函数使用 IoTSiteWiseConfig 来定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

注意

Amazon 还提供 物联网 SiteWise 连接器,这是一个预先构建的解决方案,可以配合 OPC-UA 源使用。

要求

此导出目的地具有以下要求:

  • Amazon IoT SiteWise 中的目标资产属性必须与 Greengrass 组处于相同的 Amazon Web Services 账户 和 Amazon Web Services 区域 中。

    注意

    有关 Amazon IoT SiteWise 支持的区域的名称的列表,请参阅 Amazon 一般参考中的 Amazon IoT SiteWise 终端节点和配额

  • Greengrass 组角色 必须允许对目标资产属性的 iotsitewise:BatchPutAssetPropertyValue 权限。以下示例策略使用 iotsitewise:assetHierarchyPath 条件键来授予对目标根资产及其子项的访问权限。您可以从策略中删除 Condition,以允许访问您的所有 Amazon IoT SiteWise 资产。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    您可以授予对资源的具体或条件访问权限(例如,通过使用通配符 * 命名方案)。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

    有关重要的安全信息,请参阅Amazon IoT SiteWise《用户指南》中的 batchputAssetPropertyPortyValue 授权

导出到 Amazon IoT SiteWise

要创建导出到 Amazon IoT SiteWise 的流,您的 Lambda 函数会创建一个包含一个或多个 IoTSiteWiseConfig 对象的导出定义的流。此对象定义导出设置,例如批次大小、批次间隔和优先级。

当您的 Lambda 函数从设备接收资产属性数据时,它们会将包含数据的消息附加到目标流。消息是 JSON 序列化的 PutAssetPropertyValueEntry 对象,其中包含一个或多个资产属性的属性值。有关更多信息,请参阅为Amazon IoT SiteWise导出目标追加消息

注意

当您将数据发送到 Amazon IoT SiteWise 时,数据必须满足 BatchPutAssetPropertyValue 操作的要求。有关更多信息,请参阅 Amazon IoT SiteWise API 参考中的 BatchPutAssetPropertyValue

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

您可以调整流管理器设置和 Lambda 函数逻辑来设计您的导出策略。例如:

  • 对于近乎实时的导出,请设置较低的批量大小和间隔设置,并在收到数据时将数据追加到流中。

  • 为了优化批处理、缓解带宽限制或最大限度地降低成本,您的 Lambda 函数可以在将数据追加到流之前,将单个资产属性收到的时间戳质量值 (TQV) 数据点汇集在一起。一种策略是在一条消息中批量输入多达 10 种不同的财产资产组合或属性别名,而不是为同一个属性发送多个条目。这有助于流管理器保持在Amazon IoT SiteWise配额范围内。

 

Amazon S3 对象

流管理器支持自动导出到 Amazon S3。您可以使用 Amazon S3 存储和检索大量的数据。有关更多信息,请参阅 Amazon Simple Storage Service 开发人员指南中的什么是 Amazon S3?

在 Amazon IoT Greengrass 核心软件开发工具包中,您的 Lambda 函数使用 S3ExportTaskExecutorConfig 来定义此目标类型的导出配置。有关更多信息,请参阅目标语言的开发工具包参考:

要求

此导出目的地具有以下要求:

  • Amazon S3 桶必须与 Greengrass 组处于相同的 Amazon Web Services 账户 中。

  • 如果 Greengrass 组的默认容器化Greengrass 容器,则必须将 STREAM_MANAGER_READ_ONLY_DIRS 参数设置为使用位于 /tmp 下或不在根文件系统下的输入文件目录。

  • 如果在 Greengrass 容器模式下运行的 Lambda 函数将输入文件写入输入文件目录,则必须为该目录创建本地卷资源,并将该目录挂载到具有写入权限的容器中。这样可以确保将文件写入根文件系统并在容器外部可见。有关更多信息,请参阅使用 Lambda 函数和连接器访问本地资源

  • Greengrass 组角色 必须允许对目标存储桶具有以下权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    您可以授予对资源的具体或条件访问权限(例如,通过使用通配符 * 命名方案)。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

导出到 Amazon S3

为了创建导出到 Amazon S3 的流,您的 Lambda 函数使用 S3ExportTaskExecutorConfig 对象来配置导出策略。该策略定义了导出设置,例如分段上传阈值和优先级。对于 Amazon S3 导出,流管理器会上传它从核心设备上的本地文件中读取的数据。要启动上传,您的 Lambda 函数会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按照任务附加到流中的顺序执行任务。

注意

目标桶必须已存在于您的 Amazon Web Services 账户 中。如果指定密钥的对象不存在,则流管理器会为您创建该对象。

下图显示了该高级别流程。


                    Amazon S3 导出的流管理器工作流程示意图。

Stream Manager 使用分段上传阈值属性、最小分段大小设置和输入文件的大小来确定如何上传数据。分段上传阈值必须大于或等于最小分段大小。如果要并行上传数据,则可以创建多个流。

指定您的目标 Amazon S3 对象的密钥可以在 !{timestamp:value} 占位符中包含有效的 Java DateTimeFormatter 字符串。您可以使用这些时间戳占位符根据输入文件数据的上传时间对 Amazon S3 中的数据进行分区。例如,以下键名解析为诸如 my-key/2020/12/31/data.txt 之类的值。

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要监控流的导出状态,请先创建一个状态流,然后将导出流配置为使用该状态流。有关更多信息,请参阅监控导出任务

管理输入数据

您可以编写代码,供物联网应用用来管理输入数据的生命周期。以下示例工作流程显示了如何使用 Lambda 函数来管理这些数据。

  1. 本地进程从设备或外围设备接收数据,然后将数据写入核心设备上目录中的文件。这些是流管理器的输入文件。

    注意

    要确定是否必须配置对输入文件目录的访问权限,请参阅 STREAM_MANAGER_READ_ONLY_DIRS 参数。

    流管理器运行的进程继承了该组默认访问身份的所有文件系统权限。流管理器必须拥有访问输入文件的权限。如有必要,您可以使用 chmod(1) 命令更改文件的权限。

  2. Lambda 函数会扫描该目录,并在创建新文件时将导出任务附加到目标流。该任务是一个 JSON 序列化 S3ExportTaskDefinition 对象,用于指定输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选的用户元数据。

  3. 流管理器读取输入文件并按照附加任务的顺序将数据导出到 Amazon S3。目标桶必须已存在于您的 Amazon Web Services 账户 中。如果指定密钥的对象不存在,则流管理器会为您创建该对象。

  4. Lambda 函数从状态流读取消息以监控导出状态。导出任务完成后,Lambda 函数可以删除相应的输入文件。有关更多信息,请参阅监控导出任务

监控导出任务

您可以编写代码,让物联网应用程序监控您的 Amazon S3 导出状态。您的 Lambda 函数必须创建状态流,然后配置导出流以将状态更新写入状态流。单个状态流可以接收来自导出到 Amazon S3 的多个流的状态更新。

首先,创建一个流以用作状态流。您可以为流配置大小和保留策略,以控制状态消息的生命周期。例如:

  • 如果您不想存储状态消息,请将 Persistence 设置为 Memory

  • StrategyOnFull 设置为 OverwriteOldestData,这样新的状态消息就不会丢失。

然后,创建或更新导出流以使用状态流。具体而言,设置流 S3ExportTaskExecutorConfig 导出配置的状态配置属性。这会告诉流管理器将有关导出任务的状态消息写入状态流。在 StatusConfig 对象中,指定状态流的名称和详细程度。以下支持的值范围从最低 verbose (ERROR) 到最长 verbose (TRACE) 不等。默认为 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

 

以下示例工作流程显示了 Lambda 函数如何使用状态流来监控导出状态。

  1. 如前面的工作流程所述,Lambda 函数将导出任务附加到流,后者配置为将有关导出任务的状态消息写入状态流。附加操作返回一个表示任务 ID 的序列号。

  2. Lambda 函数按顺序读取状态流中的消息,然后根据流名称和任务 ID 或消息上下文中的导出任务属性筛选消息。例如,Lambda 函数可以按导出任务的输入文件 URL 进行筛选,该文件由消息上下文中的 S3ExportTaskDefinition 对象表示。

    以下状态代码指示导出任务已达到完成状态:

    • Success。上传已成功完成。

    • Failure。流管理器遇到错误,例如,指定的存储桶不存在。解决问题后,您可以再次将导出任务追加到流中。

    • Canceled。由于直播或导出定义已删除,或者任务的生存时间 (TTL) 期已过期,该任务已中止。

    注意

    该任务的状态也可能为 InProgressWarning。当事件返回不影响任务执行的错误时,流管理器会发出警告。例如,如果无法清理已中止的部分上传,则会返回警告。

  3. 导出任务完成后,Lambda 函数可以删除相应的输入文件。

以下示例显示 Lambda 函数如何读取和处理状态消息。

Python
import time from greengrasssdk.stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from greengrasssdk.stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:read_messages | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:readMessages | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require('aws-greengrass-core-sdk').StreamManager; const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:readMessages | StatusMessage