使用 StreamManagerClient 处理流 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon IoT Greengrass Version 1 2023 年 6 月 30 日进入延长寿命阶段。有关更多信息,请参阅 Amazon IoT Greengrass V1 维护策略。在此日期之后,将 Amazon IoT Greengrass V1 不会发布提供功能、增强功能、错误修复或安全补丁的更新。在上面运行的设备 Amazon IoT Greengrass V1 不会中断,将继续运行并连接到云端。我们强烈建议您迁移到 Amazon IoT Greengrass Version 2,这样可以添加重要的新功能支持其他平台

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 StreamManagerClient 处理流

在 Amazon IoT Greengrass 核心上运行的用户定义的 Lambda 函数可以使用 Amazon IoT Greengrass Core SDK 中的 StreamManagerClient 对象在流管理器中创建流,然后便可以与流进行交互。当 Lambda 函数创建流时,它会定义该流的 Amazon Web Services 云 目标、优先级以及其他导出和数据保留策略。要将数据发送到流管理器,Lambda 函数会将数据附加到流中。如果为流定义了导出目标,流管理器会自动导出流。

注意

通常,流管理器的客户端是用户定义的 Lambda 函数。如果您的业务案例需要它,您也可以允许在 Greengrass 核心上运行的非 Lambda 进程(例如 Docker 容器)与流管理器交互。有关更多信息,请参阅客户端身份验证

本主题中的代码段向您展示客户端如何调用 StreamManagerClient 方式处理流。有关方法及其参数的实现详细信息,请使用指向每个代码片段后面列出的开发工具包参考的链接。有关使用完整 Python Lambda 函数的教程,请参阅 将数据流导出到 Amazon Web Services 云(控制台)将数据流导出到 Amazon Web Services 云 (CLI)

您的 Lambda 函数应该在函数处理程序之外实例化 StreamManagerClient。如果在处理程序中进行实例化,该函数每次被调用时都会创建一个 client 并连接到流管理器。

注意

如果在处理程序中实例化 StreamManagerClient,则必须在 client 完成其工作时显式调用 close() 方法。否则,client 会保持连接打开,并且另一个线程一直运行,直到脚本退出。

StreamManagerClient 支持以下操作:

创建消息流

