Use StreamManagerClient to work with streams - AWS IoT Greengrass

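User-defined Lambda functions that run on the AWS IoT Greengrass core can use the StreamManagerClient object in the AWS IoT Greengrass Core SDK to create streams in stream manager and then interact with those streams. When a Lambda function creates a stream, it defines the stream's AWS Cloud destinations, its priority, and its other export and data retention policies. If an export destination is defined, stream manager exports the stream automatically.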

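Note

Typically, the clients of stream manager are user-defined Lambda functions. If your business case requires it, you can also allow non-Lambda processes running on the Greengrass core (for example, a Docker container) to interact with stream manager. For more information, see Client authentication.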

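The snippets in this topic show how clients use StreamManagerClient to work with streams. For implementation details about the methods and their arguments, use the SDK reference links listed after each snippet. For tutorials that include a complete Python Lambda function, see Export data streams to the AWS Cloud (console) or Export data streams to the AWS Cloud (CLI).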

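Instantiate StreamManagerClient outside of the function handler. If you instantiate it in the handler, the function creates a new client and a new connection to stream manager every time it is invoked.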

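Note

If you do instantiate StreamManagerClient in the handler, you must explicitly call the close() method when the client finishes its work. Otherwise, the client keeps the connection open and leaves another thread running until the script exits.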
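The two lifecycle options described above can be sketched as follows. This is a minimal illustration rather than SDK code: `FakeClient` is a hypothetical stand-in that only counts how many connections are opened and closed, so the example runs without a Greengrass core. In a real function, StreamManagerClient would take its place.

```python
class FakeClient:
    """Hypothetical stand-in for StreamManagerClient; it only tracks connections."""

    opened = 0
    closed = 0

    def __init__(self):
        FakeClient.opened += 1

    def close(self):
        FakeClient.closed += 1


# Preferred: create the client once, at module load, outside the handler.
shared_client = FakeClient()


def handler_reusing_client(event, context):
    # Every invocation reuses the same client and its connection.
    return shared_client


def handler_creating_client(event, context):
    # If the client must be created here, close it explicitly when done.
    client = FakeClient()
    try:
        return "done"
    finally:
        client.close()


for _ in range(3):
    handler_reusing_client({}, None)
opened_by_shared = FakeClient.opened  # Only the module-level client exists.

for _ in range(3):
    handler_creating_client({}, None)
opened_in_handler = FakeClient.opened - opened_by_shared  # One per invocation.
```

The `try`/`finally` block is one way to make sure `close()` runs even when the handler raises.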

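StreamManagerClient supports the following operations:

Create message stream

To create a stream, a user-defined Lambda function calls the create method (create_message_stream in Python, createMessageStream in Java and Node.js) and passes in a MessageStreamDefinition object. MessageStreamDefinition contains the unique name of the stream and defines how stream manager should handle new data when the maximum stream size is reached. You can use MessageStreamDefinition and its data types (such as ExportDefinition, StrategyOnFull, and Persistence) to define other stream properties. These include: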

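  • The target AWS IoT Analytics channels and Kinesis data streams. Stream manager exports the stream to these destinations automatically. You create and maintain these AWS Cloud resources yourself.

  • Export priority. Stream manager exports higher-priority streams before lower-priority streams.

  • Maximum batch size and batch interval. Stream manager exports messages when either condition is met.

  • Time-to-live (TTL). The amount of time that the stream data is guaranteed to be available for processing. Make sure that the data can be consumed within this period. TTL is not a deletion policy: the data might not be deleted immediately after the TTL period ends.

  • Stream persistence. Choose whether to save the stream to the file system, so that data persists across core restarts, or to keep the stream in memory.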
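The batch settings in the list above read as an either-or rule: a batch is exported when the maximum batch size is reached or when the batch interval has elapsed. The sketch below models only that rule. The function name, the parameter names, and the choice to never export an empty batch are illustrative assumptions, not part of the SDK.

```python
def should_export(pending_messages, millis_since_last_export,
                  batch_size, batch_interval_millis):
    """Return True when either batching condition is met (illustrative model)."""
    if pending_messages == 0:
        # Assumption: an empty batch is never exported.
        return False
    size_reached = pending_messages >= batch_size
    interval_elapsed = millis_since_last_export >= batch_interval_millis
    return size_reached or interval_elapsed


# A full batch is exported even though the interval has not elapsed.
full_batch = should_export(500, 10, batch_size=500, batch_interval_millis=60000)

# A small batch is exported once the interval has elapsed.
stale_batch = should_export(3, 60000, batch_size=500, batch_interval_millis=60000)

# Neither condition is met, so the messages keep waiting.
waiting = should_export(3, 10, batch_size=500, batch_interval_millis=60000)
```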

For more information about MessageStreamDefinition, see the SDK reference for your target language: Python | Java | Node.js

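Note

StreamManagerClient also provides a target destination that you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable and is not supported for use in production environments.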

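The number of streams that you create depends on your hardware capabilities and your business case. One strategy is to create one stream for each target channel in AWS IoT Analytics or each target Kinesis data stream, although you can define multiple targets for a single stream. Streams have a durable lifespan. After a stream is created, your Lambda functions can read from it and write to it. However, you can't change a stream's definition after the stream is created. To make a change, you must delete the stream and then recreate it. When you delete a stream, all of its stored data is deleted from the disk.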
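Because a stream definition cannot be changed in place, an update amounts to a delete followed by a create. The helper below is a minimal sketch of that sequence. It assumes a client that exposes the `delete_message_stream` and `create_message_stream` methods used in this topic's Python snippets and a definition object with a `name` attribute. `RecordingClient` and the sample definition are hypothetical stubs so that the example runs without a Greengrass core.

```python
from types import SimpleNamespace


def recreate_stream(client, definition):
    """Replace an existing stream with a new definition.

    Warning: deleting the stream removes all of its stored data from disk.
    """
    client.delete_message_stream(stream_name=definition.name)
    client.create_message_stream(definition)


class RecordingClient:
    """Hypothetical stub that records calls instead of contacting stream manager."""

    def __init__(self):
        self.calls = []

    def delete_message_stream(self, stream_name):
        self.calls.append(("delete", stream_name))

    def create_message_stream(self, definition):
        self.calls.append(("create", definition.name))


stub = RecordingClient()
# Stand-in for a MessageStreamDefinition with a larger max_size.
new_definition = SimpleNamespace(name="StreamName", max_size=536870912)
recreate_stream(stub, new_definition)
```

Any data that still needs to be exported or read should be drained before calling a helper like this, because the delete step discards it.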

The following snippet creates a stream named StreamName. It defines the stream properties in MessageStreamDefinition and its subordinate data types.

Python
import asyncio

from greengrasssdk.stream_manager import (
    ExportDefinition,
    MessageStreamDefinition,
    Persistence,
    StrategyOnFull,
    StreamManagerClient,
    StreamManagerException,
)

client = StreamManagerClient()

try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS Cloud.
            kinesis=None,
            iot_analytics=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    # A tuple is required to catch both types; "A or B" would catch only ConnectionError.
    pass
    # Properly handle errors.

SDK reference: create_message_stream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L) // Default is 256 MB.
                    .withStreamSegmentSize(16777216L) // Default is 16 MB.
                    .withTimeToLiveMillis(null) // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
                    .withPersistence(Persistence.File) // Default is File.
                    .withFlushOnWrite(false) // Default is false.
                    .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                    )
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}

SDK reference: createMessageStream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456) // Default is 256 MB.
                .withStreamSegmentSize(16777216) // Default is 16 MB.
                .withTimeToLiveMillis(null) // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
                .withPersistence(Persistence.File) // Default is File.
                .withFlushOnWrite(false) // Default is false.
                .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});

SDK reference: createMessageStream | MessageStreamDefinition

Append message

The following snippet appends a message to the stream named StreamName.

Python
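import asyncio

from greengrasssdk.stream_manager import StreamManagerClient, StreamManagerException

client = StreamManagerClient()

try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    # A tuple is required to catch both types; "A or B" would catch only ConnectionError.
    pass
    # Properly handle errors.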

SDK reference: append_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}

SDK reference: appendMessage

Node.js
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});

SDK reference: appendMessage
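The append snippets pass raw bytes, so a structured reading has to be serialized before it is appended. The following is a minimal sketch, assuming that JSON is an acceptable format for whatever consumes the stream. The helper names and the sample reading are hypothetical.

```python
import json


def encode_payload(reading):
    """Serialize a dict to compact UTF-8 JSON bytes."""
    return json.dumps(reading, separators=(",", ":")).encode("utf-8")


def decode_payload(data):
    """Reverse encode_payload, for example in a function that reads the stream."""
    return json.loads(data.decode("utf-8"))


reading = {"sensor": "temp-1", "celsius": 21.5}
payload = encode_payload(reading)
# payload is a bytes object that can be passed as the data argument
# of append_message in the Python snippet above.
```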

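Read messages

The following snippet reads messages from the stream named StreamName. The read method (read_messages in Python, readMessages in Java and Node.js) takes an optional ReadMessagesOptions object that specifies the sequence number to start reading from, the minimum and maximum number of messages to read, and a timeout for the read.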

Python
import asyncio

from greengrasssdk.stream_manager import (
    ReadMessagesOptions,
    StreamManagerClient,
    StreamManagerException,
)

client = StreamManagerClient()

try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_message_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    # A tuple is required to catch both types; "A or B" would catch only ConnectionError.
    pass
    # Properly handle errors.

SDK reference: read_messages | ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}

SDK reference: readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});

SDK reference: readMessages | ReadMessagesOptions
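To make the interplay of the read options easier to follow, here is a small in-memory model of the behavior that the snippet comments describe. It is not the SDK implementation: the stream is a plain list of `(sequence_number, payload)` tuples, `NotEnoughMessages` stands in for the SDK's NotEnoughMessagesException, and the timeout is ignored, which corresponds to a read timeout of 0.

```python
class NotEnoughMessages(Exception):
    """Stand-in for the SDK's NotEnoughMessagesException."""


def read_messages_model(stream, desired_start_sequence_number=0,
                        min_message_count=1, max_message_count=1):
    """Model the read options over a list of (sequence_number, payload) tuples."""
    # Only messages at or after the desired start sequence number qualify.
    available = [m for m in stream if m[0] >= desired_start_sequence_number]
    if len(available) < min_message_count:
        raise NotEnoughMessages(
            "wanted at least %d, found %d" % (min_message_count, len(available))
        )
    # Never return more than max_message_count messages.
    return available[:max_message_count]


# A stream that currently holds sequence numbers 100 through 149.
stream = [(n, b"payload") for n in range(100, 150)]

# Same option values as the snippets above: at least 10, at most 100 messages.
batch = read_messages_model(stream, desired_start_sequence_number=100,
                            min_message_count=10, max_message_count=100)

# With the defaults, a single message is read from the start of the stream.
first = read_messages_model(stream)
```

In this model, asking for 10 messages starting at sequence number 145 raises `NotEnoughMessages`, because only five messages qualify.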

List streams

The following snippet gets a list of the streams (by name) in stream manager.

Python
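import asyncio

from greengrasssdk.stream_manager import StreamManagerClient, StreamManagerException

client = StreamManagerClient()

try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    # A tuple is required to catch both types; "A or B" would catch only ConnectionError.
    pass
    # Properly handle errors.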

SDK reference: list_streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}

SDK reference: listStreams

Node.js
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});

SDK reference: listStreams
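One common use of the stream list is to create a stream only when it does not exist yet, for example when a function restarts. The sketch below assumes a client that exposes the `list_streams` and `create_message_stream` methods from the Python snippets and a definition object with a `name` attribute. `StubClient` and the sample definition are hypothetical so that the example runs on its own.

```python
from types import SimpleNamespace


def ensure_stream(client, definition):
    """Create the stream if no stream with that name exists.

    Returns True if the stream was created, False if it already existed.
    """
    if definition.name in client.list_streams():
        return False
    client.create_message_stream(definition)
    return True


class StubClient:
    """Hypothetical in-memory stub; it keeps stream names in a list."""

    def __init__(self, existing):
        self.names = list(existing)

    def list_streams(self):
        return list(self.names)

    def create_message_stream(self, definition):
        self.names.append(definition.name)


stub = StubClient(existing=["OtherStream"])
definition = SimpleNamespace(name="StreamName")  # Stand-in for MessageStreamDefinition.
created_first_time = ensure_stream(stub, definition)
created_second_time = ensure_stream(stub, definition)
```

The check and the create are two separate calls, so another client could create the stream in between. Real code should still handle the exception raised by the create call.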

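Describe message stream

The following snippet gets metadata about the stream named StreamName, including the stream's definition, its size, and its exporter statuses.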

Python
import asyncio

from greengrasssdk.stream_manager import StreamManagerClient, StreamManagerException

client = StreamManagerClient()

try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number

    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    # A tuple is required to catch both types; "A or B" would catch only ConnectionError.
    pass
    # Properly handle errors.

SDK reference: describe_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }

    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}

SDK reference: describeMessageStream

Node.js
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }

        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});

SDK reference: describeMessageStream
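The description snippets check only export destination 0. The sketch below applies the same two checks to every export status and summarizes them. It uses the attribute names from the Python snippet (`storage_status.newest_sequence_number`, `export_statuses`, `last_exported_sequence_number`, `error_message`). The helper name, the report layout, and the sample data are hypothetical, and treating the difference between sequence numbers as a count of pending messages assumes that sequence numbers increase by one per message.

```python
from types import SimpleNamespace


def export_backlog(description):
    """Summarize, per export destination, how far the export lags and whether it failed."""
    newest = description.storage_status.newest_sequence_number
    report = []
    for index, status in enumerate(description.export_statuses):
        report.append({
            "destination": index,
            # Assumption: one sequence number per message.
            "pending": max(0, newest - status.last_exported_sequence_number),
            "failed": bool(status.error_message),
        })
    return report


# Hypothetical description object shaped like the one used in the snippet above.
sample = SimpleNamespace(
    storage_status=SimpleNamespace(newest_sequence_number=120),
    export_statuses=[
        SimpleNamespace(last_exported_sequence_number=100, error_message=""),
        SimpleNamespace(last_exported_sequence_number=120, error_message="timeout"),
    ],
)
report = export_backlog(sample)
```

Looping over `export_statuses` also avoids an index error when a stream has no export destinations.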

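Delete message stream

The following snippet deletes the stream named StreamName. When you delete a stream, all of its stored data is deleted from the disk.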

Python
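import asyncio

from greengrasssdk.stream_manager import StreamManagerClient, StreamManagerException

client = StreamManagerClient()

try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    # A tuple is required to catch both types; "A or B" would catch only ConnectionError.
    pass
    # Properly handle errors.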

SDK reference: delete_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}

SDK reference: deleteMessageStream

Node.js
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});

SDK reference: deleteMessageStream

See also