使用 StreamManagerClient 与流合作 - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 StreamManagerClient 与流合作

用户定义的 Lambda 功能在 AWS IoT Greengrass Core 可使用 StreamManagerClient 中的对象 AWS IoT Greengrass Core 开发工具包 以创建流 流管理器 然后与流交互。当 Lambda 函数创建流时,它会定义该流的 AWS 云目标、优先级以及其他导出和数据保留策略。要将数据发送到流管理器, Lambda 函数将数据附加到流。如果为流定义了导出目标,流管理器会自动导出流。

注意

通常,流管理器的客户端是用户定义的 Lambda 函数。如果您的业务案例需要它,您还可以允许 Greengrass 核心上运行的非 Lambda 进程(例如,Docker 容器)与流管理器交互。有关更多信息,请参阅客户端身份验证

本主题中的片段向您展示了客户如何呼叫 StreamManagerClient 处理流的方法。有关方法及其引数的实施详情,请使用每个代码段后列出的SDK参考链接。用于包含完整Python的教程 Lambda 函数,请参阅 将数据流导出到 AWS 云(控制台)将数据流导出到 AWS 云(CLI).

您的 Lambda 函数应实例化 StreamManagerClient 函数句柄之外。如果在处理程序中进行实例化,该函数每次被调用时都会创建一个 client 并连接到流管理器。

注意

如果在处理程序中实例化 StreamManagerClient,则必须在 client 完成其工作时显式调用 close() 方法。否则,client 会保持连接打开,并且另一个线程一直运行,直到脚本退出。

StreamManagerClient 支持以下操作:

创建消息流

