创建并运行面向 Python 应用程序的 Managed Service for Apache Flink - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。

创建并运行面向 Python 应用程序的 Managed Service for Apache Flink

在本节中,您将创建面向 Python 的 Managed Service for Apache Flink 应用程序,并将 Kinesis 流作为源和接收器。

创建相关资源

在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:

  • 两个 Kinesis 流用于输入和输出。

  • 存储应用程序代码的 Amazon S3 存储桶。

注意

本教程假设您在 us-east-1 区域中部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。

创建两个 Kinesis 流

在为本练习创建 Managed Service for Apache Flink 应用程序之前,请在要用于部署应用程序的同一区域(本示例中的 us-east-1)中创建两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流

创建数据流 (Amazon CLI)
  1. 要创建第一个流 (ExampleInputStream),请使用以下 Amazon Kinesis create-streamAmazon CLI命令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

创建 Amazon S3 存储桶

您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:

  • 《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称。

    注意

    请确保在用于本教程的区域(us-east-1)中创建 S3 存储桶。

其他资源

在您创建应用程序时,Managed Service for Apache Flink 会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/<my-application> 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

设置本地开发环境

为进行开发和调试,可以在自己的计算机上运行 Python Flink 应用程序。您可以使用 python main.py 或所选的 Python IDE 从命令行中启动应用程序。

注意

在您的开发计算机上,必须安装 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我们建议您使用像 PyCharmVisual Studio Code 这样的 IDE。要验证您是否满足所有先决条件,请在继续操作之前参阅 满足完成练习的先决条件

要开发您的应用程序并在本地运行它,您必须安装 Flink Python 库。

  1. 使用 VirtualenV、Conda 或任何类似的 Python 工具创建独立的 Python 环境。

  2. 在该环境中安装 PyFlink 库。使用与 Amazon Managed Service for Apache Flink 中所使用相同的 Apache Flink 运行时版本。目前,建议的运行时版本为 1.19.1。

    $ pip install apache-flink==1.19.1
  3. 运行应用程序时,请确保环境处于激活状态。如果在 IDE 中运行应用程序,请确保 IDE 使用该环境作为运行时。该过程取决于您使用的 IDE。

    注意

    您只需要安装 PyFlink 库。您需在计算机上安装 Apache Flink 集群。

对您的 Amazon 会话进行身份验证

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有经过身份验证的有效 Amazon 会话,并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证:

  1. 如果您没有 Amazon CLI 和已配置有效凭证的命名配置文件,请参阅 设置 Amazon Command Line Interface (Amazon CLI)

  2. 通过发布以下测试记录,验证您的 Amazon CLI 是否正确配置,并且您的用户有权写入 Kinesis 数据流:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 如果您的 IDE 有集成 Amazon 的插件,则可以使用该插件将凭证传递给 IDE 中运行的应用程序。有关更多信息,请参阅适用于 PyCharm 的 Amazon Toolkit适用于 Visual Studio Code 的 Amazon Toolkit适用于 IntelliJ IDEA 的 Amazon Toolkit

下载并检查 Apache Flink 流式处理 Python 代码

在 GitHub 中提供了该示例的 Python 应用程序代码。要下载应用程序代码,请执行以下操作:

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. 导航到 ./python/GettingStarted 目录。

审核应用程序组件

应用程序代码位于 main.py 中。我们使用 Python 中嵌入的 SQL 来定义应用程序的流程。

注意

为优化开发人员体验,该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行,以便在您的计算机上进行开发。应用程序使用环境变量 IS_LOCAL = true 来检测其何时在本地运行。必须在 Shell 上或 IDE 的运行配置中设置环境变量 IS_LOCAL = true

  • 应用程序设置执行环境并读取运行时配置。要在 Amazon Managed Service for Apache Flink 上和本地运行,应用程序会检查 IS_LOCAL 变量。

    • 以下是应用程序在 Amazon Managed Service for Apache Flink 中运行时的默认行为:

      1. 加载随应用程序打包的依赖项。有关更多信息,请参阅(LINK)

      2. 从在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载配置。有关更多信息,请参阅(LINK)

    • 如果在本地运行应用程序,则在应用程序检测 IS_LOCAL = true 时:

      1. 从项目加载外部依赖项。

      2. 从项目中包含的 application_properties.json 文件加载配置。

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • 应用程序使用 Kinesis 连接器定义带有 CREATE TABLE 语句的源表。此表从输入 Kinesis 流中读取数据。应用程序从运行时配置中获取流的名称、区域和初始位置。

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • 在此示例中,应用程序还使用 Kinesis 连接器定义接收表。此表将数据发送到输出 Kinesis 流。

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • 最后,应用程序执行 SQL,该 SQL 从源表中 INSERT INTO... 接收器表。在更复杂的应用程序中,在写入接收器之前,您可能还要执行其他步骤来转换数据。

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • 您必须在 main() 函数末尾再添加一个步骤,才能在本地运行应用程序:

    if is_local: table_result.wait()

    如果不使用此语句,则当您在本地运行应用程序时,它会立即终止。在 Amazon Managed Service for Apache Flink 中运行应用程序时,不得执行此语句。

