Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。
创建并运行适用于 Apache Flink 的托管服务应用程序
在此步骤中,您将创建 Managed Service for Apache Flink 应用程序,并将 Kinesis 数据流作为源和接收器。
本节包含以下步骤:
创建相关资源
在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:
-
两个 Kinesis 流用于输入和输出
-
存储应用程序代码的 Amazon S3 存储桶
注意
本教程假设您在 us-east-1 美国东部(弗吉尼亚州北部)区域部署应用程序。如果您使用其他区域,请相应地调整所有步骤。
创建两个 Amazon Kinesis 数据流
在为本练习创建 Managed Service for Apache Flink 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStream和ExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。
可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。要使用 Amazon CLI 创建流,请使用以下命令,并且根据应用程序使用的区域进行调整。
创建数据流 (Amazon CLI)
-
要创建第一个流(
ExampleInputStream),请使用以下 Amazon Kinesiscreate-streamAmazon CLI 命令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ -
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream):$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
为应用程序代码创建一个 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。要了解如何使用控制台创建 Amazon S3 存储桶,请参阅 Amazon S3 用户指南中的创建存储桶。使用全局唯一名称命名 Amazon S3 存储桶,例如通过附加登录名。
注意
请确保在用于本教程的区域(us-east-1)中创建存储桶。
其他资源
在您创建应用程序时,Managed Service for Apache Flink 会自动创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>的日志组 -
名为
kinesis-analytics-log-stream的日志流
设置本地开发环境
对于开发和调试,您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。任何 Apache Flink 依赖项都像使用 Apache Maven 的常规 Java 依赖项一样进行处理。
注意
在开发计算机上,必须安装 Java JDK 11、Maven 和 Git。我们建议您使用开发环境(如 Eclipse Java Neon
对您的 Amazon 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有经过身份验证的有效 Amazon 会话,并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证:
-
如果您没有 Amazon CLI 和已配置有效凭证的命名配置文件,请参阅 设置 Amazon Command Line Interface (Amazon CLI)。
-
通过发布以下测试记录,验证您的 Amazon CLI 是否正确配置,并且您的用户有权写入 Kinesis 数据流:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
如果您的 IDE 有集成 Amazon 的插件,则可以使用该插件将凭证传递给 IDE 中运行的应用程序。有关更多信息,请参阅适用于 IntelliJ IDEA 的 Amazon Toolkit
和适用于 Eclipse 的 Amazon Toolkit。
下载并检查 Apache Flink 流式处理 Java 代码
在 GitHub 中提供了该示例的 Java 应用程序代码。要下载应用程序代码,请执行以下操作:
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
导航到
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted目录。
审核应用程序组件
该应用程序完全在 com.amazonaws.services.msf.BasicStreamingJob 类中实施。main() 方法定义用于处理流数据的数据流程并运行它。
注意
为优化开发人员体验,该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行,以便在您的 IDE 中进行开发。
-
要读取运行时配置,使其在 Amazon Managed Service for Apache Flink 和 IDE 中能够正常运行,应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下,应用程序加载运行时配置的方式会有所不同:
-
当应用程序检测到其在 IDE 中以独立模式运行时,会形成包含在项目 resources 文件夹中的
application_properties.json文件。该文件的内容如下所示。 -
当应用程序在 Amazon Managed Service for Apache Flink 中运行时,默认行为会根据您将在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅 创建并配置 Managed Service for Apache Flink 应用程序 。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
main()方法定义应用程序数据流程并运行它。-
初始化默认的流环境。在此示例中,我们将展示如何创建将与 DataSteam API 结合使用的
StreamExecutionEnvironment以及将与 SQL 和 Table API 结合使用的StreamTableEnvironment。为使用不同的 API,这两个环境对象是对同一个运行时环境的两个单独引用。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -
加载应用程序配置参数。这将自动从正确的位置加载这些参数,具体取决于应用程序的运行位置:
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
该应用程序使用 Kinesis Consumer
连接器定义一个源,用于从输入流中读取数据。输入流的配置以 PropertyGroupId=InputStream0的方式定义。流的名称和区域分别位于名为stream.name和aws.region的属性中。为简单起见,此源将记录读取为字符串。private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... } -
然后,应用程序使用 Kinesis Streams Sink
连接器定义接收器,以将数据发送到输出流。与输入流类似,输出流的名称和区域以 PropertyGroupId=OutputStream0的方式定义。接收器直接连接到从源获取数据的内部DataStream。在实际应用程序中,需要在源和接收器之间进行一些转换。private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... } -
最后,运行刚才定义的数据流程。定义数据流程所需的所有运算符之后,这必须是
main()方法的最后一条指令:env.execute("Flink streaming Java API skeleton");
-
使用 pom.xml 文件
pom.xml 文件定义应用程序所需的所有依赖项,并且设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
-
有些依赖项具有
provided范围。当应用程序在 Amazon Managed Service for Apache Flink 中运行时,这些依赖项自动可用。它们是编译应用程序或在 IDE 中本地运行应用程序所必需的依赖项。有关更多信息,请参阅 在本地运行应用程序。请确保使用的 Flink 版本与 Amazon Managed Service for Apache Flink 中使用的运行时版本相同。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
必须使用默认范围向 pom 添加其他 Apache Flink 依赖项,例如此应用程序使用的 Kinesis 连接器
。有关更多信息,请参阅 使用 Apache Flink 连接器。您还可以添加应用程序所需的任何其他 Java 依赖项。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Maven Java 编译器插件确保根据 Java 11 编译代码,Java 11 是 Apache Flink 目前支持的 JDK 版本。
-
Maven Shade 插件打包 fat-jar,但不包括运行时提供的一些库。它还指定两个转换器:
ServicesResourceTransformer和ManifestResourceTransformer。后者配置包含启动应用程序的main方法的类。如果重命名主类,请不要忘记更新此转换器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
将示例记录写入输入流
在本节中,使用 Python 脚本将示例记录写入流,以供应用程序处理。您可以通过两种方式生成示例数据,即使用 Python 脚本或使用 Kinesis Data Generator
使用 Python 脚本生成示例数据
您可以使用 Python 脚本将示例记录发送到数据流。
注意
要运行此 Python 脚本,必须使用 Python 3.x 并安装适用于 Python 的 Amazon SDK(Boto)
要开始向 Kinesis 输入流发送测试数据,请执行以下操作:
-
从数据生成器 GitHub 存储库
下载数据生成器 stock.pyPython 脚本。 -
运行
stock.py脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。您现在可以运行 Apache Flink 应用程序。
使用 Kinesis Data Generator 生成示例数据
除了使用 Python 脚本之外,还可以使用 Kinesis Data Generator
要设置和运行 Kinesis Data Generator,请执行以下操作:
-
按照 Kinesis Data Generator 文档
中的说明设置该工具的访问权限。您将运行用于设置用户和密码的 Amazon CloudFormation 模板。 -
通过 CloudFormation 模板生成的 URL 访问 Kinesis Data Generator。完成 CloudFormation 模板后,您可以在输出选项卡中找到该 URL。
-
配置数据生成器:
-
区域:选择您在本教程中使用的区域:us-east-1
-
流/传输流:选择应用程序将使用的输入流:
ExampleInputStream -
每秒记录数:100
-
记录模板:复制并粘贴以下模板:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
测试模板:选择测试模板并验证生成的记录是否与以下内容类似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
启动数据生成器:选择选择发送数据。
Kinesis Data Generator 现在向 ExampleInputStream 发送数据。
在本地运行应用程序
您可以在 IDE 中本地运行和调试 Flink 应用程序。
注意
在继续之前,请确认输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您是否具有从两个流中读取和写入的权限。请参阅 对您的 Amazon 会话进行身份验证。
设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅 满足完成练习的先决条件。
将 Java 项目导入您的 IDE
要开始在 IDE 中使用该应用程序,必须将其作为 Java 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将 ./java/GettingStarted 子目录中的内容导入 IDE。
使用 Maven 将代码作为现有 Java 项目插入。
注意
导入新 Java 项目的确切过程因所使用的 IDE 而异。
检查本地应用程序配置
在本地运行时,应用程序使用 ./src/main/resources 下项目资源文件夹中的 application_properties.json 文件内的配置。您可以编辑此文件以使用不同的 Kinesis 流名称或区域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
设置 IDE 运行配置
您可以像运行任何 Java 应用程序一样,通过运行主类 com.amazonaws.services.msf.BasicStreamingJob 直接从 IDE 运行和调试 Flink 应用程序。运行应用程序前,必须设置“运行”配置。该设置取决于您使用的 IDE。例如,请参阅 IntelliJ IDEA 文档中的运行/调试配置
-
将
provided依赖项添加到类路径中。这是确保在本地运行时将具有provided范围的依赖项传递给应用程序所必需的条件。如果不进行此设置,应用程序会立即显示class not found错误。 -
将访问 Kinesis 流的 Amazon 凭证传递给应用程序。最快的方法是使用适用于 IntelliJ IDEA 的 Amazon Toolkit
。在“运行”配置中使用此 IDE 插件,可以选择特定的 Amazon 配置文件。 Amazon 使用此配置文件进行身份验证。您无需直接传递 Amazon 凭证。 -
验证 IDE 是否使用 JDK 11 运行应用程序。
在 IDE 中运行应用程序
设置 BasicStreamingJob 的“运行”配置后,您可以像常规 Java 应用程序一样运行或调试它。
注意
不能从命令行使用 java -jar
... 直接运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。
当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。
要验证应用程序是否正确处理数据,您可以检查输入和输出 Kinesis 流,如下一节所述。
注意
Flink 应用程序的正常行为是不发出有关流动数据的日志。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。
观察 Kinesis 流中的输入和输出数据
您可以使用 Amazon Kinesis 控制台中的数据查看器观察由(生成示例 Python)或 Kinesis Data Generator(链接)发送到输入流的记录。
观察记录
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis。
-
验证区域是否与运行本教程的区域相同,默认为 us-east-1 美国东部(弗吉尼亚州北部)。如果区域不匹配,请更改区域。
-
选择数据流。
-
选择您要观察的流,即
ExampleInputStream或ExampleOutputStream. -
选择数据查看器选项卡。
-
选择任意分片,保持最新作为起始位置,然后选择获取记录。您可能会看到“未找到该请求的记录”错误。如果看到此错误,请选择重试获取记录。发布到流显示的最新记录。
-
在“数据”列中选择值以检查 JSON 格式的记录内容。
停止在本地运行的应用程序
停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于您使用的 IDE。
编译并打包您的应用程序代码
在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。
要使用 Maven 命令行进行编译和打包,请执行以下操作:
移至包含 Java GettingStarted 项目的目录,然后运行以下命令:
$ mvn package
要使用 IDE 进行编译和打包,请执行以下操作:
从 IDE Maven 集成中运行 mvn package。
在这两种情况下,都会创建以下 JAR 文件:target/amazon-msf-java-stream-app-1.0.jar。
注意
从 IDE 运行“构建项目”可能无法创建 JAR 文件。
上传应用程序代码 JAR 文件
在本节中,您要将在上一节中创建的 JAR 文件上传到在本教程开始时创建的 Amazon Simple Storage Service(Amazon S3)存储桶中。如果您尚未完成此步骤,请参阅(链接)。
上传应用程序代码 JAR 文件
通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/。
-
选择您之前为应用程序代码创建的存储桶。
-
选择上传。
-
选择添加文件。
-
导航到上一步中生成的 JAR 文件:
target/amazon-msf-java-stream-app-1.0.jar。 -
在不更改任何其他设置的情况下选择上传。
警告
确保在 <repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar 中选择正确的 JAR 文件。
target 目录还包含您无需上传的其他 JAR 文件。
创建并配置 Managed Service for Apache Flink 应用程序
您可以使用控制台或 Amazon CLI 创建和运行适用于 Apache Flink 的托管服务的应用程序。在本教程中,您将使用控制台。
注意
当您使用控制台创建应用程序时,将创建您的 Amazon Identity and Access Management(IAM) 和 Amazon CloudWatch Logs 资源。当您使用 Amazon CLI 创建应用程序时,您可以单独创建这些资源。
创建应用程序
创建应用程序
登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。
-
确认选择正确的区域:us-east-1 美国东部(弗吉尼亚北部)
-
打开右侧的菜单,选择 Apache Flink 应用程序,然后选择创建流应用程序。或者,在初始页面的“入门”容器中选择创建流应用程序。
-
在创建流应用程序页面上:
-
选择设置流处理应用程序的方法:选择从头开始创建。
-
Apache Flink 配置,应用程序 Flink 版本:选择 Apache Flink 1.20。
-
-
配置应用程序
-
应用程序名称:输入
MyApplication。 -
描述:输入
My java test app。 -
访问应用程序资源:选择使用所需策略创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-east-1。
-
-
配置用于应用程序设置的模板
-
模板:选择开发。
-
-
选择页面底部的创建流应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-east-1 -
角色:
kinesisanalytics-MyApplication-us-east-1
Amazon Managed Service for Apache Flink 之前称为 Kinesis Data Analytics。为实现向后兼容,自动创建的资源名称前缀为 kinesis-analytics-。
编辑 IAM 策略
编辑 IAM policy 以添加访问 Kinesis 数据流的权限。
编辑策略
通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。
-
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1策略。 -
选择编辑,然后选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901) 替换为您的账户 ID。 -
选择页面底部的下一步,然后选择保存更改。
配置应用程序
编辑应用程序配置以设置应用程序代码构件。
编辑配置
-
在我的应用程序 页面上,选择配置。
-
在应用程序代码位置部分中:
-
对于 Amazon S3 存储桶,请选择之前为应用程序代码创建的存储桶。选择浏览并选择正确的存储桶,然后选择选择。请勿单击存储桶名称。
-
在 Amazon S3 对象的路径中,输入
amazon-msf-java-stream-app-1.0.jar。
-
-
对于访问权限,请选择创建/更新具有必要策略的 IAM 角色
kinesis-analytics-MyApplication-us-east-1。 -
在运行时属性部分中,添加以下属性。
-
选择添加新项目并添加以下每个参数:
组 ID 键 值 InputStream0stream.nameExampleInputStreamInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1 -
请勿修改任何其他部分。
-
选择保存更改。
注意
在选择启用 Amazon CloudWatch 日志记录时,Managed Service for Apache Flink 将为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已完成配置,准备好运行。
运行应用程序
-
在 Amazon Managed Service for Apache Flink 的控制台上,选择我的应用程序,然后选择运行。
-
在下一页的“应用程序还原配置”页面上,选择使用最新快照运行,然后选择运行。
应用程序详细信息中的状态会从
Ready转换到Starting,然后在应用程序启动时转换到Running。
当应用程序处于 Running 状态时,您现在可以打开 Flink 控制面板。
打开控制面板
-
选择打开 Apache Flink 控制面板。控制面板将在新页面上打开。
-
在正在运行的作业列表中,选择您可以看到的单个作业。
注意
如果您错误设置运行时属性或编辑 IAM 策略,则应用程序状态可能会变为
Running,但是 Flink 控制面板显示任务正在持续重新启动。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障场景。发生这种情况时,请检查 Flink 控制面板中的异常选项卡以查看问题的原因。
观察运行中应用程序的指标
在我的应用程序页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序的一些基本指标。
查看指标
-
在刷新按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行状况良好时,您可以看到正常运行时间指标不断增加。
-
完全重新启动指标应为零。如果该指标增加,则配置可能存在问题。要调查问题,请审查 Flink 控制面板上的异常选项卡。
-
在运行状况良好的应用程序中,失败的检查点数指标应为零。
注意
此控制面板显示一组固定的指标,且粒度为 5 分钟。您可以在 CloudWatch 控制面板中创建包含任何指标的自定义应用程序控制面板。
观察 Kinesis 流中的输出数据
确保您仍在使用 Python 脚本或 Kinesis Data Generator 将数据发布到输入中。
现在,您可以使用 https://console.aws.amazon.com/kinesis/ 中的数据查看器来观察在 Managed Service for Apache Flink 上运行的应用程序的输出,类似于之前执行的操作。
查看输出
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis。
-
确认该区域与您运行本教程时使用的区域相同。默认情况下,区域为 US-East-1US 东部(弗吉尼亚北部)。如有必要,可以更改区域。
-
选择数据流。
-
选择您要观察的流。在本教程中,请使用
ExampleOutputStream。 -
选择数据查看器选项卡。
-
选择任意分片,保持最新作为起始位置,然后选择获取记录。您可能会看到“未找到该请求的记录”错误。如果看到此错误,请选择重试获取记录。发布到流显示的最新记录。
-
在“数据”列中选择值以检查 JSON 格式的记录内容。
停止应用程序
要停止应用程序,请转至名为 MyApplication 的 Managed Service for Apache Flink 应用程序的控制台页面。
停止应用程序
-
从操作下拉列表中,选择停止。
-
应用程序详细信息中的状态会从
Running转换到Stopping,然后在应用程序完全停止时转换到Ready。注意
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。