在 Kinesis Data Analytics for Java Applications 中实施容错功能 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

在 Kinesis Data Analytics for Java Applications 中实施容错功能

检查点是用于在 Amazon Kinesis Data Analytics for Java Applications 中实施容错功能的方法。检查点 是运行的应用程序的最新备份,用于立即从意外的应用程序中断或故障转移中恢复。

有关 Apache Flink 应用程序中的检查点的详细信息,请参阅 Apache Flink 文档中的检查点

快照 是手动创建和管理的应用程序状态备份。通过使用快照,您可以调用 UpdateApplication 以将应用程序还原到以前的状态。有关更多信息,请参阅使用快照管理应用程序备份

如果为应用程序启用了检查点,该服务将创建应用程序数据备份,并在应用程序意外重新启动时加载该备份以提供容错功能。这些意外的应用程序重新启动可能是由意外的作业重新启动、实例故障等引起的。这会在这些重新启动期间为应用程序提供与无故障执行相同的语义。

如果为应用程序启用了快照,并使用应用程序的 ApplicationRestoreConfiguration 配置了快照,则该服务在应用程序更新期间或与服务相关的扩展或维护期间提供恰好一次处理语义。

在 Kinesis Data Analytics for Java Applications 中配置检查点

您可以配置应用程序的检查点行为。您可以定义它是否永久保存检查点状态、将其状态保存到检查点的频率以及一个检查点操作结束到另一个检查点操作开始之间的最小间隔。

您可以使用 CreateApplicationUpdateApplication API 操作配置以下设置:

  • CheckpointingEnabled — 指示是否在应用程序中启用了检查点。

  • CheckpointInterval — 包含检查点(持久性)操作之间的时间(以毫秒为单位)。

  • ConfigurationType — 可以将该值设置为 DEFAULT 以使用默认检查点行为;将该值设置为 CUSTOM 以配置其他值。

    注意

    默认检查点行为如下所示:

    • CheckpointingEnabled:true

    • CheckpointInterval:60000

    • MinPauseBetweenCheckpoints:5000

    如果 ConfigurationType 设置为 DEFAULT,将使用前面的值,即使使用 AWS CLI 或在应用程序代码中设置值以将其设置为其他值。

  • MinPauseBetweenCheckpoints — 从一个检查点操作结束到另一个检查点操作开始之间的最短时间(以毫秒为单位)。如果设置该值,则可以防止应用程序在检查点操作所花的时间超过 CheckpointInterval 时继续执行检查点操作。

检查点 API 示例

本节包含为应用程序配置检查点的 API 操作的示例请求。有关如何将 JSON 文件用于 API 操作输入的信息,请参阅 Kinesis Data Analytics API 示例代码

为新应用程序配置检查点

CreateApplication 操作的以下示例请求在您创建应用程序时配置检查点:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_8", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "true", "CheckpointInterval": 20000, "ConfigurationType": "CUSTOM", "MinPauseBetweenCheckpoints": 10000 } } }

为新应用程序禁用检查点

CreateApplication 操作的以下示例请求在您创建应用程序时禁用检查点:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_8", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "false" } } }

为现有应用程序配置检查点

UpdateApplication 操作的以下示例请求为现有应用程序配置检查点:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": "true", "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }

为现有应用程序禁用检查点

UpdateApplication 操作的以下示例请求为现有应用程序禁用检查点:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": "false", "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }