Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。
Apache Beam 应用程序出现检查点故障
如果你的 Beam 应用程序配置了 sh utdownSourcesAfterIdlems
症状
前往 Managed Service for Apache Flink 应用程序 CloudWatch 日志,检查是否已记录以下日志消息。以下日志消息表明,由于某些任务已经完成,检查点未能触发。
{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for jobyour job IDsince some tasks of jobyour job IDhas been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN":your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }
这也可以在 Flink 控制面板上找到,其中一些任务已进入 “已完成” 状态,并且无法再进行检查点操作了。
原因
ShutdownSourcesAfterIdlems 是一个 Beam 配置变量,用于关闭在配置的毫秒时间内处于空闲状态的信号源。一旦源被关闭,就无法再进行检查点检查了。这可能导致检查点失败
任务进入 “已完成” 状态的原因之一是 shutdownSourcesAfterIdlems 设置为 0ms,这意味着空闲的任务将立即关闭。
解决方案
要防止任务立即进入 “已完成” 状态,请将 shutdownSourcesAfterIdlems 设置为 long.max_Value。这可以通过两种方式完成:
-
选项 1:如果在 Managed Service for Apache Flink 应用程序配置页面中设置了光束配置,则可以添加新的密钥值对来设置 shutdpwnSourcesAfterIdlems,如下所示:
-
选项 2:如果在 JAR 文件中设置了光束配置,则可以按如下方式设置 shutdownSourcesAfterIdlems:
FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline