对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
并行处理输入流以增加吞吐量
Amazon Kinesis Data Analytics 应用程序可以支持多个应用程序内输入流,从而将应用程序扩展到单个应用程序内输入流的吞吐量之外。有关应用程序内部输入流的更多信息,请参阅 适用于 SQL 应用程序的Amazon Kinesis Data Analytics:工作原理。
在几乎所有情况下,Amazon Kinesis Data Analytics 都会扩展您的应用程序以处理馈入应用程序的 Kinesis 流或 Kinesis Data Firehose 源流的容量。但是,如果您的源流的吞吐量超出单个应用程序内部输入流的吞吐量,您可显式增加您的应用程序使用的应用程序内部输入流的数量。您需使用 InputParallelism
参数执行此操作。
当InputParallelism
参数大于一时,Amazon Kinesis Data Analytics 会在应用程序内流之间平均分配源流的分区。例如,如果您的源流具有 50 个分片,并且您已将 InputParallelism
设置为 2
,则每个应用程序内部输入流都将收到来自 25 个源流分片的输入。
当您增加应用程序内部流的数量后,您的应用程序必须显式访问每个流中的数据。有关通过代码访问多个应用程序内部流的信息,请参阅在您的 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内流。
尽管 Kinesis Data Streams 和 Kinesis Data Firehose 流分片在应用程序内流之间划分的方式相同,但它们在应用程序中的显示方式有所不同:
来自 Kinesis 数据流的记录包括一个
shard_id
字段,该字段可用于识别记录的源分片。来自 Kinesis Data Firehose 传输流的记录不包含标识记录源分片或分区的字段。这是因为 Kinesis Data Firehose 会将这些信息从您的应用程序中抽象出来。
评估是否增加您的应用程序内部输入流的数量
在大多数情况下,单个应用程序内部输入流可处理单个源流的吞吐量,具体取决于输入流的复杂度和数据大小。要确定是否需要增加应用程序内输入流的数量,您可以监控亚马逊中的InputBytes
和MillisBehindLatest
指标 CloudWatch。
如果 InputBytes
指标大于 100 MB/秒(或您预期该指标将大于此速率),这可能会使 MillisBehindLatest
增大并导致应用程序问题的影响扩大。为解决此问题,我们建议您为应用程序选择以下语言:
如果您的应用程序的扩展需求超过 100 MB/秒,请对 SQL 应用程序使用多个流和 Kinesis Data Analytics。
如果 MillisBehindLatest
指标具有以下任一特征,则应调高您的应用程序的 InputParallelism
设置:
MillisBehindLatest
指标逐渐增大,指示您的应用程序落后于流中的最新数据。MillisBehindLatest
指标始终高于 1000 (1 秒)。
如果满足以下条件,您无需调高您的应用程序的 InputParallelism
设置:
MillisBehindLatest
指标逐渐减小,指示您的应用程序跟上了流中的最新数据。MillisBehindLatest
指标小于 1000 (1 秒)。
有关使用的更多信息 CloudWatch,请参阅CloudWatch 用户指南。
实施多个应用程序内部输入流
如果应用程序是使用 CreateApplication 创建的,您可以设置应用程序内部输入流的数量。您应在使用UpdateApplication创建应用程序之后设置此数量。
注意
您只能使用Amazon Kinesis Data Analytics API 或Amazon CLI.InputParallelism
您无法使用 Amazon Web Services Management Console设置该设置。有关设置 Amazon CLI 的信息,请参阅步骤 2:设置 Amazon Command Line Interface (Amazon CLI)。
设置新应用程序的输入流计数
以下示例说明了如何使用 CreateApplication
API 操作将新应用程序的输入流计数设置为 2。
有关 CreateApplication
的更多信息,请参阅 CreateApplication。
{ "ApplicationCode": "
<The SQL code the new application will run on the input stream>
", "ApplicationDescription": "<A friendly description for the new application>
", "ApplicationName": "<The name for the new application>
", "Inputs": [ { "InputId": "ID for the new input stream
", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }
设置现有应用程序的输入流计数
以下示例说明了如何使用 UpdateApplication
API 操作将现有应用程序的输入流计数设置为 2。
有关 Update_Application
的更多信息,请参阅 UpdateApplication。
{ "InputUpdates": [ { "InputId": "
yourInputId
", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }
在您的 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内流
要使用您的应用程序中的多个应用程序内部输入流,您必须从不同的流中显式选择。以下代码示例说明了如何在创建于“入门”教程中的应用程序中查询多个输入流。
在以下示例中,首先使用 CO UNT 聚合每个源流,然后合并为一个名为的应用程序内流in_application_stream001
。预先聚合源流有助于确保组合的应用程序内部流可处理来自多个流的流量而不会过载。
注意
要运行此示例并获得来自应用程序内部输入流的结果,请更新源流中的分片数和应用程序中的 InputParallelism
参数。
CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;
前面的代码示例将在 in_application_stream001
中生成类似于下面的输出:

其他注意事项
在使用多个输入流时,请注意以下事项:
应用程序内部输入流的最大数量为 64。
应用程序内部输入流在应用程序的输入流的分片之间均匀分配。
通过添加应用程序内部流获得的性能改进无法线性扩展。也就是说,使应用程序内部流的数量加倍不会使吞吐量加倍。对于典型行大小,每个应用程序内部流可实现每秒大约 5,000 到 15,000 行的吞吐量。通过将应用程序内部流计数增加到 10,您可以实现每秒 20,000 到 30,000 行的吞吐量。吞吐量流速取决于输入流中的字段的计数、数据类型和数据大小。
某些聚合函数(如 AVG)在应用于分区到不同分片中的输入流时可能生成意外结果。由于您需要在将各个分片组合到聚合流中之前对它们运行聚合操作,因此结果可能向包含更多记录的流加权。
如果在增加输入流数量后,您的应用程序继续遇到性能不佳(反映在高
MillisBehindLatest
指标上),则可能已经达到了 Kinesis 处理单元 (KPU) 的极限。有关更多信息,请参阅 自动扩展应用程序以提高吞吐量。