Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
StreamManagerClient 用于处理直播
在 Greengrass 核心设备上运行的用户定义的 Greengrass 组件可以使用StreamManagerClient流管理器 SDK 中的对象在直播管理器中创建直播,然后与直播交互。当组件创建流时,它会定义流的Amazon Web Services 云目的地、优先级以及其他导出和数据保留策略。要将数据发送到流管理器,组件会将数据附加到流中。如果为流定义了导出目标,流管理器会自动导出流。
通常,流管理器的客户端是用户定义的 Greengrass 组件。如果您的业务案例需要,您还可以允许在 Greengrass 内核(例如 Docker 容器)上运行的非组件进程与流管理器交互。有关更多信息,请参阅 客户端身份验证。
本主题中的代码段向您展示客户端如何调用 StreamManagerClient
方式处理流。有关方法及其参数的实现详细信息,请使用指向每个代码片段后面列出的开发工具包参考的链接。
如果您在 Lambda 函数中使用流管理器,则您的 Lambda 函数应在函数处理程序之外进行实例化StreamManagerClient
。如果在处理程序中进行实例化,该函数每次被调用时都会创建一个 client
并连接到流管理器。
如果在处理程序中实例化 StreamManagerClient
,则必须在 client
完成其工作时显式调用 close()
方法。否则,client
会保持连接打开,并且另一个线程一直运行,直到脚本退出。
StreamManagerClient
支持以下操作:
创建消息流
要创建直播,用户定义的 Greengrass 组件会调用 create 方法并传入一个对象。MessageStreamDefinition
此对象指定流的唯一名称,并定义当达到最大流大小时,流管理器应如何处理新数据。您可以使用 MessageStreamDefinition
及其数据类型(如 ExportDefinition
、StrategyOnFull
和 Persistence
)来定义其他流属性。其中包括:
-
自动导出的目标 Amazon IoT Analytics、Kinesis Data Streams、Amazon IoT SiteWise 和 Amazon S3 目标。有关更多信息,请参阅 导出支持的 Amazon Web Services 云 目标的配置。
-
导出优先级。流管理器先导出优先级较高的流,然后导出优先级较低的流。
-
Amazon IoT Analytics、Kinesis Data Streams 和 Amazon IoT SiteWise 目标的最大批处理大小和批处理间隔。当满足任一条件时,流管理器导出消息。
-
T ime-to-live (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。
-
流持久性。选择将流保存到文件系统,以便在核心重新启动期间保留数据或将流保存在内存中。
-
起始序列号。指定要在导出中用作起始消息的消息的序列号。
有关 MessageStreamDefinition
的更多信息,请参阅目标语言的开发工具包参考:
StreamManagerClient
还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定,或不支持在生产环境中使用。
创建流后,您的 Greengrass 组件可以将消息附加到流中以发送数据以供导出,并从流中读取消息以进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为 Amazon IoT Analytics 或 Kinesis 数据流中的每个目标通道创建一个流(尽管您可以为一个流定义多个目标)。流具有持久的使用寿命。
要求
此操作具有以下要求:
示例
以下代码段创建一个名为 StreamName
的流。它定义了 MessageStreamDefinition
中的流属性和从属的数据类型。
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Amazon Web Services 云.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 参考:创建消息流 | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 开发工具包参考:createMessageStream| MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the Amazon Web Services 云.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSiteWise(null)
.withS3(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:createMessageStream| MessageStreamDefinition
有关配置导出目标的更多信息,请参阅导出支持的 Amazon Web Services 云 目标的配置。
附加消息
要将数据发送到流管理器进行导出,Greengrass 组件会将数据附加到目标流。导出目标决定要传递给此方法的数据类型。
要求
此操作具有以下要求:
示例
Amazon IoT Analytics 或 Kinesis Data Streams 导出目标
以下代码段将消息附加到名为 StreamName
的流。对于我们Amazon IoT Analytics的 Kinesis Data Streams 目标,你的 Greengrass 组件会附加一大堆数据。
此代码段具有以下要求:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 开发工具包参考:append_message
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 开发工具包参考:appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 开发工具包参考:appendMessage
Amazon IoT SiteWise 导出目标
以下代码段将消息附加到名为 StreamName
的流。对于Amazon IoT SiteWise目的地,您的 Greengrass 组件会附加一个序列化对象。PutAssetPropertyValueEntry
有关更多信息,请参阅 导出到 Amazon IoT SiteWise。
当您将数据发送到 Amazon IoT SiteWise 时,数据必须满足 BatchPutAssetPropertyValue
操作的要求。有关更多信息,请参阅《Amazon IoT SiteWise API 参考》中的 BatchPutAssetPropertyValue。
此代码段具有以下要求:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 参考:追加消息 | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
// than 10 minutes in the past. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 参考:附加消息 | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:追加消息 | PutAssetPropertyValueEntry
Amazon S3 导出目标
以下代码段将导出任务附加到名为 StreamName
的流。对于亚马逊 S3 目的地,您的 Greengrass 组件会附加一个S3ExportTaskDefinition
序列化对象,其中包含有关源输入文件和目标 Amazon S3 对象的信息。如果指定的对象不存在,流管理器会为您创建。有关更多信息,请参阅 导出到 Amazon S3。
此代码段具有以下要求:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 参考:追加消息 | S3 ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 参考:附加消息 | S3 ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:追加消息 | S3 ExportTaskDefinition
读取消息
从流读取消息。
要求
此操作具有以下要求:
示例
以下代码段读取名为 StreamName
的流中的消息。read 方法接受一个可选 ReadMessagesOptions
对象,该对象指定要开始读取的序列号、要读取的最小数量和最大数量以及读取消息的超时。
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 参考:阅读消息 | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 开发工具包参考:阅读消息 | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:阅读消息 | ReadMessagesOptions
列出流
在流管理器中获取流列表。
要求
此操作具有以下要求:
示例
以下代码段获取流管理器中的流列表(按名称)。
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 开发工具包参考:list_streams
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 开发工具包参考:listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 开发工具包参考:listStreams
描述消息流
获取有关流的元数据,包括流定义、大小和导出状态。
要求
此操作具有以下要求:
示例
以下代码段获取有关名为 StreamName
的流的元数据,包括流的定义、大小和导出程序状态。
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 开发工具包参考:describe_message_stream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 开发工具包参考:describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:describeMessageStream
更新消息流
更新现有流的属性。如果流创建后您的要求发生变化,则可能需要更新流。例如:
-
为 Amazon Web Services 云 目标添加新的导出配置。
-
增加流的最大大小以更改数据的导出或保留方式。例如,将流大小与您在完整设置下的策略相结合,可能会导致数据在流管理器处理之前被删除或拒绝。
-
暂停然后恢复导出;例如,如果导出任务运行时间较长,而您想对上传数据进行配给。
您的 Greengrass 组件遵循以下高级流程来更新直播:
-
获取流的描述。
-
更新相应 MessageStreamDefinition
和从属对象的目标属性。
-
传入更新后的 MessageStreamDefinition
。请务必包含更新后的流的完整对象定义。未定义属性将恢复为默认值。
可以指定要在导出中用作起始消息的消息的序列号。
要求
此操作具有以下要求:
示例
以下代码段更新名为 StreamName
的流。它会更新导出到 Kinesis Data Streams 的流的多个属性。
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 开发工具包参考:updateMessageStream| MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the Amazon Web Services 云.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK 参考:更新消息流 | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the Amazon Web Services 云.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:updateMessageStream| MessageStreamDefinition
更新流时的限制
更新流时,有以下限制。除非在以下列表中注明,否则更新会立即生效。
-
无法更新流的持久性。要更改此行为,请删除该流然后创建一个流以定义新的持久性策略。
-
只有在以下条件下,您才能更新流的最大大小:
-
可以将流的段大小更新为小于流的最大大小的值。更新的设置将应用于新的段。
-
生存时间(TTL)属性的更新将应用于新的追加操作。如果您减小此值,流管理器也可能会删除超过 TTL 的现有段。
-
对全属性策略的更新将应用于新的追加操作。如果您将策略设置为覆盖最旧的数据,则流管理器还可能根据新设置覆盖现有段。
-
写入时刷新属性的更新将应用于新消息。
-
导出配置的更新将应用于新的导出。更新请求必须包含您想要支持的所有导出配置。否则,流管理器会将其删除。
-
更新导出配置时,请指定目标导出配置的标识符。
-
要添加导出配置,请为新的导出配置指定唯一标识符。
-
要删除导出配置,请省略导出配置。
-
要更新流中导出配置的起始序列号,必须指定一个小于最新序列号的值。要查找此信息,请描述流,然后检查返回的 MessageStreamInfo
对象的存储状态。
删除消息流
删除流。删除流时,流的所有存储数据将从磁盘中删除。
要求
此操作具有以下要求:
示例
以下代码段删除名为 StreamName
的流。
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 开发工具包参考:deleteMessageStream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 开发工具包参考:delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK 参考:deleteMessageStream
另请参阅