管理 JAR 依赖项

PyFlink 应用程序通常需要一个或多个连接器。本教程中的应用程序使用 Kinesis 连接器。由于 Apache Flink 在 Java JVM 中运行,因此无论是否使用 Python 实施应用程序,连接器都会作为 JAR 文件分发。在 Amazon Managed Service for Apache Flink 上部署应用程序时,必须将这些依赖项与应用程序打包。

在此示例中,我们展示如何使用 Apache Maven 获取依赖项并打包应用程序以在 Managed Service for Apache Flink 上运行。

注意

还可使用其他方法获取和打包依赖项。此示例演示正确适用于一个或多个连接器的方法。它还允许您在本地运行应用程序以进行开发,也可以在 Managed Service for Apache Flink 上运行应用程序,而无需更改代码。

使用 pom.xml 文件

Apache Maven 使用 pom.xml 文件来控制依赖项和应用程序打包。

所有 JAR 依赖项均在 pom.xml 文件的 <dependencies>...</dependencies> 块中指定。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

要查找要使用的正确构件和连接器版本,请参阅 将 Apache Flink 连接器与 Managed Service for Apache Flink 一起使用。请务必参考正在使用的 Apache Flink 版本。在此示例中,我们使用的是 Kinesis 连接器。对于 Apache Flink 1.19,连接器版本为 4.3.0-1.19

注意

如果您使用的是 Apache Flink 1.19,则没有专门为此版本发布的连接器版本。使用为 1.18 版本发布的连接器。

下载和打包依赖项

使用 Maven 下载 pom.xml 文件中定义的依赖项,然后将其打包给 Python Flink 应用程序。

  1. 导航到包含 Python 入门项目(名为 python/GettingStarted)的目录。

  2. 运行以下命令:

$ mvn package

Maven 创建名为 ./target/pyflink-dependencies.jar 的新文件。当您在计算机上进行本地开发时,Python 应用程序会查找此文件。

注意

如果您忘记运行此命令,则在尝试运行应用程序时,它将失败并显示错误:找不到标识符 "kinesis" 的任何工厂

将示例记录写入输入流

在本节中,使用 Python 脚本将示例记录写入流,以供应用程序处理。您可以通过两种方式生成示例数据,即使用 Python 脚本或使用 Kinesis Data Generator

使用 Python 脚本生成示例数据

您可以使用 Python 脚本将示例记录发送到数据流。

注意

要运行此 Python 脚本,必须使用 Python 3.x 并安装适用于 Python 的 Amazon SDK(Boto)库。

要开始向 Kinesis 输入流发送测试数据,请执行以下操作:

  1. 数据生成器 GitHub 存储库下载数据生成器 stock.py Python 脚本。

  2. 运行 stock.py 脚本:

    $ python stock.py

在完成本教程的其余部分时,请将脚本保持运行状态。您现在可以运行 Apache Flink 应用程序。

使用 Kinesis Data Generator 生成示例数据

除了使用 Python 脚本之外,还可以使用 Kinesis Data Generator(也以托管版本提供)将随机示例数据发送到流中。Kinesis Data Generator 在浏览器中运行,无需在计算机上安装任何工具。

要设置和运行 Kinesis Data Generator,请执行以下操作:

  1. 按照 Kinesis Data Generator 文档中的说明设置该工具的访问权限。您将运行用于设置用户和密码的 Amazon CloudFormation 模板。

  2. 通过 CloudFormation 模板生成的 URL 访问 Kinesis Data Generator。完成 CloudFormation 模板后,您可以在输出选项卡中找到该 URL。

  3. 配置数据生成器:

    • 区域:选择您在本教程中使用的区域:us-east-1

    • 流/传输流:选择应用程序将使用的输入流:ExampleInputStream

    • 每秒记录数:100

    • 记录模板:复制并粘贴以下模板:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 测试模板:选择测试模板并验证生成的记录是否与以下内容类似:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 启动数据生成器:选择选择发送数据

Kinesis Data Generator 现在向 ExampleInputStream 发送数据。

