支持Amazon Web Services 云目的地的导出配置 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

支持Amazon Web Services 云目的地的导出配置

用户定义的 Greengrass 组件StreamManagerClient在 Stream Manager SDK 中用于与流管理器进行交互。当组件创建流或更新流时,它会传递一个表示流属性的MessageStreamDefinition对象,包括导出定义。该ExportDefinition对象包含为流定义的导出配置。流管理器使用这些导出配置来确定在何处和如何导出流。


       ExportDefinition 属性类型的对象模型图。

您可以在一个流上定义零个或多个导出配置,包括单个目标类型的多个导出配置。例如,您可以将一个流导出到两个Amazon IoT Analytics通道和一个 Kinesis 数据流。

对于导出尝试失败,流管理器会持续重试将数据导出到,间隔最多为五分钟。Amazon Web Services 云重试次数没有最大限制。

注意

StreamManagerClient还提供了可用于将流导出到 HTTP 服务器的目标目标。此目标仅用于测试目的。它不稳定或不支持在生产环境中使用。

您有责任维护这些Amazon Web Services 云资源。

Amazon IoT Analytics 通道

直播管理器支持自动导出到Amazon IoT Analytics。 Amazon IoT Analytics允许您对数据进行高级分析,以帮助做出业务决策和改进机器学习模型。 ,Amazon IoT Analytics? 在《Amazon IoT Analytics用户指南》中。

在 Stream Manager SDK 中,您的 Greengrass 组件使用IoTAnalyticsConfig来定义此目标类型的导出配置。,SDK SDK SDK:

  • Python 开发工具包AnalyticsConfig中的@@ IoT

  • Java 软件开发工具包AnalyticsConfig中的@@ IoT

  • Node.js SDKAnalyticsConfig 中的@@ IoT

要求

此出口目的地有以下要求:

  • 中的目标信道Amazon IoT Analytics必须与 Greengrass 核心设备处于相同Amazon Web Services 账户和Amazon Web Services 区域中。

  • 授权核心设备与Amazon服务必须iotanalytics:BatchPutMessage允许定位频道。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    例如,通过使用通配符*命名方案,您可以授予对资源的精细或有条件的访问权限。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

导出到 Amazon IoT Analytics

要创建导出到的流Amazon IoT Analytics,您的 Greengrass 组件会创建一个包含一个或多个IoTAnalyticsConfig对象的导出定义的流。此对象定义导出设置,例如目标通道、批处理大小、批处理间隔和优先级。

当您的 Greengrass 组件从设备接收数据时,它们会将包含大量数据的消息附加到目标流。

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

Kinesis Data Streas

Amazon Kinesis Data Streams。Kinesis Data Streams 通常用于聚合大量数据并将其加载到数据仓库或 MapReduce 集群中。 ,Amazon Kinesis Data Streams? Amazon Kinesis 开发者指南中。

在 Stream Manager SDK 中,您的 Greengrass 组件使用KinesisConfig来定义此目标类型的导出配置。,SDK SDK SDK:

要求

此出口目的地有以下要求:

  • Kinesis Data Streams 中的目标流必须与 Greengrass 核心设备处于相同Amazon Web Services 账户状态。Amazon Web Services 区域

  • 授权核心设备与Amazon服务必须kinesis:PutRecords允许将数据流作为目标。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    例如,通过使用通配符*命名方案,您可以授予对资源的精细或有条件的访问权限。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

Kinesis Data Streams as

要创建导出到 Kinesis Data Streams 的流,您的 Greengrass 组件会创建一个包含一个或多个KinesisConfig对象的导出定义的流。此对象定义导出设置,例如目标数据流、批次大小、批处理间隔和优先级。

当您的 Greengrass 组件从设备接收数据时,它们会将包含大量数据的消息附加到目标流。然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

流管理器会为上传到 Amazon Kinesis 的每条记录生成一个唯一的随机 UUID 作为分区键。

Amazon IoT SiteWise资产属性

直播管理器支持自动导出到Amazon IoT SiteWise。 Amazon IoT SiteWise,、、。 ,Amazon IoT SiteWise? 在《Amazon IoT SiteWise用户指南》中。

在 Stream Manager SDK 中,您的 Greengrass 组件使用IoTSiteWiseConfig来定义此目标类型的导出配置。,SDK SDK SDK:

  • Python 开发工具包SiteWiseConfig中的@@ IoT

  • Java 软件开发工具包SiteWiseConfig中的@@ IoT

  • Node.js SDKSiteWiseConfig 中的@@ IoT

