最佳实践 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

最佳实践

此部分介绍了使用 Amazon Kinesis Data Analytics 应用程序时的最佳实践。

管理应用程序

在管理 Amazon Kinesis Data Analytics 应用程序时,请遵循以下最佳实践:

  • 设置亚马逊 CloudWatch 警报 — 您可以使用Kinesis Data Analytics提供的 CloudWatch 指标来监控以下内容:

    • 输入字节和输入记录(输入应用程序的字节和记录的数目)

    • 输出字节和输出记录

    • MillisBehindLatest(从流式传输源进行读取时的应用程序滞后)

    我们建议您针对生产中的应用程序的以下指标设置至少两个 CloudWatch 警报:

    • MillisBehindLatest – 大多数情况下,建议您将此警报设置为在应用程序滞后于最新数据 1 小时的情况下触发(平均每分钟触发 1 次)。对于 end-to-end处理需求较低的应用程序,您可以将其调整为较低的容差。此警报可帮助确保应用程序读取最新数据。

       

  • 为避免出现 ReadProvisionedThroughputException 异常,请将从同一 Kinesis 数据流中读取的生产应用程序数限制为 2 个应用程序。

    注意

    在这种情况下,应用程序 指可从流式传输源读取的任何应用程序。只有 Kinesis Data Analytics 应用程序可对 Firehose 传输流进行读取。但是,许多应用程序可以从 Kinesis 数据流中读取数据,例如 Kinesis Data Analytics 应用程序或。 Amazon Lambda建议的应用程序限制指的是配置为从流式传输源读取的总应用程序数。

     

    Amazon Kinesis Data Analytics 大约每秒为每个应用程序读取一次流式传输源。不过,滞后的应用程序可以更快的速率读取数据以便保持一致。要允许应用程序的足够吞吐量以便保持一致,请限制读取同一数据源的应用程序的数目。

     

  • 将从同一 Firehose 传输流中读取的生产应用程序数限制为一个应用程序。

    Firehose 传输流可以写入 Amazon S3 和 Amazon Redshift 等目标。它还可以作为 Kinesis Data Analytics 应用程序的流式传输源。因此,我们建议您不要为每个 Firehose 传输流配置多个 Kinesis Data Analytics 应用程序。这有助于确保传输流还可以传输到其他目标。

扩展应用程序

通过从默认值(一)开始主动增加应用程序内输入流的数量,设置应用程序以满足您未来的扩展需求。我们建议根据应用程序的吞吐量选择以下语言:

注意

我们建议您定期查看应用程序的InputProcessing.OkBytes指标,以便您可以提前计划使用多个SQL应用程序或迁移到托管-flink/latest/java/ if your application’s projected input throughput exceeds 100 MB/sec。

监控应用程序

我们建议您创建 CloudWatch 警报,InputProcessing.OkBytes以便在应用程序接近输入吞吐量限制时收到通知。您可以通过这种方式更新应用程序查询,从而获得更高的吞吐量,避免分析时的反向压力和延迟。有关更多信息,请参阅故障排除。这种方式也适用于具备上游吞吐量降低机制这一情况。

  • 对于单个应用程序内流,我们建议的最大吞吐量为 2 到 20 MB/ 秒,具体取决于应用程序查询的复杂性。

  • SQL适用于应用程序的单个 Kinesis Data Analytics 可以处理的最大流吞吐量约为 100 MB/秒。这假设您已将应用程序内流的数量增加到最大值 64,并且已将KPU限制增加到 8 以上。有关更多信息,请参阅限制

注意

我们建议您定期查看应用程序的InputProcessing.OkBytes指标,以便您可以提前计划使用多个SQL应用程序或迁移到托管-flink/latest/java/ if your application’s projected input throughput exceeds 100 MB/sec。

定义输入架构