在本地运行应用程序

可以在本地测试应用程序,并且使用 python main.py 从命令行或从 IDE 运行该应用程序。

要在本地运行应用程序,必须安装正确版本的 PyFLink 库,如上一节所述。有关更多信息,请参阅(LINK)

注意

在继续之前,请确认输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您是否具有从两个流中读取和写入的权限。请参阅 对您的 Amazon 会话进行身份验证

将 Python 项目导入您的 IDE

要开始在 IDE 中处理应用程序,必须将其作为 Python 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将 ./python/GettingStarted 子目录中的内容导入 IDE。

将代码作为现有 Python 项目导入。

注意

导入新 Python 项目的确切过程因所使用的 IDE 而异。

检查本地应用程序配置

在本地运行时,应用程序使用 ./src/main/resources 下项目资源文件夹中的 application_properties.json 文件内的配置。您可以编辑此文件以使用不同的 Kinesis 流名称或区域。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

在本地运行 Python 应用程序

可以在本地运行应用程序,可以从命令行中作为常规 Python 脚本运行,也可以从 IDE 中运行。

要运行应用程序,请从命令行运行
  1. 确保在其中安装 Python Flink 库的 Conda 或 VirtuaLenv 等独立 Python 环境当前处于激活状态。

  2. 确保至少运行一次 mvn package

  3. 设置 IS_LOCAL = true 环境变量:

    $ export IS_LOCAL=true
  4. 将该应用程序作为常规 Python 脚本运行。

    $python main.py
从 IDE 中运行应用程序
  1. 使用以下配置将 IDE 配置为运行 main.py 脚本:

    1. 使用独立的 Python 环境,例如在其中安装 PyfLink 库的 Conda 或 VirtualenV。

    2. 使用 Amazon 凭证访问输入和输出 Kinesis 数据流。

    3. 设置 IS_LOCAL = true

  2. 设置运行配置的确切过程取决于您的 IDE,并且会有所不同。

  3. 设置 IDE 后,运行 Python 脚本,并在应用程序运行时使用 IDE 提供的工具。

在本地检查应用程序日志

在本地运行时,除了在应用程序启动时打印和显示的几行之外,应用程序不会在控制台中显示任何日志。PyfLink 将日志写入安装 Python Flink 库的目录中的一个文件。应用程序启动时会打印日志的位置。还可以运行以下命令来查找日志:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. 列出日志目录中的文件。通常会找到单个的 .log 文件。

  2. 在应用程序运行时追踪文件:tail -f <log-path>/<log-file>.log

观察 Kinesis 流中的输入和输出数据

您可以使用 Amazon Kinesis 控制台中的数据查看器观察由(生成示例 Python)或 Kinesis Data Generator(链接)发送到输入流的记录。

要观察记录,请执行以下操作:

停止在本地运行的应用程序

停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于 IDE。

打包应用程序代码

在本节中,您会使用 Apache Maven 将应用程序代码和所有必需的依赖项打包到一个 .zip 文件中。

再次运行 Maven 打包命令:

$ mvn package

此命令会生成文件 target/managed-flink-pyflink-getting-started-1.0.0.zip

将应用程序包上传到 Amazon S3 存储桶

在本节中,您要将在上一节中创建的 .zip 文件上传到在本教程开始时创建的 Amazon Simple Storage Service(Amazon S3)存储桶中。如果您尚未完成此步骤,请参阅(链接)。

上传应用程序代码 JAR 文件
  1. 通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  2. 选择您之前为应用程序代码创建的存储桶。

  3. 选择上传

  4. 选择添加文件

  5. 导航到上一步中生成的 .zip 文件:target/managed-flink-pyflink-getting-started-1.0.0.zip

  6. 在不更改任何其他设置的情况下选择上传

创建并配置 Managed Service for Apache Flink 应用程序

您可以使用控制台或 Amazon CLI 创建和配置 Managed Service for Apache Flink 应用程序。在本教程中,我们将使用控制台。

创建应用程序

  1. 登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。

  2. 确认选择正确的区域:美国东部(弗吉尼亚北部)us-east-1。

  3. 打开右侧的菜单,选择 Apache Flink 应用程序,然后选择创建流应用程序。或者,从初始页面的入门部分中选择创建流应用程序

  4. 创建流应用程序页面上:

    • 选择设置流处理应用程序的方法中,选择从头开始创建

    • 对于 Apache Flink 配置,应用程序 Flink 版本,请选择 Apache Flink 1.19

    • 对于应用程序配置

      • 对于 应用程序名称 ,输入 MyApplication

      • 对于描述,输入 My Python test app

      • 对应用程序资源的访问权限中,选择创建/更新具有所需策略的 IAM 角色 Kinesis-analytics-myApplication-us-east-1

    • 对于应用程序模板设置

      • 对于模板,选择开发

    • 选择创建流应用程序