注意

Amazon还提供Amazon IoT SiteWise组件,这些组件提供了一个预建的解决方案,可用于从 OPC-UA 源流式传输数据。有关更多信息,请参阅 IoT SiteWise OPC-UA 收集器

要求

此出口目的地有以下要求:

  • 中的目标资产属性Amazon IoT SiteWise必须与 Greengrass 核心设备相同Amazon Web Services 账户。Amazon Web Services 区域

    注意

    有关Amazon IoT SiteWise支持的列表,请参阅《Amazon Web Services 区域Amazon一般参考》中的Amazon IoT SiteWise端点和配额

  • 授权核心设备与Amazon服务必须允许iotsitewise:BatchPutAssetPropertyValue授予目标资产属性的权限。以下示例策略使用iotsitewise:assetHierarchyPath条件密钥来授予对目标根资产及其子项的访问权限。您可以Condition从策略中移除以允许访问您的所有资Amazon IoT SiteWise产,也可以指定单个资产的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    例如,通过使用通配符*命名方案,您可以授予对资源的精细或有条件的访问权限。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

    有关重要的安全信息,请参阅《Amazon IoT SiteWise用户指南》中的 BatchPutAssetPropertyValue 授权

导出到 Amazon IoT SiteWise

要创建导出到的流Amazon IoT SiteWise,您的 Greengrass 组件会创建一个包含一个或多个IoTSiteWiseConfig对象的导出定义的流。此对象定义导出设置,例如批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收资产属性数据时,它们会将包含数据的消息附加到目标流。消息是 JSON 序列化PutAssetPropertyValueEntry对象,包含一个或多个资产属性的属性值。有关更多信息,请参阅为Amazon IoT SiteWise导出目的地附加消息

注意

当您将数据发送到时Amazon IoT SiteWise,您的数据必须满足BatchPutAssetPropertyValue操作的要求。有关更多信息,请参阅《Amazon IoT SiteWise API 参考》中的 BatchPutAssetPropertyValue

然后,流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

您可以调整直播管理器设置和 Greengrass 组件逻辑来设计导出策略。例如:

  • 对于近乎实时的导出,请设置较低的批量大小和间隔设置,并在收到数据时将数据附加到流中。

  • 为了优化批处理、缓解带宽限制或最大限度地降低成本,您的 Greengrass 组件可以在将数据附加到数据流之前,将为单个资产属性接收到的 timestamp-quality-value (TQV) 数据点汇集在一起。一种策略是在一条消息中批量输入多达 10 个不同的财产资产组合或属性别名,而不是为同一个属性发送多个条目。这有助于直播管理员保持在Amazon IoT SiteWise配额范围内。

Amazon S3

Amazon S3。Amazon S3。 ,A mazon S3? Amazon Simple Storage Servic e》

在 Stream Manager SDK 中,您的 Greengrass 组件使用S3ExportTaskExecutorConfig来定义此目标类型的导出配置。,SDK SDK SDK:

  • Python 软件开发工具包ExportTaskExecutorConfig中的 S@@ 3

  • Java 软件开发工具包ExportTaskExecutorConfig中的 S3

  • Node.js 软件开发工具包ExportTaskExecutorConfig中的 S3

要求

此出口目的地有以下要求:

  • AmazonAmazon Web Services 账户 S3。

  • 如果在 Greengrass 容器模式下运行的 Lambda 函数将输入文件写入输入文件目录,则必须将该目录作为一个卷安装到具有写入权限的容器中。这样可以确保将文件写入根文件系统,并且在容器外部运行的流管理器组件可以看到这些文件。

  • 如果 Docker 容器组件将输入文件写入输入文件目录,则必须将该目录作为卷安装到具有写入权限的容器中。这样可以确保将文件写入根文件系统,并且在容器外部运行的流管理器组件可以看到这些文件。

  • 授权核心设备与Amazon服务必须允许对目标存储桶具有以下权限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    例如,通过使用通配符*命名方案,您可以授予对资源的精细或有条件的访问权限。有关更多信息,请参阅 IAM 用户指南中的添加和删除 IAM 策略

Amazon S3

