StreamManagerClient 用于处理直播 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

StreamManagerClient 用于处理直播

在 Greengrass 核心设备上运行的用户定义的 Greengrass 组件可以使用 Stream Manager SDK 中的StreamManagerClient对象在流管理器中创建流,然后与流进行交互。当组件创建流时,它会为该流定义Amazon Web Services 云目的地、优先级以及其他导出和数据保留策略。要将数据发送到流管理器,组件会将数据附加到流中。如果为流定义了导出目的地,则流管理器会自动导出该流。

注意

通常,流管理器的客户端是用户定义的 Greengrass 组件。如果您的业务案例需要,您还可以允许在 Greengrass 核心(例如 Docker 容器)上运行的非组件进程与流管理器进行交互。有关更多信息,请参阅 客户端身份验证

本主题中的片段向您展示了客户端如何调用StreamManagerClient方法来处理流。有关方法及其参数的实现详细信息,请使用每个片段后面列出的 SDK 参考链接。

如果您在 Lambda 函数中使用流管理器,则您的 Lambda 函数应在函数处理程序StreamManagerClient之外进行实例化。如果在处理程序中进行实例化,该函数每次被调用时都会创建一个 client 并连接到流管理器。

注意

如果在处理程序中实例化 StreamManagerClient,则必须在 client 完成其工作时显式调用 close() 方法。否则,client 会保持连接打开,并且另一个线程一直运行,直到脚本退出。

StreamManagerClient 支持以下操作:

创建消息流

