适用于 Apache Flink 的亚马逊托管服务(亚马逊 MSF)以前被称为适用于 Apache Flink 的亚马逊 Kinesis Data Analytics。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行适用于 Apache Flink 的托管服务应用程序
在此步骤中,您将创建一个以 Kinesis 数据流作为源和接收器的适用于 Apache Flink 的托管服务。
本节包含以下步骤:
创建依赖资源
在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:
-
两个 Kinesis 数据流用于输入和输出
-
用于存储应用程序代码的 Amazon S3 存储桶
注意
本教程假设您正在美国东部(弗吉尼亚北部)us-east-1 区域部署应用程序。如果您使用其他区域,请相应地调整所有步骤。
创建两个 Amazon Kinesis 数据流
在为本练习创建 Managed Service for Apache Flink 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStream和ExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。
您可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些直播。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。要使用创建直播 Amazon CLI,请使用以下命令,根据您用于应用程序的区域进行调整。
创建数据流 (Amazon CLI)
-
要创建第一个直播 (
ExampleInputStream),请使用以下 Amazon Kinesis 命令create-streamAmazon CLI :$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ -
要创建应用程序用来写入输出的第二个流,请运行相同的命令,将流名称更改为
ExampleOutputStream:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
为应用程序代码创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。要了解如何使用控制台创建 Amazon S3 存储桶,请参阅 Amazon S3 用户指南中的创建存储桶。使用全球唯一名称命名 Amazon S3 存储桶,例如附加您的登录名。
注意
请务必在本教程中使用的区域 (us-east-1) 中创建存储桶。
其他资源
在您创建应用程序时,适用于 Apache Flink 的托管服务会自动创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>的日志组 -
名为
kinesis-analytics-log-stream的日志流
设置本地开发环境
对于开发和调试,您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。任何 Apache Flink 依赖关系都像使用 Apache Maven 的常规 Java 依赖项一样处理。
注意
在你的开发计算机上,你必须安装 Java JDK 11、Maven 和 Git。我们建议你使用诸如 Eclipse Java Neon 或 IntelliJ IDEA
对您的 Amazon 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有有效的 Amazon 经过身份验证的会话,并具有写入 Kinesis 数据流的权限。使用以下步骤对您的会话进行身份验证:
-
如果您没有配置带有有效凭据 Amazon CLI 的命名配置文件,请参阅设置 Amazon Command Line Interface (Amazon CLI)。
-
通过发布以下测试记录,验证您的配置 Amazon CLI 是否正确,并且您的用户有权写入 Kinesis 数据流:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
如果您的 IDE 有要集成的插件 Amazon,则可以使用该插件将凭据传递给 IDE 中运行的应用程序。有关更多信息,请参阅 Intelli J IDEA Amazon 工具包和适用于 Ecli
pse 的Amazon 工具包。
下载并检查 Apache Flink 流式处理 Java 代码
此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
导航到
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted目录。
查看应用程序组件
该应用程序完全是在com.amazonaws.services.msf.BasicStreamingJob课堂上实现的。该main()方法定义了用于处理和运行流数据的数据流。
注意
为了优化开发者体验,该应用程序设计为无需更改任何代码即可在适用于 Apache Flink 的亚马逊托管服务上运行,也可在本地运行,以便在您的 IDE 中进行开发。
-
要读取运行时配置,使其在适用于 Apache Flink 的 Amazon 托管服务和 IDE 中运行时能够正常运行,应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下,应用程序加载运行时配置的方式会有所不同:
-
当应用程序检测到自己在 IDE 中以独立模式运行时,请
application_properties.json生成包含在项目资源文件夹中的文件。文件内容如下。 -
当应用程序在适用于 Apache Flink 的亚马逊托管服务中运行时,默认行为会根据您将在适用于 Apache Flink 的亚马逊托管服务 Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
该
main()方法定义应用程序数据流并运行它。-
初始化默认的流媒体环境。在此示例中,我们展示了如何创建
StreamExecutionEnvironment要用于 DataSteam API 的,以及StreamTableEnvironment要用于 SQL 和表 API 的。这两个环境对象是对同一个运行时环境的两个单独引用,用法不同 APIs。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -
加载应用程序配置参数。这将自动从正确的位置加载它们,具体取决于应用程序的运行位置:
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
该应用程序使用 Kinesis Cons
umer 连接器定义一个源,用于从输入流中读取数据。输入流的配置在 PropertyGroupId= 中定义InputStream0。直播的名称和区域aws.region分别位于名为stream.name和的属性中。为简单起见,此源将记录读取为字符串。private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... } -
然后,应用程序使用 Kinesis Streams Sink 连接器定义接收器,将数据发送到输出流
。输出流名称和区域在 PropertyGroupId= 中定义OutputStream0,与输入流类似。接收器直接连接到从源获取数据的内部DataStream。在真实的应用程序中,你需要在源和接收器之间进行一些转换。private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... } -
最后,运行刚才定义的数据流。在定义了数据流所需的所有运算符之后,这必须是该
main()方法的最后一条指令:env.execute("Flink streaming Java API skeleton");
-
使用 pom.xml 文件
pom.xml 文件定义了应用程序所需的所有依赖关系,并设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
-
有些依赖关系有
provided作用域。当应用程序在适用于 Apache Flink 的亚马逊托管服务中运行时,这些依赖关系将自动可用。它们是编译应用程序或在 IDE 中本地运行应用程序所必需的。有关更多信息,请参阅 在本地运行应用程序。确保您使用的 Flink 版本与您将在适用于 Apache Flink 的亚马逊托管服务中使用的运行时版本相同。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
您必须使用默认作用域向 pom 添加其他 Apache Flink 依赖项,例如此应用程序使用的 K inesis 连接器
。有关更多信息,请参阅 使用 Apache Flink 连接器。您还可以添加应用程序所需的任何其他 Java 依赖项。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Maven Java 编译器插件确保代码是根据 Java 11 编译的,Java 11 是 Apache Flink 目前支持的 JDK 版本。
-
Maven Shade 插件打包了 fat-jar,但不包括运行时提供的一些库。它还指定了两个变压器:
ServicesResourceTransformer和ManifestResourceTransformer。后者配置包含启动应用程序的main方法的类。如果你重命名了主类,别忘了更新这个转换器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
将样本记录写入输入流
在本节中,您将向流发送示例记录以供应用程序处理。您可以通过两种方式生成示例数据,要么使用 Python 脚本,要么使用 Kinesis 数据
使用 Python 脚本生成示例数据
您可以使用 Python 脚本将示例记录发送到数据流。
注意
要运行这个 Python 脚本,你必须使用 Python 3.x 并安装Amazon 适用于 Python 的 SDK (Boto)
要开始向 Kinesis 输入流发送测试数据,请执行以下操作:
-
从数据生成器 GitHub 存储库
下载数据生成器 stock.pyPython 脚本。 -
运行
stock.py脚本:$ python stock.py
在完成本教程的其余部分的同时,请保持脚本运行。现在你可以运行你的 Apache Flink 应用程序了。
使用 Kinesis 数据生成器生成示例数据
除了使用 Python 脚本之外,您还可以使用 Kinesis 数据生成器
要设置和运行 Kinesis 数据生成器,请执行以下操作:
-
按照 Kinesis 数据生成器文档
中的说明设置该工具的访问权限。您将运行一个用于设置用户和密码的 Amazon CloudFormation 模板。 -
通过模板生成的网址访问 Kinesis 数据生成器。 CloudFormation CloudFormation 模板完成后,您可以在 “输出” 选项卡中找到 URL。
-
配置数据生成器:
-
区域:选择您在本教程中使用的区域:us-east-1
-
直播/传送流:选择应用程序将使用的输入流:
ExampleInputStream -
每秒记录数:100
-
录制模板:复制并粘贴以下模板:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
测试模板:选择测试模板并验证生成的记录是否与以下内容类似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
启动数据生成器:选择 “选择发送数据”。
Kinesis 数据生成器现在正在向... 发送数据。ExampleInputStream
在本地运行应用程序
您可以在 IDE 中本地运行和调试 Flink 应用程序。
注意
在继续操作之前,请验证输入和输出流是否可用。请参阅创建两个 Amazon Kinesis 数据流。此外,请确认您有权从两个流中读取和写入数据。请参阅对您的 Amazon 会话进行身份验证。
设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅满足完成练习的先决条件。
将 Java 项目导入你的 IDE
要开始在 IDE 中使用该应用程序,必须将其作为 Java 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将./java/GettingStarted子目录中的内容导入到 IDE 中。
使用 Maven 将代码作为现有 Java 项目插入。
注意
导入新 Java 项目的确切过程因所使用的 IDE 而异。
检查本地应用程序配置
在本地运行时,应用程序使用下项目资源文件夹中application_properties.json文件中的配置./src/main/resources。您可以编辑此文件以使用不同的 Kinesis 直播名称或区域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
设置 IDE 运行配置
您可以像运行任何 Java 应用程序一样,通过运行主类com.amazonaws.services.msf.BasicStreamingJob直接从 IDE 运行和调试 Flink 应用程序。在运行应用程序之前,必须设置运行配置。设置取决于您使用的 IDE。例如,请参阅 IntelliJ ID EA 文档中的运行/调试配置
-
将@@
provided依赖项添加到类路径中。这是确保在本地运行时将具有provided作用域的依赖关系传递给应用程序所必需的。如果不进行此设置,应用程序会立即显示class not found错误。 -
将访问 Kinesis 直播的 Amazon 凭证传递给应用程序。最快的方法是使用 IntelliJ IDEA Amazon 工具包
。在 “运行” 配置中使用此 IDE 插件,可以选择特定的 Amazon 配置文件。 Amazon 使用此配置文件进行身份验证。您无需直接传递 Amazon 证书。 -
验证 IDE 是否使用 JDK 11 运行应用程序。
在 IDE 中运行该应用程序
为设置运行配置后BasicStreamingJob,您可以像常规 Java 应用程序一样运行或调试它。
注意
你不能直接java -jar
...从命令行运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。
当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。
要验证应用程序是否正确处理数据,您可以检查输入和输出 Kinesis 流,如下一节所述。
注意
不发出有关流动数据的日志是 Flink 应用程序的正常行为。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。
观察 Kinesis 流中的输入和输出数据
您可以使用 Amazon Kinesis 控制台中的数据查看器观察(生成示例 Python)或 Kinesis 数据生成器(链接)发送到输入流的记录。
观察记录
-
确认该区域与您运行本教程的区域相同,默认为 us-east-1 美国东部(弗吉尼亚北部)。如果区域不匹配,请更改区域。
-
选择 “数据流”。
-
选择您要观看的直播,
ExampleInputStream或者是ExampleOutputStream. -
选择 “数据查看器” 选项卡。
-
选择任意碎片,保持 “最新” 作为起始位置,然后选择 “获取记录”。您可能会看到 “未找到该请求的记录” 错误。如果是,请选择 “重试获取记录”。发布到直播显示屏的最新记录。
-
在 “数据” 列中选择值以检查 JSON 格式的记录内容。
停止应用程序在本地运行
停止应用程序在 IDE 中运行。IDE 通常会提供 “停止” 选项。确切的位置和方法取决于您使用的 IDE。
编译并打包您的应用程序代码
在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 来编译和打包代码。
要使用 Maven 命令行进行编译和打包,请执行以下操作:
移至包含 Java GettingStarted 项目的目录并运行以下命令:
$ mvn package
要使用 IDE 进行编译和打包,请执行以下操作:
mvn package从你的 IDE Maven 集成中运行。
在这两种情况下,都会创建以下 JAR 文件:target/amazon-msf-java-stream-app-1.0.jar。
注意
从 IDE 运行 “构建项目” 可能无法创建 JAR 文件。
上传应用程序代码 JAR 文件
在本节中,您将您在上一节中创建的 JAR 文件上传到您在本教程开始时创建的亚马逊简单存储服务 (Amazon S3) Simple Service 存储桶。如果您尚未完成此步骤,请参阅(链接)。
上传应用程序代码 JAR 文件
打开 Amazon S3 控制台,网址为 https://console.aws.amazon.com/s3/
。 -
选择您之前为应用程序代码创建的存储桶。
-
选择上传。
-
选择 Add files。
-
导航到在上一步中生成的 JAR 文件:
target/amazon-msf-java-stream-app-1.0.jar。 -
在不更改任何其他设置的情况下选择 “上传”。
警告
确保在中选择了正确的 JAR 文件<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar。
该target目录还包含您无需上传的其他 JAR 文件。
创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序
您可以使用控制台或 Amazon CLI创建和运行适用于 Apache Flink 的托管服务的应用程序。在本教程中,您将使用控制台。
注意
当您使用控制台创建应用程序时,系统会为您创建您的 Amazon Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。使用创建应用程序时 Amazon CLI,可以单独创建这些资源。
创建应用程序
创建应用程序
登录并通过 /f Amazon Web Services Management Console link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com
-
确认选择了正确的区域:us-east-1 美国东部(弗吉尼亚北部)
-
打开右侧的菜单,选择 Apache Flink 应用程序,然后选择 “创建流媒体应用程序”。或者,在初始页面的 “入门” 容器中选择 “创建流媒体应用程序”。
-
在 “创建流媒体应用程序” 页面上:
-
选择设置流处理应用程序的方法:选择从头开始创建。
-
Apache Flink 配置,Application Flink 版本:选择 Ap ache Flink 1.20。
-
-
配置您的应用程序
-
应用程序名称:输入
MyApplication。 -
描述:输入
My java test app。 -
访问应用程序资源:选择使用所需策略创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-east-1。
-
-
为应用程序设置配置模板
-
模板:选择 “开发”。
-
-
选择页面底部的 “创建流媒体应用程序”。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-east-1 -
角色:
kinesisanalytics-MyApplication-us-east-1
适用于 Apache Flink 的亚马逊托管服务以前被称为 Kinesis Data Analytics。为了向后兼容,自动创建的资源的名称前缀kinesis-analytics-为。
编辑 IAM 策略
编辑 IAM policy 以添加访问 Kinesis 数据流的权限。
编辑政策
使用 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1策略。 -
选择 “编辑”,然后选择 “JSON” 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 IDs (
012345678901) 替换为您的账户 ID。 -
选择页面底部的 “下一步”,然后选择 “保存更改”。
配置应用程序
编辑应用程序配置以设置应用程序代码构件。
编辑配置
-
在MyApplication页面上,选择配置。
-
在应用程序代码位置部分:
-
对于 Amazon S3 存储桶,请选择您之前为应用程序代码创建的存储桶。选择 “浏览” 并选择正确的存储桶,然后选择 “选择”。请勿点击存储桶名称。
-
在 Amazon S3 对象的路径中,输入
amazon-msf-java-stream-app-1.0.jar。
-
-
对于访问权限,请选择使用所需策略创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-east-1。 -
在 “运行时属性” 部分中,添加以下属性。
-
选择 “添加新项目”,然后添加以下每个参数:
组 ID 键 值 InputStream0stream.nameExampleInputStreamInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1 -
请勿修改任何其他部分。
-
选择保存更改。
注意
当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已配置完毕,可以运行了。
运行应用程序
-
在适用于 Apache Flink 的亚马逊托管服务的控制台上,选择 “我的应用程序”,然后选择 “运行”。
-
在下一页的应用程序还原配置页面上,选择使用最新快照运行,然后选择运行。
“应用程序状态” 详细信息会从
Ready到,Starting然后转换到应用程序启动Running时。
当应用程序处于Running状态时,您现在可以打开 Flink 控制面板。
打开 控制面板
-
选择 “打开 Apache Flink 控制面板”。仪表板将在新页面上打开。
-
在 “正在运行的作业” 列表中,选择您可以看到的单个作业。
注意
如果您设置了 Runtime 属性或编辑了 IAM 策略不正确,则应用程序状态可能会变为
Running,但是 Flink 控制面板显示任务正在持续重启。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障。发生这种情况时,请查看 Flink 控制面板中的 “异常” 选项卡以查看问题的原因。
观察正在运行的应用程序的指标
在该MyApplication页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序中的一些基本指标。
查看指标
-
在 “刷新” 按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行正常时,您可以看到正常运行时间指标不断增加。
-
完全重启指标应为零。如果它增加,则配置可能会出现问题。要调查问题,请查看 Flink 控制面板上的 “异常” 选项卡。
-
在运行良好的应用程序中,失败的检查点数指标应为零。
注意
此仪表板显示一组固定的指标,粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。
观察 Kinesis 直播中的输出数据
确保您仍在使用 Python 脚本或 Kinesis 数据生成器将数据发布到输入中。
现在,您可以使用中的数据查看器来观察在 Apache Flink 托管服务上运行的应用程序的输出 https://console.aws.amazon.com/kinesis/
查看输出
-
确认该区域与您运行本教程时使用的区域相同。默认情况下,它是 US-East-1US 东部(弗吉尼亚北部)。如有必要,请更改区域。
-
选择 “数据流”。
-
选择要观看的直播。在本教程中,请使用
ExampleOutputStream。 -
选择 “数据查看器” 选项卡。
-
选择任意碎片,保持 “最新” 作为起始位置,然后选择 “获取记录”。您可能会看到 “未找到该请求的记录” 错误。如果是,请选择 “重试获取记录”。发布到直播显示屏的最新记录。
-
在 “数据” 列中选择值以检查 JSON 格式的记录内容。
停止应用程序
要停止应用程序,请转到名为的 Apache Flink 托管服务应用程序的控制台页面。MyApplication
停止应用程序
-
从 “操作” 下拉列表中,选择 “停止”。
-
应用程序详细信息中的状态从
Running变为Stopping,然后转换到应用程序完全停止Ready时。注意
别忘了停止从 Python 脚本或 Kinesis 数据生成器向输入流发送数据。