并行处理输入流以增加吞吐量 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

并行处理输入流以增加吞吐量

Amazon Kinesis Data Analytics 应用程序可以支持多个应用程序内输入流,以将应用程序扩展得超出单个应用程序内输入流的吞吐量。有关应用程序内部输入流的更多信息,请参阅 用于 SQL 应用程序的 Amazon Kinesis Data Analytics:如何使用

几乎在所有情况下,Amazon Kinesis Data Analytics 分析都会扩展您的应用程序以处理馈入您的应用程序的 Kinesis Streams 或 Kinesis Data Fireams 源流的容量。但是,如果您的源流的吞吐量超出单个应用程序内部输入流的吞吐量,您可显式增加您的应用程序使用的应用程序内部输入流的数量。您需使用 InputParallelism 参数执行此操作。

InputParallelism参数大于 1,则 Amazon Kinesis Data Analytics 甚至会在应用程序内部流之间拆分源流的分区。例如,如果您的源流具有 50 个分片,并且您已将 InputParallelism 设置为 2,则每个应用程序内部输入流都将收到来自 25 个源流分片的输入。

当您增加应用程序内部流的数量后,您的应用程序必须显式访问每个流中的数据。有关通过代码访问多个应用程序内部流的信息,请参阅在您的 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流

虽然 Kinesis Data Streams 和 Kinesis Data Firehose 流分片是以相同的方式在应用程序内部流之间划分的,但两者的差别在于它们向您的应用程序显示的方式:

  • Kinesis 数据流中的记录包含一个shard_id字段,可用于识别记录的源分片。

  • Kinesis Data Firehose 传输流中的记录不包括标识记录的源分片或分区的字段。这是因为 Kinesis Data Firehose 将此信息从您的应用程序中提取出来。

评估是否增加您的应用程序内部输入流的数量

在大多数情况下,单个应用程序内部输入流可处理单个源流的吞吐量,具体取决于输入流的复杂度和数据大小。要确定您是否需要增加应用程序内输入流的数量,您可以监控InputBytesMillisBehindLatestAmazon CloudWatch 中的指标。

如果 InputBytes 指标大于 100 MB/秒(或您预期该指标将大于此速率),这可能会使 MillisBehindLatest 增大并导致应用程序问题的影响扩大。为解决此问题,我们建议您为应用程序选择以下语言:

如果 MillisBehindLatest 指标具有以下任一特征,则应调高您的应用程序的 InputParallelism 设置:

  • MillisBehindLatest 指标逐渐增大,指示您的应用程序落后于流中的最新数据。

  • MillisBehindLatest 指标始终高于 1000 (1 秒)。

如果满足以下条件,您无需调高您的应用程序的 InputParallelism 设置:

  • MillisBehindLatest 指标逐渐减小,指示您的应用程序跟上了流中的最新数据。

  • MillisBehindLatest 指标小于 1000 (1 秒)。

有关使用 CloudWatch 的更多信息,请参阅CloudWatch 用户指南

实施多个应用程序内部输入流

如果应用程序是使用 CreateApplication 创建的,您可以设置应用程序内部输入流的数量。您应在使用UpdateApplication创建应用程序之后设置此数量。

注意

您只能将InputParallelism设置,使用 Amazon Kinesis Data Analytics API 或Amazon CLI。您无法使用 Amazon Web Services Management Console设置该设置。有关设置 Amazon CLI 的信息,请参阅第 2 步:设置 Amazon Command Line Interface (Amazon CLI)

设置新应用程序的输入流计数

以下示例说明了如何使用 CreateApplication API 操作将新应用程序的输入流计数设置为 2。

有关 CreateApplication 的更多信息,请参阅 CreateApplication

{ "ApplicationCode": "<The SQL code the new application will run on the input stream>", "ApplicationDescription": "<A friendly description for the new application>", "ApplicationName": "<The name for the new application>", "Inputs": [ { "InputId": "ID for the new input stream", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }

设置现有应用程序的输入流计数

以下示例说明了如何使用 UpdateApplication API 操作将现有应用程序的输入流计数设置为 2。

有关 Update_Application 的更多信息,请参阅 UpdateApplication

{ "InputUpdates": [ { "InputId": "yourInputId", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }

在您的 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流

要使用您的应用程序中的多个应用程序内部输入流,您必须从不同的流中显式选择。以下代码示例说明了如何在创建于“入门”教程中的应用程序中查询多个输入流。

在以下示例中,每个源流首先通过COUNT之前组合到名为的单个应用程序内部流in_application_stream001。预先聚合源流有助于确保组合的应用程序内部流可处理来自多个流的流量而不会过载。

注意

要运行此示例并获得来自应用程序内部输入流的结果,请更新源流中的分片数和应用程序中的 InputParallelism 参数。

CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;

前面的代码示例将在 in_application_stream001 中生成类似于下面的输出:

其他注意事项

在使用多个输入流时,请注意以下事项:

  • 应用程序内部输入流的最大数量为 64。

  • 应用程序内部输入流在应用程序的输入流的分片之间均匀分配。

  • 通过添加应用程序内部流获得的性能改进无法线性扩展。也就是说,使应用程序内部流的数量加倍不会使吞吐量加倍。对于典型行大小,每个应用程序内部流可实现每秒大约 5,000 到 15,000 行的吞吐量。通过将应用程序内部流计数增加到 10,您可以实现每秒 20,000 到 30,000 行的吞吐量。吞吐量流速取决于输入流中的字段的计数、数据类型和数据大小。

  • 某些聚合函数(如 AVG)在应用于分区到不同分片中的输入流时可能生成意外结果。由于您需要在将各个分片组合到聚合流中之前对它们运行聚合操作,因此结果可能向包含更多记录的流加权。

  • 如果您的应用程序继续遇到较差的性能(反映在MillisBehindLatest指标)在增加了输入流的数量后,您可能已达到 Kinesis 处理单元 (KPU) 的限制。有关更多信息,请参阅自动扩展应用程序以提高吞吐量