使用 StreamManagerClient 处理流 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

您正在查看Amazon IoT Greengrass Version 1.Amazon IoT Greengrass Version 2是最新的主要版本Amazon IoT Greengrass. 有关使用Amazon IoT Greengrass V2,请参阅Amazon IoT Greengrass Version 2开发人员指南.

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 StreamManagerClient 处理流

用户定义的 Lambda 函数在Amazon IoT Greengrass核心可以使用StreamManagerClient对象Amazon IoT Greengrass核心开发工具包创建流流管理器,然后与流进行交互。当 Lambda 函数创建一个流时,它定义了 Amazon Web Services 云 目标、优先级以及其他导出和数据保留策略。要将数据发送到流管理器,Lambda 函数会将数据附加到流。如果为流定义了导出目标,流管理器会自动导出流。

注意

通常,流管理器的客户端是用户定义的 Lambda 函数。如果您的业务案例需要它,您也可以允许在 Greengrass 核心上运行的非 Lambda 进程(例如 Docker 容器)与流管理器交互。有关更多信息,请参阅客户端身份验证

本主题中的代码段向您展示客户端如何调用StreamManagerClient方法处理流。有关方法及其参数的实现详细信息,请使用指向每个代码片段后面列出的开发工具包参考的链接。有关包含完整 Python Lambda 函数的教程,请参阅将数据流导出到 Amazon Web Services 云 (console)或者将数据流导出到 Amazon Web Services 云 (CLI).

你的 Lambda 函数应该实例化StreamManagerClient在函数处理程序之外。如果在处理程序中进行实例化,该函数每次被调用时都会创建一个 client 并连接到流管理器。

注意

如果在处理程序中实例化 StreamManagerClient,则必须在 client 完成其工作时显式调用 close() 方法。否则,client 会保持连接打开,并且另一个线程一直运行,直到脚本退出。

StreamManagerClient 支持以下操作:

创建消息流