注意

在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesisanalytics-MyApplication-us-west-2

Amazon Managed Service for Apache Flink 之前称为 Kinesis Data Analytics。为实现向后兼容,自动创建的资源名称前缀为 kinesis-analytics

编辑 IAM 策略

编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。

编辑 IAM policy 以添加 S3 存储桶权限
  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-east-1 策略。

  3. 选择编辑,然后选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 选择下一步,然后选择保存更改

配置应用程序

编辑应用程序配置以设置应用程序代码构件。

配置应用程序
  1. 我的应用程序 页面上,选择配置

  2. 应用程序代码位置部分中:

    • 对于 Amazon S3 存储桶,请选择之前为应用程序代码创建的存储桶。选择浏览并选择正确的存储桶,然后选择选择。请勿在存储桶名称上选择。

    • Amazon S3 对象的路径中,输入 managed-flink-pyflink-getting-started-1.0.0.zip

  3. 对于访问权限,请选择创建/更新具有必要策略的 IAM 角色 kinesis-analytics-MyApplication-us-east-1

  4. 移至运行时属性,并保留所有其他设置的默认值。

  5. 选择添加新项目并添加以下每个参数:

    组 ID
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. 请勿修改任何其他部分,然后选择保存更改

注意

在选择启用 Amazon CloudWatch 日志记录时,Managed Service for Apache Flink 将为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

应用程序现已完成配置,准备好运行。

运行应用程序
  1. 在 Amazon Managed Service for Apache Flink 的控制台上,选择我的应用程序,然后选择运行

  2. 在下一页的“应用程序还原配置”页面上,选择使用最新快照运行,然后选择运行

    应用程序详细信息中的状态会从 Ready 转换到 Starting,然后在应用程序启动时转换到 Running

当应用程序处于 Running 状态时,您现在可以打开 Flink 控制面板。

打开控制面板
  1. 选择打开 Apache Flink 控制面板。控制面板将在新页面上打开。

  2. 正在运行的作业列表中,选择您可以看到的单个作业。

    注意

    如果您错误设置运行时属性或编辑 IAM 策略,则应用程序状态可能会变为 Running,但是 Flink 控制面板显示任务正在持续重新启动。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障场景。

    发生这种情况时,请检查 Flink 控制面板中的异常选项卡以查看问题的原因。

观察运行中应用程序的指标

我的应用程序页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序的一些基本指标。

查看指标
  1. 刷新按钮旁边,从下拉列表中选择 10 秒

  2. 当应用程序运行且运行状况良好时,您可以看到正常运行时间指标不断增加。

  3. 完全重新启动指标应为零。如果该指标增加,则配置可能存在问题。要调查问题,请审查 Flink 控制面板上的异常选项卡。

  4. 在运行状况良好的应用程序中,失败的检查点数指标应为零。

    注意

    此控制面板显示一组固定的指标,且粒度为 5 分钟。您可以在 CloudWatch 控制面板中创建包含任何指标的自定义应用程序控制面板。

观察 Kinesis 流中的输出数据

确保您仍在使用 Python 脚本或 Kinesis Data Generator 将数据发布到输入中。

现在,您可以使用 https://console.aws.amazon.com/kinesis/ 中的数据查看器来观察在 Managed Service for Apache Flink 上运行的应用程序的输出,类似于之前执行的操作。

查看输出
  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 确认该区域与您运行本教程时使用的区域相同。默认情况下,区域为 US-East-1US 东部(弗吉尼亚北部)。如有必要,可以更改区域。

  3. 选择数据流

  4. 选择您要观察的流。在本教程中,请使用 ExampleOutputStream

  5. 选择数据查看器选项卡。

  6. 选择任意分片,保持最新作为起始位置,然后选择获取记录。您可能会看到“未找到该请求的记录”错误。如果看到此错误,请选择重试获取记录。发布到流显示的最新记录。

  7. 在“数据”列中选择值以检查 JSON 格式的记录内容。

停止应用程序

要停止应用程序,请转至名为 MyApplication 的 Managed Service for Apache Flink 应用程序的控制台页面。

停止应用程序
  1. 操作下拉列表中,选择停止

  2. 应用程序详细信息中的状态会从 Running 转换到 Stopping,然后在应用程序完全停止时转换到 Ready

    注意

    请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。

后续步骤

清理 Amazon资源