要创建流,用户定义的 Lambda 函数会调用 create 方法并传入一个 MessageStreamDefinition 对象。此对象指定流的唯一名称,并定义当达到最大流大小时,流管理器应如何处理新数据。您可以使用 MessageStreamDefinition 及其数据类型(如 ExportDefinitionStrategyOnFullPersistence)来定义其他流属性。其中包括:

  • 自动导出的目标 Amazon IoT Analytics、Kinesis Data Streams、Amazon IoT SiteWise 和 Amazon S3 目标。有关更多信息,请参阅导出支持的 Amazon Web Services 云 目标的配置

  • 导出优先级。流管理器先导出优先级较高的流,然后导出优先级较低的流。

  • Amazon IoT Analytics、Kinesis Data Streams 和 Amazon IoT SiteWise 目标的最大批处理大小和批处理间隔。当满足任一条件时,流管理器导出消息。

  • 生存时间 (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。

  • 流持久性。选择将流保存到文件系统,以便在核心重新启动期间保留数据或将流保存在内存中。

  • 起始序列号。指定要在导出中用作起始消息的消息的序列号。

有关 MessageStreamDefinition 的更多信息,请参阅目标语言的开发工具包参考:

注意

StreamManagerClient 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定,或不支持在生产环境中使用。

创建流后,您的 Lambda 函数可以将消息附加到流中以发送数据以供导出,并从流中读取消息以进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为 Amazon IoT Analytics 或 Kinesis 数据流中的每个目标通道创建一个流(尽管您可以为一个流定义多个目标)。流具有持久的使用寿命。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

注意

要使用 Amazon IoT SiteWise 或 Amazon S3 导出目标创建流,需要满足以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.11.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.6.0  |  Java:1.5.0  |  Node.js:1.7.0

示例

以下代码段创建一个名为 StreamName 的流。它定义了 MessageStreamDefinition 中的流属性和从属的数据类型。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Amazon Web Services 云. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:create_message_stream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:createMessageStream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:createMessageStream | MessageStreamDefinition

有关配置导出目标的更多信息,请参阅导出支持的 Amazon Web Services 云 目标的配置

 

附加消息

要将数据发送到流管理器进行导出,您的 Lambda 函数会将数据附加到目标流。导出目标决定要传递给此方法的数据类型。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

注意

要附加 Amazon IoT SiteWise 或 Amazon S3 导出目标的消息,需要满足以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.11.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.6.0  |  Java:1.5.0  |  Node.js:1.7.0

示例

Amazon IoT Analytics 或 Kinesis Data Streams 导出目标

以下代码段将消息附加到名为 StreamName 的流。对于 Amazon IoT Analytics 或 Kinesis Data Streams 目标,您的 Lambda 函数会附加一个数据 BLOB。

此代码段具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:append_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:appendMessage

Amazon IoT SiteWise 导出目标

以下代码段将消息附加到名为 StreamName 的流。对于 Amazon IoT SiteWise 目标,您的 Lambda 函数会附加一个序列化 PutAssetPropertyValueEntry 对象。有关更多信息,请参阅导出到 Amazon IoT SiteWise

注意

当您将数据发送到 Amazon IoT SiteWise 时,数据必须满足 BatchPutAssetPropertyValue 操作的要求。有关更多信息,请参阅 Amazon IoT SiteWise API 参考中的 BatchPutAssetPropertyValue

此代码段具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.11.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.6.0  |  Java:1.5.0  |  Node.js:1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 参考:append_message | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:appendMessage | PutAssetPropertyValueEntry

Amazon S3 导出目标

以下代码段将导出任务附加到名为 StreamName 的流。对于 Amazon S3 目标,您的 Lambda 函数会附加一个序列化 S3ExportTaskDefinition 对象,其中包含有关源输入文件和目标 Amazon S3 对象的信息。如果指定的对象不存在,流管理器会为您创建。有关更多信息,请参阅导出到 Amazon S3

此代码段具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.11.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.6.0  |  Java:1.5.0  |  Node.js:1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 参考:append_message | S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:appendMessage | S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:appendMessage | S3ExportTaskDefinition

 

读取消息

从流读取消息。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

示例

以下代码段读取名为 StreamName 的流中的消息。read 方法接受一个可选 ReadMessagesOptions 对象,该对象指定要开始读取的序列号、要读取的最小数量和最大数量以及读取消息的超时。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:read_messages | ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:readMessages | ReadMessagesOptions

 

列出流

在流管理器中获取流列表。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

示例

以下代码段获取流管理器中的流列表(按名称)。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:list_streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:listStreams

 

描述消息流

获取有关流的元数据,包括流定义、大小和导出状态。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

示例

以下代码段获取有关名为 StreamName 的流的元数据,包括流的定义、大小和导出程序状态。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:describe_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:describeMessageStream

 

更新消息流

更新现有流的属性。如果流创建后您的要求发生变化,则可能需要更新流。例如:

  • 为 Amazon Web Services 云 目标添加新的导出配置

  • 增加流的最大大小以更改数据的导出或保留方式。例如,将流大小与您在完整设置下的策略相结合,可能会导致数据在流管理器处理之前被删除或拒绝。

  • 暂停然后恢复导出;例如,如果导出任务运行时间较长,而您想对上传数据进行配给。

您的 Lambda 函数遵循以下高级流程来更新流:

  1. 获取流的描述。

  2. 更新相应 MessageStreamDefinition 和从属对象的目标属性。

  3. 传入更新后的 MessageStreamDefinition。请务必包含更新后的流的完整对象定义。未定义属性将恢复为默认值。

    可以指定要在导出中用作起始消息的消息的序列号。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.11.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.6.0  |  Java:1.5.0  |  Node.js:1.7.0

示例

以下代码段更新名为 StreamName 的流。它会更新导出到 Kinesis Data Streams 的流的多个属性。

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 参考:updateMessageStream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:update_message_stream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:updateMessageStream | MessageStreamDefinition

更新流时的限制

更新流时,有以下限制。除非在以下列表中注明,否则更新会立即生效。

  • 无法更新流的持久性。要更改此行为,请删除该流然后创建一个流以定义新的持久性策略。

  • 只有在以下条件下,您才能更新流的最大大小:

    • 最大大小必须大于或等于流的当前大小。要查找此信息,请描述流,然后检查返回的 MessageStreamInfo 对象的存储状态。

    • 最大大小必须大于或等于流的段大小。

  • 可以将流的段大小更新为小于流的最大大小的值。更新的设置将应用于新的段。

  • 生存时间(TTL)属性的更新将应用于新的追加操作。如果您减小此值,流管理器也可能会删除超过 TTL 的现有段。

  • 对全属性策略的更新将应用于新的追加操作。如果您将策略设置为覆盖最旧的数据,则流管理器还可能根据新设置覆盖现有段。

  • 写入时刷新属性的更新将应用于新消息。

  • 导出配置的更新将应用于新的导出。更新请求必须包含您想要支持的所有导出配置。否则,流管理器会将其删除。

    • 更新导出配置时,请指定目标导出配置的标识符。

    • 要添加导出配置,请为新的导出配置指定唯一标识符。

    • 要删除导出配置,请省略导出配置。

  • 更新流中导出配置的起始序列号,必须指定一个小于最新序列号的值。要查找此信息,请描述流,然后检查返回的 MessageStreamInfo 对象的存储状态。

 

删除消息流

删除流。删除流时,流的所有存储数据将从磁盘中删除。

要求

此操作具有以下要求:

  • Amazon IoT Greengrass Core 最低版本:1.10.0

  • Amazon IoT Greengrass 核心软件开发工具包最低版本:Python:1.5.0 | Java:1.4.0 | Node.js:1.6.0

示例

以下代码段删除名为 StreamName 的流。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考:delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:deleteMessageStream

另请参阅