并行处理输入流以增加吞吐量 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

并行处理输入流以增加吞吐量

Amazon Kinesis Data Analytics 应用程序可以支持多个应用程序内部输入流,以将应用程序扩展到超出单个应用程序内部输入流的吞吐量。有关应用程序内部输入流的更多信息,请参阅 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics :工作方式

几乎在所有情况下,Amazon Kinesis Data Analytics 扩展您的应用程序以处理传输到您的应用程序的 Kinesis 或 Kinesis Data Firehose 源流的容量。但是,如果您的源流的吞吐量超出单个应用程序内部输入流的吞吐量,您可显式增加您的应用程序使用的应用程序内部输入流的数量。您需使用 InputParallelism 参数执行此操作。

如果 InputParallelism 参数大于 1,则 Amazon Kinesis Data Analytics 在应用程序内部流之间平均拆分源流的分区。例如,如果您的源流具有 50 个分片,并且您已将 InputParallelism 设置为 2,则每个应用程序内部输入流都将收到来自 25 个源流分片的输入。

当您增加应用程序内部流的数量后,您的应用程序必须显式访问每个流中的数据。有关通过代码访问多个应用程序内部流的信息,请参阅在 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流

虽然以相同的方式在应用程序内部流之间划分 Kinesis Data Streams 和 Kinesis Data Firehose 流分片,但以不同的方式向您的应用程序显示这些分片:

  • Kinesis 数据流中的记录包含一个 shard_id 字段,可用于指定记录的源分片。

  • Kinesis Data Firehose 传输流中的记录不包括标识记录的源分片或分区的字段。这是因为 Kinesis Data Firehose 将此信息从您的应用程序中提取出来。

评估是否增加您的应用程序内部输入流的数量

在大多数情况下,单个应用程序内部输入流可处理单个源流的吞吐量,具体取决于输入流的复杂度和数据大小。要确定您是否需要增加应用程序内部输入流数,您可以在 Amazon CloudWatch 中监控 MillisBehindLatestInputBytes 指标。

如果 InputBytes 指标大于 100 MB/秒(或您预期该指标将大于此速率),这可能会使 MillisBehindLatest 增大并导致应用程序问题的影响扩大。为解决此问题,我们建议您为应用程序选择以下语言:

  • 如果您的应用程序的扩展需求超过 100 MB/秒,请使用多个流和 Kinesis Data Analytics for SQL 应用程序。

  • 如果您希望使用单个流和应用程序,请使用Kinesis Data Analytics for Java 应用程序

如果 MillisBehindLatest 指标具有以下任一特征,则应调高您的应用程序的 InputParallelism 设置:

  • MillisBehindLatest 指标逐渐增大,指示您的应用程序落后于流中的最新数据。

  • MillisBehindLatest 指标始终高于 1000 (1 秒)。

如果满足以下条件,您无需调高您的应用程序的 InputParallelism 设置:

  • MillisBehindLatest 指标逐渐减小,指示您的应用程序跟上了流中的最新数据。

  • MillisBehindLatest 指标小于 1000 (1 秒)。

有关使用 CloudWatch 的更多信息,请参阅 CloudWatch 用户指南

实施多个应用程序内部输入流

如果应用程序是使用 CreateApplication 创建的,您可以设置应用程序内部输入流的数量。您应在使用UpdateApplication创建应用程序之后设置此数量。

注意

您只能使用 Amazon Kinesis Data Analytics API 或 AWS CLI 设置 InputParallelism 设置。您无法使用 AWS 管理控制台设置该设置。有关设置 AWS CLI 的信息,请参阅步骤 2. 设置 AWS Command Line Interface (AWS CLI)

设置新应用程序的输入流计数

以下示例说明了如何使用 CreateApplication API 操作将新应用程序的输入流计数设置为 2。

有关 CreateApplication,参见 CreateApplication.

{ "ApplicationCode": "<The SQL code the new application will run on the input stream>", "ApplicationDescription": "<A friendly description for the new application>", "ApplicationName": "<The name for the new application>", "Inputs": [ { "InputId": "ID for the new input stream", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }

设置现有应用程序的输入流计数

以下示例说明了如何使用 UpdateApplication API 操作将现有应用程序的输入流计数设置为 2。

有关 Update_Application,参见 UpdateApplication.

{ "InputUpdates": [ { "InputId": "yourInputId", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }

在 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流

要使用您的应用程序中的多个应用程序内部输入流,您必须从不同的流中显式选择。以下代码示例说明了如何在创建于“入门”教程中的应用程序中查询多个输入流。

在以下示例中,将使用 计数 在一个名为 in_application_stream001。事先聚合源流有助于确保合并的应用内流可以处理多个流的流量,而不会过载。

注意

要运行此示例并获得来自应用程序内部输入流的结果,请更新源流中的分片数和应用程序中的 InputParallelism 参数。

CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;

前面的代码示例将在 in_application_stream001 中生成类似于下面的输出:

其他注意事项

在使用多个输入流时,请注意以下事项:

  • 应用程序内部输入流的最大数量为 64。

  • 应用程序内部输入流在应用程序的输入流的分片之间均匀分配。

  • 通过添加应用程序内部流获得的性能改进无法线性扩展。也就是说,使应用程序内部流的数量加倍不会使吞吐量加倍。对于典型行大小,每个应用程序内部流可实现每秒大约 5,000 到 15,000 行的吞吐量。通过将应用程序内部流计数增加到 10,您可以实现每秒 20,000 到 30,000 行的吞吐量。吞吐量流速取决于输入流中的字段的计数、数据类型和数据大小。

  • 某些聚合函数(如 AVG)在应用于分区到不同分片中的输入流时可能生成意外结果。由于您需要在将各个分片组合到聚合流中之前对它们运行聚合操作,因此结果可能向包含更多记录的流加权。

  • 如果您增加了输入流的数量后,应用程序性能仍然很差(反映在较高的 MillisBehindLatest 指标上),您可能已达到 Kinesis 处理单元 (KPU) 的限制。有关更多信息,请参阅 自动扩展应用程序以提高吞吐量。)