要创建导出到 Amazon S3 的流,您的 Greengrass 组件使用该S3ExportTaskExecutorConfig对象来配置导出策略。该策略定义了导出设置,例如分段上传阈值和优先级。对于 Amazon S3 导出,流管理器上传它从核心设备上的本地文件读取的数据。要启动上传,您的 Greengrass 组件会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按照任务附加到流的顺序运行任务。

注意

目标存储桶必须已经存在于您的存储桶中Amazon Web Services 账户。 如果指定密钥的对象不存在,则流管理器会为您创建该对象。

流管理器使用分段上传阈值属性、最小分段大小设置和输入文件的大小来确定如何上传数据。分段上传阈值必须大于或等于最小分段大小。parallel。

指定目标 Amazon S3 对象的密钥可以在!{timestamp:value}占位符中包含有效的 Java DateTimeFormatter 字符串。您可以使用这些时间戳占位符根据输入文件数据的上传时间对 Amazon S3 中的数据进行分区。例如,以下密钥名称解析为一个值,例如my-key/2020/12/31/data.txt

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要监视流的导出状态,请先创建状态流,然后将导出流配置为使用它。有关更多信息,请参阅 监控导出任务

管理输入数据

您可以编写 IoT 应用程序用来管理输入数据生命周期的代码。以下示例工作流程显示了如何使用 Greengrass 组件来管理这些数据。

  1. 本地进程从设备或外围设备接收数据,然后将数据写入核心设备目录中的文件中。这些是直播管理器的输入文件。

  2. 创建新文件时,Greengra ss 组件会扫描该目录并将导出任务附加到目标流。该任务是一个 JSON 序列化S3ExportTaskDefinition对象,它指定了输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选的用户元数据。

  3. 流管理器读取输入文件并将数据按附加任务的顺序导出到 Amazon S3。目标存储桶必须已经存在于您的存储桶中Amazon Web Services 账户。 如果指定密钥的对象不存在,则流管理器会为您创建该对象。

  4. Greengrass 组件从状态流中读取消息以监控导出状态。导出任务完成后,Greengrass 组件可以删除相应的输入文件。有关更多信息,请参阅 监控导出任务

监控导出任务

,Amazon S3。您的 Greengrass 组件必须创建状态流,然后将导出流配置为向状态流写入状态更新。单个状态流可以接收来自导出到 Amazon S3 的多个流的状态更新。

首先,创建一个用作状态流的流。您可以配置流的大小和保留策略,以控制状态消息的有效期。例如:

  • Memory如果您不想存储状态消息,请设置Persistence为。

  • 设置StrategyOnFull为,OverwriteOldestData这样新的状态消息就不会丢失。

然后,创建或更新导出流以使用状态流。具体而言,设置流S3ExportTaskExecutorConfig导出配置的状态配置属性。此设置告诉流管理器将有关导出任务的状态消息写入状态流。在StatusConfig对象中,指定状态流的名称和详细程度。以下支持的值范围从最不详细 (ERROR) 到最详细 (TRACE) 不等。默认为 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

以下示例工作流程显示了 Greengrass 组件如何使用状态流来监控导出状态。

  1. 如前一工作流程所述,Greengras s 组件将导出任务附加到流中,该流配置为将有关导出任务的状态消息写入状态流。追加操作返回代表任务 ID 的序列号。

  2. Greengrass 组件按顺序读取状态流中的消息,然后根据流名称和任务 ID 或消息上下文中的导出任务属性筛选消息。例如,Greengrass 组件可以按导出任务的输入文件 URL 进行筛选,该地址由消息上下文中的S3ExportTaskDefinition对象表示。

    以下状态代码表示导出任务已达到完成状态:

    • Success。 Amad。

    • Failure。 流管理器遇到错误,例如,指定的存储桶不存在。解决问题后,您可以再次将导出任务附加到流中。

    • Canceled。 由于删除了流或导出定义,或者任务的 time-to-live (TTL) 期限已过期,任务已停止。

    注意

    该任务的状态也可能为InProgressWarning。当事件返回不影响任务执行的错误时,流管理器会发出警告。例如,未能清理部分上传会返回警告。

  3. 导出任务完成后,Greengrass 组件可以删除相应的输入文件。

以下示例显示了 Greengrass 组件如何读取和处理状态消息。

Python
import time from stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 软件开发工具包参考:读取消息 | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:readMessag es | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require(*'aws-greengrass-stream-manager-sdk'*); const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:readMessag es | StatusMessage