最佳实操 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

最佳实操

此部分介绍了使用 Amazon Kinesis Data Analytics 应用程序时的最佳实践。

管理应用程序

在管理 Amazon Kinesis Data Analytics 应用程序时,请遵循以下最佳实践:

  • 设置亚马逊 CloudWatch 警报 — 您可以使用Kinesis Data Analytics提供的 CloudWatch 指标来监控以下内容:

    • 输入字节和输入记录(输入应用程序的字节和记录的数目)

    • 输出字节和输出记录

    • MillisBehindLatest(从流式传输源进行读取时的应用程序滞后)

    我们建议您针对生产中的应用程序的以下指标设置至少两个 CloudWatch 警报:

    • MillisBehindLatest – 大多数情况下,建议您将此警报设置为在应用程序滞后于最新数据 1 小时的情况下触发(平均每分钟触发 1 次)。对于 end-to-end 处理需求较低的应用程序,您可以将其调整为较低的容差。此警报可帮助确保应用程序读取最新数据。

       

  • 为避免出现 ReadProvisionedThroughputException 异常,请将从同一 Kinesis 数据流中读取的生产应用程序数限制为 2 个应用程序。

    注意

    在这种情况下,应用程序 指可从流式传输源读取的任何应用程序。只有 Kinesis Data Analytics 应用程序可以从 Firehose 传输流中读取数据。但是,许多应用程序可以从 Kinesis 数据流中读取数据,例如 Kinesis Data Analytics 应用程序或。 Amazon Lambda建议的应用程序限制指的是配置为从流式传输源读取的总应用程序数。

     

    Amazon Kinesis Data Analytics 大约每秒为每个应用程序读取一次流式传输源。不过,滞后的应用程序可以更快的速率读取数据以便保持一致。要允许应用程序的足够吞吐量以便保持一致,请限制读取同一数据源的应用程序的数目。

     

  • 将从同一 Firehose 交付流中读取的生产应用程序数量限制为一个应用程序。

    Firehose 传输流可以写入到亚马逊 S3 和亚马逊 Redshift 等目的地。它还可以作为 Kinesis Data Analytics 应用程序的流式传输源。因此,我们建议您不要为每个 Firehose 交付流配置多个 Kinesis Data Analytics 应用程序。这有助于确保传输流还可以传输到其他目标。

扩展应用程序

通过从默认值(一)开始主动增加应用程序内输入流的数量,设置应用程序以满足您未来的扩展需求。我们建议根据应用程序的吞吐量选择以下语言:

注意

建议您定期检查应用程序的 InputProcessing.OkBytes 指标,以便您可以规划使用多个 SQL 应用程序,或者如果应用程序的预计输入吞吐量将超过 100 MB/秒,则可以迁移到 managed-flink/latest/java/。

监控应用程序

我们建议您创建 CloudWatch 警报,InputProcessing.OkBytes以便在应用程序接近输入吞吐量限制时收到通知。您可以通过这种方式更新应用程序查询,从而获得更高的吞吐量,避免分析时的反向压力和延迟。有关更多信息,请参阅故障排除。这种方式也适用于具备上游吞吐量降低机制这一情况。

  • 对于单个应用程序内流,我们建议的最大吞吐量为 2 到 20 MB/ 秒,具体取决于应用程序查询的复杂性。

  • 适用于 SQL 的 Kinesis Data Analytics 单个应用程序可以处理的最大流式处理吞吐量约为 100 MB/秒。假设您已将应用程序内部流的数量增加到最大值 64,并且您的 KPU 限制已超过 8。有关更多信息,请参阅限制

注意

建议您定期检查应用程序的 InputProcessing.OkBytes 指标,以便您可以规划使用多个 SQL 应用程序,或者如果应用程序的预计输入吞吐量将超过 100 MB/秒,则可以迁移到 managed-flink/latest/java/。

定义输入架构