要创建流,用户定义的 Greengrass 组件调用 create 方法并传入MessageStreamDefinition对象。此对象指定了流的唯一名称,并定义了当达到最大流大小时,流管理器应如何处理新数据。您可以使用 MessageStreamDefinition 及其数据类型(如 ExportDefinitionStrategyOnFullPersistence)来定义其他流属性。其中包括:

  • 目标Amazon IoT Analytics、Kinesis Data Streams 和自动导出的 Amazon S3 目的地。Amazon IoT SiteWise有关更多信息,请参阅 支持Amazon Web Services 云目的地的导出配置

  • 导出优先级。流管理器先导出优先级较高的流,然后导出优先级较低的流。

  • Kinesis Data Streams 和Amazon IoT SiteWise目的地的最大批处理大小和批处理间隔。Amazon IoT Analytics当满足任一条件时,流管理器导出消息。

  • Time-to-live (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。

  • 流持久性。选择将流保存到文件系统,以便在核心重新启动期间保留数据或将流保存在内存中。

  • 起始序列号。指定要在导出中用作起始消息的消息的序列号。

有关的更多信息MessageStreamDefinition,请参阅您的目标语言的 SDK 参考资料:

注意

StreamManagerClient还提供了一个目标目标,您可以使用该目标将流导出到 HTTP 服务器。此目标仅用于测试目的。它不稳定或不支持在生产环境中使用。

创建流后,您的 Greengrass 组件可以向流追加消息以发送数据以供导出,并从流中读取消息以进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为Amazon IoT Analytics或 Kinesis 数据流中的每个目标通道创建一个流,但您可以为一个流定义多个目标。流具有持久的使用寿命。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

以下代码段创建一个名为 StreamName 的流。它在MessageStreamDefinition和次级数据类型中定义流属性。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Amazon Web Services 云. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 参考:创建消息流 | MessageStreamDefinition

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK:createMessageStream| MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考资料:createMessageStream| MessageStreamDefinition

支持Amazon Web Services 云目的地的导出配置

附加消息

要将数据发送到流管理器进行导出,您的 Greengrass 组件会将数据附加到目标流。导出目的地决定了要传递给此方法的数据类型。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

Amazon IoT Analytics或 Kinesis Data Streams 导出目的地

以下代码段将消息附加到名为 StreamName 的流。对于我们Amazon IoT Analytics的 Kinesis Data Streams 目的地,您的 Greengrass 组件会附加大量数据。

此片段有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python S DK 参考资料:附加消息

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:appendMessage

Amazon IoT SiteWise出口目的地

以下代码段将消息附加到名为 StreamName 的流。对于Amazon IoT SiteWise目的地,您的 Greengrass 组件会附加一个序列化PutAssetPropertyValueEntry对象。有关更多信息,请参阅 导出到 Amazon IoT SiteWise

注意

当您将数据发送到时Amazon IoT SiteWise,您的数据必须满足BatchPutAssetPropertyValue操作的要求。有关更多信息,请参阅《Amazon IoT SiteWise API 参考》中的 BatchPutAssetPropertyValue

此片段有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 软件开发工具包参考:附加消息 | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:appendMessage | PutAssetPropertyValueEntry

Amazon S3

以下代码段将导出任务附加到名为的流StreamName。对于 Amazon S3 目标,您的 Greengrass 组件会附加一个序列化S3ExportTaskDefinition对象,该对象包含有关源输入文件和目标 Amazon S3 对象的信息。,Sream Manager。有关更多信息,请参阅 Amazon S3

此片段有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python S DK 参考:附加消息 | S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:appendMessage | S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:appendMessage | S3ExportTaskDefinition

读取消息

从直播中读取消息。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

以下代码段读取名为 StreamName 的流中的消息。read 方法接受一个可选 ReadMessagesOptions 对象,该对象指定要开始读取的序列号、要读取的最小数量和最大数量以及读取消息的超时。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 软件开发工具包参考:读取消息 | ReadMessagesOptions

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:readMessag es | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:readMessag es | ReadMessagesOptions

列出流

在直播管理器中获取直播列表。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

以下代码段获取流管理器中的流列表(按名称)。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 参考资料:列表流

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考:listStreams

描述消息流

获取有关流的元数据,包括流定义、大小和导出状态。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

以下代码段获取有关名为 StreamName 的流的元数据,包括流的定义、大小和导出程序状态。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 参考:描述消息流

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 开发工具包参考资料:describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考资料:describeMessageStream

更新消息流

更新现有流的属性。如果直播创建后您的要求发生变化,则可能需要更新该直播。例如:

  • 为Amazon Web Services 云目标添加新的导出配置

  • 增加数据流的最大大小以更改数据的导出或保留方式。例如,流大小与您的完整设置策略相结合,可能会导致数据在流管理器处理之前被删除或拒绝。

  • 暂停和恢复导出;例如,如果导出任务运行时间很长,而您想对上传数据进行定量分配。

您的 Greengrass 组件遵循以下高级流程来更新数据流:

  1. 获取直播的描述。

  2. 更新相应MessageStreamDefinition和从属对象的目标属性。

  3. 传递更新后的内容MessageStreamDefinition。确保为更新后的流包含完整的对象定义。,。

    您可以指定消息的序列号以用作导出中的起始消息。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

以下代码段更新了名为的流StreamName。它更新了导出到 Kinesis Data Streams 的流的多个属性。

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考资料 updateMessageStream| MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:更新消息流 | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考资料:updateMessageStream| MessageStreamDefinition

更新直播的限制

。除非以下列表中另有说明,否则更新将立即生效。

  • SendSIM。要更改此行为,请删除该流,然后创建一个定义新持久性策略的流

  • 只有在以下条件下,您才能更新流的最大大小:

    • 最大大小必须大于或等于流的当前大小。要查找此信息,请描述流,然后检查返回MessageStreamInfo对象的存储状态。

    • Size Size。

  • 您可以将流段大小更新为小于流最大大小的值。更新后的设置适用于新分段。

  • (TTL),。如果您降低此值,流管理器也可能会删除超过 TTL 的现有分段。

  • 对完整属性的策略的更新适用于新的追加操作。如果您将策略设置为覆盖最旧的数据,则流管理器也可能会根据新设置覆盖现有分段。

  • 写入时刷新属性的更新适用于新消息。

  • 导出配置的更新适用于新的导出。。否则,直播管理器会将其删除。

    • 更新导出配置时,请指定目标导出配置的标识符。

    • 要添加导出配置,请为新的导出配置指定唯一标识符。

    • 要删除导出配置,请省略导出配置。

  • 更新流中导出配置的起始序列号,必须指定一个小于最新序列号的值。要查找此信息,请描述流,然后检查返回MessageStreamInfo对象的存储状态。

删除消息流

删除流。删除流时,流的所有存储数据将从磁盘中删除。

要求

此操作有以下要求:

  • 直播管理器 SDK 的最低版本:Python:1.1.1.1.1.1.1.1.1.0:1.1.1.1.1.1.1.1.1.1.0 Node.js 1.1.0

示例

以下代码段删除名为 StreamName 的流。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 开发工具包参考资料 deleteMessageStream

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 参考:delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 参考资料:deleteMessageStream

另请参阅