在控制台中配置应用程序输入时,您首先指定流式传输源。然后,控制台使用发现API(参见DiscoverInputSchema)通过对流媒体源上的记录进行采样来推断架构。此外,架构在产生的应用程序内部流中定义列的名称和数据类型。控制台将显示架构。建议您对此推断的架构执行以下操作:

  • 充分测试推断的架构。发现过程仅使用流式传输源上的记录示例来推断架构。如果您的直播源有许多记录类型,则发现API可能错过了对一种或多种记录类型的采样。这种情况会导致架构不能准确反映流式传输源上的数据。

    当您的应用程序启动时,这些丢失的记录类型可能会导致解析错误。Amazon Kinesis Data Analytics 将这些记录发送到应用程序内部错误流。要减少这些解析错误,建议您在控制台中以交互方式测试推断的架构,并监控应用程序内部流是否缺少记录。

     

  • Kinesis Data API Analytics 不支持NOT NULL在输入配置中指定对列的限制。如果您希望在应用程序内部流中指定列的 NOT NULL 约束,请使用应用程序代码创建这些应用程序内部流。随后,您可以将数据从一个应用程序内部流复制到另一个应用程序内部流,之后将强制实施该约束。

    在需要值时尝试在行中插入 NULL 值会导致出现错误。Kinesis Data Analytics 会将这些错误发送到应用程序内部错误流。

     

  • 放宽发现过程所推断的数据类型。发现过程将根据流式传输源中记录的随机采样来建议列和数据类型。建议您仔细审查这些列和数据类型,并考虑放宽这些数据类型以涵盖输入中所有可能的记录情况。这可确保应用程序在运行时出现的分析错误更少。例如,如果推断的架构具有 SMALLINT 作为列类型,则可考虑将列类型更改为 INTEGER

     

  • 使用应用程序代码中的SQL函数来处理任何非结构化数据或列。您的输入中可包含非结构化的数据或列(例如,日志数据)。有关示例,请参阅 示例:转换 DateTime 值。处理此类数据的一种方法是,定义仅具有一个类型 VARCHAR(N) 的列的架构,其中 N 是您应在流中看到的可能最大的行。随后,可在您的应用程序代码中读取传入记录,并使用 StringDate Time 函数来解析和架构化原始数据。

     

  • 确保您能够完全处理包含两个级别以上的嵌套的流式传输源数据。当源数据为时JSON,您可以进行嵌套。该发现API推断出一种将嵌套层扁平化的架构。对于两个层次的筑巢,该发现API还试图将它们夷为平地。在超出两层嵌套的情况下,对展平的支持是有限的。要完全处理嵌套,您必须手动修改推断的架构来满足您的需求。可使用下列任一策略执行此操作:

     

    • 使用JSON行路径有选择地仅提取应用程序所需的键值对。JSON行路径提供了指向要在应用程序中引入的特定键值对的指针。您可为任何级别的嵌套执行此操作。

    • 使用JSON行路径有选择地提取复杂的JSON对象,然后在应用程序代码中使用字符串操作函数来提取所需的特定数据。

连接到输出

建议每个应用程序具有至少两个输出:

  • 使用第一个目标插入SQL查询结果。

  • 使用第二个目标插入整个错误流,并通过 Firehose 传输流将其发送到 S3 存储桶。

创作应用程序代码

我们建议执行下列操作:

  • 在您的SQL声明中,不要指定长于一小时的基于时间的窗口,原因如下:

    • 有时,需要重新启动应用程序,这是因为您更新了应用程序或出于 Kinesis Data Analytics 内部原因。在重新启动时,将从流式数据源中重新读取窗口中包含的所有数据。这需要等待一段时间,然后 Kinesis Data Analytics 才能发送该窗口的输出。

    • Kinesis Data Analytics 必须将与应用程序状态相关的所有内容(包括相关数据)保留一段时间。这会消耗大量的 Kinesis Data Analytics 处理单元。

  • 在开发过程中,在SQL语句中保持较小的窗口大小,这样可以更快地看到结果。当您将应用程序部署到生产环境时,可以设置适当的窗口大小。

  • 与其使用单个复杂的SQL语句,不如考虑将其分成多个语句,在每个步骤中将结果保存到应用程序内部的中间流中。这可帮助您加快调试速度。

  • 在您使用滚动窗口时,建议您使用两个窗口,一个用于处理时间,另一个用于逻辑时间(接收时间或事件时间)。有关更多信息,请参阅 时间戳和 ROWTIME 列

测试应用程序

在更改 Kinesis Data Analytics 应用程序的架构或应用程序代码时,我们建议使用测试应用程序验证您的更改,然后再将其部署到生产环境中。

设置测试应用程序

您可以通过控制台或使用 Amazon CloudFormation 模板设置测试应用程序。使用 Amazon CloudFormation 模板有助于确保您对测试应用程序和实时应用程序所做的代码更改保持一致。

设置测试应用程序时,您可以将应用程序连接到活动数据,或者使用模拟数据来填充流以进行测试。建议通过两种方法来使用模拟数据填充流:

  • 使用 Kinesis 数据生成器 () KDG。KDG使用数据模板向 Kinesis 流发送随机数据。使用KDG简单,但不适用于测试数据项之间的复杂关系,例如用于检测数据热点或异常的应用程序。

  • 使用自定义 Python 应用程序以将更复杂的数据发送到 Kinesis 数据流。Python 应用程序可以生成数据项之间的复杂关系,例如热点或异常。有关将数据集群化发送到数据热点的 Python 应用程序示例,请参阅示例:检测流上的热点 (HOTSPOTS 函数)

在运行测试应用程序时,使用目标(例如,流向 Amazon Redshift 数据库的 Firehose 传输流)查看结果,而不是在控制台上查看应用程序内部流。控制台上显示的数据是流的采样,并未包含所有记录。

测试架构更改

更改应用程序的输入流架构时,使用测试应用程序来验证以下情况:

  • 来自您的流中的数据强制转换为正确的数据类型。例如,确保日期时间数据未作为字符串接收到应用程序中。

  • 进行解析的数据强制转换为您需要的数据类型。如果出现解析或强制转换错误,您可以在控制台上查看,或者为错误流分配目标并在目标存储中查看错误。

  • 字符数据的数据字段具有足够的长度,并且应用程序未截断字符数据。您可以在目标存储中查看数据记录,验证未截断您的应用程序数据。

测试代码更改

测试对SQL代码的更改需要对应用程序有一定的领域了解。您必须确定需要能够测试哪些输出以及正确的输出应该是什么。有关修改应用程序SQL代码时需要验证的潜在问题区域,请参阅Amazon Kinesis Data Analytics·for·SQL 应用程序故障排除