在控制台中配置应用程序输入时,您首先指定流式传输源。随后,控制台将使用发现 API(请参阅 DiscoverInputSchema)对流式传输源上的记录采样来推断架构。此外,架构在产生的应用程序内部流中定义列的名称和数据类型。控制台将显示架构。建议您对此推断的架构执行以下操作:

  • 充分测试推断的架构。发现过程仅使用流式传输源上的记录示例来推断架构。如果您的流式传输源有多种记录类型,则发现 API 可能错过了一种或多种记录类型的采样。这种情况会导致架构不能准确反映流式传输源上的数据。

    当您的应用程序启动时,这些丢失的记录类型可能会导致解析错误。Amazon Kinesis Data Analytics 将这些记录发送到应用程序内部错误流。要减少这些解析错误,建议您在控制台中以交互方式测试推断的架构,并监控应用程序内部流是否缺少记录。

     

  • Kinesis Data Analytics API 不支持在输入配置中指定列的 NOT NULL 约束。如果您希望在应用程序内部流中指定列的 NOT NULL 约束,请使用应用程序代码创建这些应用程序内部流。随后,您可以将数据从一个应用程序内部流复制到另一个应用程序内部流,之后将强制实施该约束。

    在需要值时尝试在行中插入 NULL 值会导致出现错误。Kinesis Data Analytics 会将这些错误发送到应用程序内部错误流。

     

  • 放宽发现过程所推断的数据类型。发现过程将根据流式传输源中记录的随机采样来建议列和数据类型。建议您仔细审查这些列和数据类型,并考虑放宽这些数据类型以涵盖输入中所有可能的记录情况。这可确保应用程序在运行时出现的分析错误更少。例如,如果推断的架构具有 SMALLINT 作为列类型,则可考虑将列类型更改为 INTEGER

     

  • 在应用程序代码中使用 SQL 函数来处理任何非结构化的数据或列。您的输入中可包含非结构化的数据或列(例如,日志数据)。有关示例,请参阅示例:转换 DateTime 值。处理此类数据的一种方法是,定义仅具有一个类型 VARCHAR(N) 的列的架构,其中 N 是您应在流中看到的可能最大的行。随后,可在您的应用程序代码中读取传入记录,并使用 StringDate Time 函数来解析和架构化原始数据。

     

  • 确保您能够完全处理包含两个级别以上的嵌套的流式传输源数据。当源数据为 JSON 时,您可以使用嵌套。发现 API 会推断可展平一个嵌套层的架构。对于两层嵌套,发现 API 也将尝试展平嵌套。在超出两层嵌套的情况下,对展平的支持是有限的。要完全处理嵌套,您必须手动修改推断的架构来满足您的需求。可使用下列任一策略执行此操作:

     

    • 使用 JSON 行路径可选择性地只提取应用程序所需的键值对。JSON 行路径为要引入应用程序中的特定键值对提供了一个指针。您可为任何级别的嵌套执行此操作。

    • 使用 JSON 行路径可选择性地提取复杂的 JSON 对象,然后在应用程序代码中使用字符串处理函数提取所需的特定数据。

连接到输出

建议每个应用程序具有至少两个输出:

  • 使用第一个目标插入 SQL 查询的结果。

  • 使用第二个目标插入整个错误流,并通过 Firehose 传输流将其发送到 S3 存储桶。

创作应用程序代码

我们建议执行下列操作:

  • 在 SQL 语句中,不要指定超过 1 个小时的基于时间的窗口,原因如下:

    • 有时,需要重新启动应用程序,这是因为您更新了应用程序或出于 Kinesis Data Analytics 内部原因。在重新启动时,将从流式数据源中重新读取窗口中包含的所有数据。这需要等待一段时间,然后 Kinesis Data Analytics 才能发送该窗口的输出。

    • Kinesis Data Analytics 必须将与应用程序状态相关的所有内容(包括相关数据)保留一段时间。这会消耗大量的 Kinesis Data Analytics 处理单元。

  • 在开发期间,在 SQL 语句中设置较小的窗口以便更快地查看结果。当您将应用程序部署到生产环境时,可以设置适当的窗口大小。

  • 不要使用单个复杂的 SQL 语句,而是在将结果保存到中间应用程序内部流的每个步骤中将此语句分成多个语句。这可帮助您加快调试速度。

  • 在您使用滚动窗口时,建议您使用两个窗口,一个用于处理时间,另一个用于逻辑时间(接收时间或事件时间)。有关更多信息,请参阅时间戳和 ROWTIME 列

测试应用程序

在更改 Kinesis Data Analytics 应用程序的架构或应用程序代码时,我们建议使用测试应用程序验证您的更改,然后再将其部署到生产环境中。

设置测试应用程序

您可以通过控制台或使用 Amazon CloudFormation 模板设置测试应用程序。使用 Amazon CloudFormation 模板有助于确保您对测试应用程序和实时应用程序所做的代码更改保持一致。

设置测试应用程序时,您可以将应用程序连接到活动数据,或者使用模拟数据来填充流以进行测试。建议通过两种方法来使用模拟数据填充流:

  • 使用 Kinesis Data Generator (KDG)。KDG 使用数据模板将随机数据发送到 Kinesis 流。KDG 易于使用,但不适合测试数据项之间的复杂关系,例如检测数据热点或异常的应用程序。

  • 使用自定义 Python 应用程序以将更复杂的数据发送到 Kinesis 数据流。Python 应用程序可以生成数据项之间的复杂关系,例如热点或异常。有关将数据集群化发送到数据热点的 Python 应用程序示例,请参阅示例:检测流上的热点 (HOTSPOTS 函数)

运行测试应用程序时,请使用目标(例如到 Amazon Redshift 数据库的 Firehose 传输流)查看结果,而不是在控制台上查看应用程序内流。控制台上显示的数据是流的采样,并未包含所有记录。

测试架构更改

更改应用程序的输入流架构时,使用测试应用程序来验证以下情况:

  • 来自您的流中的数据强制转换为正确的数据类型。例如,确保日期时间数据未作为字符串接收到应用程序中。

  • 进行解析的数据强制转换为您需要的数据类型。如果出现解析或强制转换错误,您可以在控制台上查看,或者为错误流分配目标并在目标存储中查看错误。

  • 字符数据的数据字段具有足够的长度,并且应用程序未截断字符数据。您可以在目标存储中查看数据记录,验证未截断您的应用程序数据。

测试代码更改

测试对 SQL 代码的更改时,需要了解关于您应用程序的领域的一些知识。您必须确定需要能够测试哪些输出以及正确的输出应该是什么。有关在修改应用程序的 SQL 代码时可能需要验证的问题领域,请参阅适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 故障排除