要创建流, Lambda 函数调用创建方法,并在 MessageStreamDefinition 对象。此对象指定流的唯一名称并定义当达到最大流大小时流管理器如何处理新数据。您可以使用 MessageStreamDefinition 及其数据类型(如 ExportDefinitionStrategyOnFullPersistence)来定义其他流属性。包括:

  • 目标 AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise,和 Amazon S3 自动导出的目的地。有关更多信息,请参阅导出支持的配置 AWS 云目的地

  • 导出优先级。流管理器先导出优先级较高的流,然后导出优先级较低的流。

  • 最大批量和批次间隔 AWS IoT Analytics, Kinesis Data Streams,和 AWS IoT SiteWise 目的地。当满足任一条件时,流管理器导出消息。

  • 生存时间 (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。

  • 流持久性。选择将流保存到文件系统,以便在核心重新启动期间保留数据或将流保存在内存中。

  • 启动序列号。指定要在导出中用作开始消息的消息的序列号。

有关 MessageStreamDefinition,请参阅目标语言的SDK参考:

注意

StreamManagerClient 还提供了一个目标目标目标,可用于将流导出到 HTTP 服务器。此目标仅用于测试目的。它不稳定,也不支持在生产环境中使用。

创建流后,您的 Lambda 功能可以 附加消息 到流以发送数据以供导出,以及 已读消息 流进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为 AWS IoT Analytics 或Kinesis数据流,但您可以为流定义多个目标。流具有持久的使用寿命。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

注意

使用创建一个流 AWS IoT SiteWise 或 Amazon S3 出口目的地有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.11.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.6.0 | Java:   1.5.0 | Node.js:   1.7.0

Examples

以下片段创建了一个名为 StreamName。它在中定义流属性 MessageStreamDefinition 和下属数据类型。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud. kinesis=None, iot_analytics=None, iot_sitewise=None, s3=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 创建消息流 |内测 MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: createMessageStream |内测 MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: createMessageStream |内测 MessageStreamDefinition

有关配置导出目的地的更多信息,请参阅 导出支持的配置 AWS 云目的地.

 

附加消息

要将数据发送到流管理器进行导出,您的 Lambda 函数将数据附加到目标流。导出目的地确定要传递给此方法的数据类型。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

注意

使用附加消息 AWS IoT SiteWise 或 Amazon S3 出口目的地有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.11.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.6.0 | Java:   1.5.0 | Node.js:   1.7.0

Examples

AWS IoT Analytics 或 Kinesis Data Streams 导出目的地

以下片段将消息附加到名为的流中 StreamName。对于 AWS IoT Analytics 或 Kinesis Data Streams 目的地,您的 Lambda 函数附加数据块。

此代码段有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 追加_消息

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: appendMessage

AWS IoT SiteWise 导出目的地

以下片段将消息附加到名为的流中 StreamName。对于 AWS IoT SiteWise 目的地,您的 Lambda 函数附加序列化 PutAssetPropertyValueEntry 对象。有关更多信息,请参阅导出到 AWS IoT SiteWise

注意

在将数据发送到 AWS IoT SiteWise 时,数据必须满足 BatchPutAssetPropertyValue 操作的要求。有关更多信息,请参阅 AWS IoT SiteWise API 参考 中的 BatchPutAssetPropertyValue

此代码段有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.11.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.6.0 | Java:   1.5.0 | Node.js:   1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 追加_消息 |内测 PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: appendMessage |内测 PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: appendMessage |内测 PutAssetPropertyValueEntry

Amazon S3 导出目的地

以下片段将一个名为的导出任务附加到流中 StreamName。对于 Amazon S3 目的地,您的 Lambda 函数附加序列化 S3ExportTaskDefinition 包含有关源输入文件和目标的信息的对象 Amazon S3 对象。如果指定的对象不存在,StreamManager会为您创建该对象。有关更多信息,请参阅导出到 Amazon S3

此代码段有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.11.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.6.0 | Java:   1.5.0 | Node.js:   1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 追加_消息 |内测 S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: appendMessage |内测 S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: appendMessage |内测 S3ExportTaskDefinition

 

读取消息

从流读取消息。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

Examples

以下片段从名为的流中读取消息 StreamName。读取方法采用可选方法 ReadMessagesOptions 指定开始读取的序列号、要读取的最小和最大数字以及读取消息超时的对象。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 已读_消息 |内测 ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: readMessages |内测 ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: readMessages |内测 ReadMessagesOptions

 

列出流

获取流管理器中的流列表。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

Examples

以下代码段获取流管理器中的流列表(按名称)。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: list_流

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: listStreams

 

描述消息流

获取关于流的元数据,包括流定义、大小和导出状态。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

Examples

以下代码段获取有关名为 StreamName 的流的元数据,包括流的定义、大小和导出程序状态。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: 描述_消息_流

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: describeMessageStream

 

更新消息流

更新现有流的属性。如果在创建流之后您的要求发生变化,您可能想要更新流。例如:

  • 添加新 导出配置 用于 AWS 云目标。

  • 增加流的最大大小以更改数据的导出或保留方式。例如,流大小与完整设置上的策略组合可能导致在流管理器可以处理之前数据被删除或拒绝。

  • 暂停和恢复导出;例如,如果导出任务运行很长时间,并且您要对上传数据求比例。

您的 Lambda 函数遵循此高级流程来更新流:

  1. 获取流的说明。

  2. 更新相应的目标属性 MessageStreamDefinition 和附属对象。

  3. 在更新 MessageStreamDefinition。确保包含更新流的完整对象定义。未定义的属性会还原为默认值。

    您可以指定要用作导出中的起始消息的消息的序列号。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.11.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.6.0 | Java:   1.5.0 | Node.js:   1.7.0

Examples

以下片段更新名为的流 StreamName。它更新导出至的流的多个属性 Kinesis Data Streams.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: updateMessageStream |内测 MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: 更新_消息_流 |内测 MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: updateMessageStream |内测 MessageStreamDefinition

更新流的限制

当更新流时,将应用以下限制。除非在下面的列表中注明,否则更新立即生效。

  • 您无法更新流的持久性。要改变这个行为, 删除流创建流 定义新的持久性政策。

  • 您只能在以下条件下更新流的最大大小:

    • 最大大小必须大于或等于流的当前大小。要查找此信息,请描述流,然后检查返回的 MessageStreamInfo 对象的存储状态。

    • 最大大小必须大于或等于流的线段大小。

  • 您可以将流段大小更新为小于流最大大小的值。更新的设置适用于新段。

  • 对存活时间(TTL)属性的更新适用于新的附加操作。如果减小此值,流管理器也可能删除超过TTL的现有段。

  • 完整属性上的策略更新适用于新的附加操作。如果将策略设置为覆盖最旧的数据,则流管理器也可能基于新设置覆盖现有段。

  • 对写入刷新属性的更新适用于新消息。

  • 导出配置的更新适用于新导出。更新请求必须包含要支持的所有导出配置。否则,流管理器会删除它们。

    • 更新导出配置时,请指定目标导出配置的标识符。

    • 要添加导出配置,请为新导出配置指定唯一标识符。

    • 要删除导出配置,请忽略导出配置。

  • 收件人 更新 流中导出配置的起始序列号,则必须指定小于最新序列号的值。要查找此信息,请描述流,然后检查返回的 MessageStreamInfo 对象的存储状态。

 

删除消息流

删除流。删除流时,流的所有存储数据将从磁盘中删除。

Requirements

此操作有以下要求:

  • 最低 AWS IoT Greengrass 核心版本: 1.10.0

  • 最低 AWS IoT Greengrass Core 开发工具包 版本: Python: 1.5.0 | Java:   1.4.0 | Node.js:   1.6.0

Examples

以下代码段删除名为 StreamName 的流。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

PythonSDK参考: deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

JavaSDK参考: 删除_消息_流

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.jsSDK参考: deleteMessageStream

另请参阅