要创建流,用户定义的 Lambda 函数会调用 create 方法并传入MessageStreamDefinition对象。此对象指定流的唯一名称,并定义当达到最大流大小时流管理器应如何处理新数据。您可以使用 MessageStreamDefinition 及其数据类型(如 ExportDefinitionStrategyOnFullPersistence)来定义其他流属性。包括:

  • 目标Amazon IoT Analytics, Kinesis Data Streams,Amazon IoT SiteWise,以及用于自动导出的 Amazon S3 目标。有关更多信息,请参阅导出支持的配置 Amazon Web Services 云 目的地

  • 导出优先级。流管理器先导出优先级较高的流,然后导出优先级较低的流。

  • 最大批处理大小和批处理间隔Amazon IoT Analytics、Kinesis Data Streams 和Amazon IoT SiteWise目标。当满足任一条件时,流管理器导出消息。

  • 生存时间 (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。

  • 流持久性。选择将流保存到文件系统,以便在核心重新启动期间保留数据或将流保存在内存中。

  • 起始序列号。指定要用作导出中起始消息的消息的序列号。

有关 的更多信息MessageStreamDefinition,请参阅目标语言的 SDK 参考:

注意

StreamManagerClient还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。它不稳定,不支持在生产环境中使用。

创建流后,Lambda 函数可以附加消息传输到流以发送数据以进行导出,读取消息进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为Amazon IoT Analytics或 Kinesis 数据流,尽管您可以为流定义多个目标。流具有持久的使用寿命。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

注意

创建具有Amazon IoT SiteWise或 Amazon S3 导出目标具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.11.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 版本 1.6.0 | Java:1.5.0 | Node.js:1.7.0

Examples

以下代码段创建一个名为 StreamName 的流。它在中定义流属性MessageStreamDefinition和从属数据类型。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Amazon Web Services 云 . kinesis=None, iot_analytics=None, iot_sitewise=None, s3=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:创建 _ MessageStreams|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云 . new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:createMessageStream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云 . new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:createMessageStream|MessageStreamDefinition

有关配置导出目标的更多信息,请参阅导出支持的配置 Amazon Web Services 云 目的地.

 

附加消息

要将数据发送到流管理器以进行导出,您的 Lambda 函数会将数据附加到目标流。导出目标确定要传递给此方法的数据类型。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

注意

附加带有Amazon IoT SiteWise或 Amazon S3 导出目标具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.11.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 版本 1.6.0 | Java:1.5.0 | Node.js:1.7.0

Examples

Amazon IoT Analytics或 Kinesis Data Streams 导出目的地

以下代码段将消息附加到名为 StreamName 的流。适用于Amazon IoT Analytics或 Kinesis Data Streams 目标时,您的 Lambda 函数会追加一组数据。

此代码片段具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:附加消息

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:appendMessage

Amazon IoT SiteWise导出目标

以下代码段将消息附加到名为 StreamName 的流。适用于Amazon IoT SiteWise目标时,您的 Lambda 函数会附加一个序列化PutAssetPropertyValueEntry对象。有关更多信息,请参阅导出到 Amazon IoT SiteWise

注意

将数据发送给Amazon IoT SiteWise,则数据必须满足BatchPutAssetPropertyValueaction. 有关更多信息,请参阅 。BatchPutAssetPropertyValue中的Amazon IoT SiteWiseAPI 参考.

此代码片段具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.11.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 版本 1.6.0 | Java:1.5.0 | Node.js:1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:附加消息|PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:appendMessage|PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:appendMessage|PutAssetPropertyValueEntry

Amazon S3 出口目的地

以下代码段将导出任务附加到名为StreamName. 对于 Amazon S3 目标,您的 Lambda 函数会附加一个序列化S3ExportTaskDefinition对象,其中包含有关源输入文件和目标 Amazon S3 对象的信息。如果指定的对象不存在,则流管理器会为您创建它。有关更多信息,请参阅导出到 Amazon S3

此代码片段具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.11.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 版本 1.6.0 | Java:1.5.0 | Node.js:1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:附加消息|S3 导出任务定义

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:appendMessage|S3 导出任务定义

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:appendMessage|S3 导出任务定义

 

读取消息

从流读取消息。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

Examples

以下代码段读取名为 StreamName 的流中的消息。read 方法接受一个可选 ReadMessagesOptions 对象,该对象指定要开始读取的序列号、要读取的最小数量和最大数量以及读取消息的超时。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:读取消息|ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:readMessages|ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:readMessages|ReadMessagesOptions

 

列出流

获取流管理器中的流列表。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

Examples

以下代码段获取流管理器中的流列表(按名称)。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:列表 _ 流

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:listStreams

 

描述消息流

获取有关流的元数据,包括流定义、大小和导出状态。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

Examples

以下代码段获取有关名为 StreamName 的流的元数据,包括流的定义、大小和导出程序状态。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:描述 _ 消息 _ 流

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:describeMessageStream

 

更新消息流

更新现有流的属性。如果您的要求在创建流后发生变化,则可能需要更新流。例如:

  • 添加新导出配置用于 Amazon Web Services 云 目的地。

  • 增加流的最大大小以更改导出或保留数据的方式。例如,流大小与完整设置的策略结合在一起可能会导致数据被删除或拒绝,然后流管理器才能处理数据。

  • 暂停和恢复导出;例如,如果导出任务长时间运行,并且您想要提供上传数据。

您的 Lambda 函数遵循以下高级流程来更新流:

  1. 获取流的描述。

  2. 更新相应的MessageStreamDefinition和从属对象。

  3. 传入更新的MessageStreamDefinition. 确保包含更新流的完整对象定义。未定义的属性恢复为默认值。

    您可以指定消息的序列号,用作导出中的起始消息。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.11.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 版本 1.6.0 | Java:1.5.0 | Node.js:1.7.0

Examples

以下代码段更新名为的流StreamName. 它更新导出到 Kinesis Data Streams 的多个属性。

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:更新邮件信息流|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云 . messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:更新消息流|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云 . messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:更新邮件信息流|MessageStreamDefinition

更新流的限制

以下约束在更新流时适用。除非在以下列表中注明,否则更新将立即生效。

  • 您无法更新流的持久性。要更改此行为,删除流创建流,它定义了新的持久性策略。

  • 只能在以下条件下更新流的最大大小:

    • 最大大小必须大于等于流的当前大小。要查找此信息,描述流,然后检查返回的MessageStreamInfo对象。

    • 最大大小必须大于等于流的段大小。

  • 您可以将流段大小更新为小于流最大大小的值。更新后的设置将应用于新区段。

  • 对存续时间 (TTL) 属性的更新应用于新的追加操作。如果减小此值,流管理器可能还会删除超过 TTL 的现有段。

  • 对完整属性策略的更新适用于新的追加操作。如果将策略设置为覆盖最早的数据,则流管理器可能还会根据新设置覆盖现有段。

  • 写入时刷新属性的更新应用于新邮件。

  • 导出配置的更新应用于新导出。更新请求必须包含要支持的所有导出配置。否则,流管理器将删除它们。

    • 更新导出配置时,请指定目标导出配置的标识符。

    • 要添加导出配置,请为新导出配置指定唯一标识符。

    • 要删除导出配置,请忽略导出配置。

  • 目的更新流中导出配置的起始序列号,则必须指定一个小于最新序列号的值。要查找此信息,描述流,然后检查返回的MessageStreamInfo对象。

 

删除消息流

删除流。删除流时,流的所有存储数据将从磁盘中删除。

Requirements

此操作具有以下要求:

  • 最低Amazon IoT Greengrass核心版本:1.10.0

  • 最低Amazon IoT Greengrass核心开发工具包版本:Python: 1.5.0 | Java:1.4.0 | Node.js:1.6.0

Examples

以下代码段删除名为 StreamName 的流。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考:deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java 软件开发工具包参考:删除消息 _ 流

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 开发工具包参考:deleteMessageStream

另请参阅