创建并运行适用于 Apache Flink 的托管服务应用程序 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。

创建并运行适用于 Apache Flink 的托管服务应用程序

在此步骤中,您将创建 Managed Service for Apache Flink 应用程序,并将 Kinesis 数据流作为源和接收器。

创建相关资源

在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:

  • 两个 Kinesis 流用于输入和输出

  • 存储应用程序代码的 Amazon S3 存储桶

    注意

    本教程假设您在 us-east-1 美国东部(弗吉尼亚州北部)区域部署应用程序。如果您使用其他区域,请相应地调整所有步骤。

创建两个 Amazon Kinesis 数据流

在为本练习创建 Managed Service for Apache Flink 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。要使用 Amazon CLI 创建流,请使用以下命令,并且根据应用程序使用的区域进行调整。

创建数据流 (Amazon CLI)
  1. 要创建第一个流(ExampleInputStream),请使用以下 Amazon Kinesis create-stream Amazon CLI 命令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream):

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

为应用程序代码创建一个 Amazon S3 存储桶

您可以使用控制台来创建 Amazon S3 存储桶。要了解如何使用控制台创建 Amazon S3 存储桶,请参阅 Amazon S3 用户指南中的创建存储桶。使用全局唯一名称命名 Amazon S3 存储桶,例如通过附加登录名。

注意

请确保在用于本教程的区域(us-east-1)中创建存储桶。

其他资源

在您创建应用程序时,Managed Service for Apache Flink 会自动创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/<my-application> 的日志组

  • 名为 kinesis-analytics-log-stream 的日志流

设置本地开发环境

对于开发和调试,您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。任何 Apache Flink 依赖项都像使用 Apache Maven 的常规 Java 依赖项一样进行处理。

注意

在开发计算机上,必须安装 Java JDK 11、Maven 和 Git。我们建议您使用开发环境(如 Eclipse Java NeonIntelliJ IDEA)。要验证您是否满足所有先决条件,请参阅 满足完成练习的先决条件。您需在计算机上安装 Apache Flink 集群。

对您的 Amazon 会话进行身份验证

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有经过身份验证的有效 Amazon 会话,并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证:

  1. 如果您没有 Amazon CLI 和已配置有效凭证的命名配置文件,请参阅 设置 Amazon Command Line Interface (Amazon CLI)

  2. 通过发布以下测试记录,验证您的 Amazon CLI 是否正确配置,并且您的用户有权写入 Kinesis 数据流:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 如果您的 IDE 有集成 Amazon 的插件,则可以使用该插件将凭证传递给 IDE 中运行的应用程序。有关更多信息,请参阅适用于 IntelliJ IDEA 的 Amazon Toolkit适用于 Eclipse 的 Amazon Toolkit

下载并检查 Apache Flink 流式处理 Java 代码

在 GitHub 中提供了该示例的 Java 应用程序代码。要下载应用程序代码,请执行以下操作:

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. 导航到 amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted 目录。

审核应用程序组件

该应用程序完全在 com.amazonaws.services.msf.BasicStreamingJob 类中实施。main() 方法定义用于处理流数据的数据流程并运行它。

注意

为优化开发人员体验,该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行,以便在您的 IDE 中进行开发。

  • 要读取运行时配置,使其在 Amazon Managed Service for Apache Flink 和 IDE 中能够正常运行,应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下,应用程序加载运行时配置的方式会有所不同:

    1. 当应用程序检测到其在 IDE 中以独立模式运行时,会形成包含在项目 resources 文件夹中的 application_properties.json 文件。该文件的内容如下所示。

    2. 当应用程序在 Amazon Managed Service for Apache Flink 中运行时,默认行为会根据您将在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅 创建并配置 Managed Service for Apache Flink 应用程序

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • main() 方法定义应用程序数据流程并运行它。

    • 初始化默认的流环境。在此示例中,我们将展示如何创建将与 DataSteam API 结合使用的 StreamExecutionEnvironment 以及将与 SQL 和 Table API 结合使用的 StreamTableEnvironment。为使用不同的 API,这两个环境对象是对同一个运行时环境的两个单独引用。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • 加载应用程序配置参数。这将自动从正确的位置加载这些参数,具体取决于应用程序的运行位置:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • 该应用程序使用 Kinesis Consumer 连接器定义一个源,用于从输入流中读取数据。输入流的配置以 PropertyGroupId=InputStream0 的方式定义。流的名称和区域分别位于名为 stream.nameaws.region 的属性中。为简单起见,此源将记录读取为字符串。

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • 然后,应用程序使用 Kinesis Streams Sink 连接器定义接收器,以将数据发送到输出流。与输入流类似,输出流的名称和区域以 PropertyGroupId=OutputStream0 的方式定义。接收器直接连接到从源获取数据的内部 DataStream。在实际应用程序中,需要在源和接收器之间进行一些转换。

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • 最后,运行刚才定义的数据流程。定义数据流程所需的所有运算符之后,这必须是 main() 方法的最后一条指令:

      env.execute("Flink streaming Java API skeleton");

使用 pom.xml 文件

pom.xml 文件定义应用程序所需的所有依赖项,并且设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。

  • 有些依赖项具有 provided 范围。当应用程序在 Amazon Managed Service for Apache Flink 中运行时,这些依赖项自动可用。它们是编译应用程序或在 IDE 中本地运行应用程序所必需的依赖项。有关更多信息,请参阅 在本地运行应用程序。请确保使用的 Flink 版本与 Amazon Managed Service for Apache Flink 中使用的运行时版本相同。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • 必须使用默认范围向 pom 添加其他 Apache Flink 依赖项,例如此应用程序使用的 Kinesis 连接器。有关更多信息,请参阅 使用 Apache Flink 连接器。您还可以添加应用程序所需的任何其他 Java 依赖项。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Maven Java 编译器插件确保根据 Java 11 编译代码,Java 11 是 Apache Flink 目前支持的 JDK 版本。

  • Maven Shade 插件打包 fat-jar,但不包括运行时提供的一些库。它还指定两个转换器:ServicesResourceTransformerManifestResourceTransformer。后者配置包含启动应用程序的 main 方法的类。如果重命名主类,请不要忘记更新此转换器。

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

将示例记录写入输入流

在本节中,使用 Python 脚本将示例记录写入流,以供应用程序处理。您可以通过两种方式生成示例数据,即使用 Python 脚本或使用 Kinesis Data Generator

使用 Python 脚本生成示例数据

您可以使用 Python 脚本将示例记录发送到数据流。

注意

要运行此 Python 脚本,必须使用 Python 3.x 并安装适用于 Python 的 Amazon SDK(Boto)库。

要开始向 Kinesis 输入流发送测试数据,请执行以下操作:

  1. 数据生成器 GitHub 存储库下载数据生成器 stock.py Python 脚本。

  2. 运行 stock.py 脚本:

    $ python stock.py

在完成本教程的其余部分时,请将脚本保持运行状态。您现在可以运行 Apache Flink 应用程序。

使用 Kinesis Data Generator 生成示例数据

除了使用 Python 脚本之外,还可以使用 Kinesis Data Generator(也以托管版本提供)将随机示例数据发送到流中。Kinesis Data Generator 在浏览器中运行,无需在计算机上安装任何工具。

要设置和运行 Kinesis Data Generator,请执行以下操作:

  1. 按照 Kinesis Data Generator 文档中的说明设置该工具的访问权限。您将运行用于设置用户和密码的 Amazon CloudFormation 模板。

  2. 通过 CloudFormation 模板生成的 URL 访问 Kinesis Data Generator。完成 CloudFormation 模板后,您可以在输出选项卡中找到该 URL。

  3. 配置数据生成器:

    • 区域:选择您在本教程中使用的区域:us-east-1

    • 流/传输流:选择应用程序将使用的输入流:ExampleInputStream

    • 每秒记录数:100

    • 记录模板:复制并粘贴以下模板:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 测试模板:选择测试模板并验证生成的记录是否与以下内容类似:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 启动数据生成器:选择选择发送数据

Kinesis Data Generator 现在向 ExampleInputStream 发送数据。

在本地运行应用程序

您可以在 IDE 中本地运行和调试 Flink 应用程序。

注意

在继续之前,请确认输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您是否具有从两个流中读取和写入的权限。请参阅 对您的 Amazon 会话进行身份验证

设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅 满足完成练习的先决条件

将 Java 项目导入您的 IDE

要开始在 IDE 中使用该应用程序,必须将其作为 Java 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将 ./java/GettingStarted 子目录中的内容导入 IDE。

使用 Maven 将代码作为现有 Java 项目插入。

注意

导入新 Java 项目的确切过程因所使用的 IDE 而异。

检查本地应用程序配置

在本地运行时,应用程序使用 ./src/main/resources 下项目资源文件夹中的 application_properties.json 文件内的配置。您可以编辑此文件以使用不同的 Kinesis 流名称或区域。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

设置 IDE 运行配置

您可以像运行任何 Java 应用程序一样,通过运行主类 com.amazonaws.services.msf.BasicStreamingJob 直接从 IDE 运行和调试 Flink 应用程序。运行应用程序前,必须设置“运行”配置。该设置取决于您使用的 IDE。例如,请参阅 IntelliJ IDEA 文档中的运行/调试配置。具体而言,您必须设置以下内容:

  1. provided 依赖项添加到类路径中。这是确保在本地运行时将具有 provided 范围的依赖项传递给应用程序所必需的条件。如果不进行此设置,应用程序会立即显示 class not found 错误。

  2. 将访问 Kinesis 流的 Amazon 凭证传递给应用程序。最快的方法是使用适用于 IntelliJ IDEA 的 Amazon Toolkit。在“运行”配置中使用此 IDE 插件,可以选择特定的 Amazon 配置文件。 Amazon 使用此配置文件进行身份验证。您无需直接传递 Amazon 凭证。

  3. 验证 IDE 是否使用 JDK 11 运行应用程序。

在 IDE 中运行应用程序

设置 BasicStreamingJob 的“运行”配置后,您可以像常规 Java 应用程序一样运行或调试它。

注意

不能从命令行使用 java -jar ... 直接运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。

当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。

要验证应用程序是否正确处理数据,您可以检查输入和输出 Kinesis 流,如下一节所述。

注意

Flink 应用程序的正常行为是不发出有关流动数据的日志。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。

观察 Kinesis 流中的输入和输出数据

您可以使用 Amazon Kinesis 控制台中的数据查看器观察由(生成示例 Python)或 Kinesis Data Generator(链接)发送到输入流的记录。

观察记录
  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 验证区域是否与运行本教程的区域相同,默认为 us-east-1 美国东部(弗吉尼亚州北部)。如果区域不匹配,请更改区域。

  3. 选择数据流

  4. 选择您要观察的流,即 ExampleInputStreamExampleOutputStream.

  5. 选择数据查看器选项卡。

  6. 选择任意分片,保持最新作为起始位置,然后选择获取记录。您可能会看到“未找到该请求的记录”错误。如果看到此错误,请选择重试获取记录。发布到流显示的最新记录。

  7. 在“数据”列中选择值以检查 JSON 格式的记录内容。

停止在本地运行的应用程序

停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于您使用的 IDE。

编译并打包您的应用程序代码

在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。

要使用 Maven 命令行进行编译和打包,请执行以下操作:

移至包含 Java GettingStarted 项目的目录,然后运行以下命令:

$ mvn package

要使用 IDE 进行编译和打包,请执行以下操作:

从 IDE Maven 集成中运行 mvn package

在这两种情况下,都会创建以下 JAR 文件:target/amazon-msf-java-stream-app-1.0.jar

注意

从 IDE 运行“构建项目”可能无法创建 JAR 文件。

上传应用程序代码 JAR 文件

在本节中,您要将在上一节中创建的 JAR 文件上传到在本教程开始时创建的 Amazon Simple Storage Service(Amazon S3)存储桶中。如果您尚未完成此步骤,请参阅(链接)。

上传应用程序代码 JAR 文件
  1. 通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  2. 选择您之前为应用程序代码创建的存储桶。

  3. 选择上传

  4. 选择添加文件

  5. 导航到上一步中生成的 JAR 文件:target/amazon-msf-java-stream-app-1.0.jar

  6. 在不更改任何其他设置的情况下选择上传

警告

确保在 <repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar 中选择正确的 JAR 文件。

target 目录还包含您无需上传的其他 JAR 文件。

创建并配置 Managed Service for Apache Flink 应用程序

您可以使用控制台或 Amazon CLI 创建和运行适用于 Apache Flink 的托管服务的应用程序。在本教程中,您将使用控制台。

注意

当您使用控制台创建应用程序时,将创建您的 Amazon Identity and Access Management(IAM) 和 Amazon CloudWatch Logs 资源。当您使用 Amazon CLI 创建应用程序时,您可以单独创建这些资源。

创建应用程序

创建应用程序
  1. 登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。

  2. 确认选择正确的区域:us-east-1 美国东部(弗吉尼亚北部)

  3. 打开右侧的菜单,选择 Apache Flink 应用程序,然后选择创建流应用程序。或者,在初始页面的“入门”容器中选择创建流应用程序

  4. 创建流应用程序页面上:

    • 选择设置流处理应用程序的方法:选择从头开始创建

    • Apache Flink 配置,应用程序 Flink 版本:选择 Apache Flink 1.20

  5. 配置应用程序

    • 应用程序名称:输入 MyApplication

    • 描述:输入 My java test app

    • 访问应用程序资源:选择使用所需策略创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1

  6. 配置用于应用程序设置的模板

    • 模板:选择开发

  7. 选择页面底部的创建流应用程序

注意

在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-east-1

  • 角色:kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service for Apache Flink 之前称为 Kinesis Data Analytics。为实现向后兼容,自动创建的资源名称前缀为 kinesis-analytics-

编辑 IAM 策略

编辑 IAM policy 以添加访问 Kinesis 数据流的权限。

编辑策略
  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-east-1 策略。

  3. 选择编辑,然后选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 选择页面底部的下一步,然后选择保存更改

配置应用程序

编辑应用程序配置以设置应用程序代码构件。

编辑配置
  1. 我的应用程序 页面上,选择配置

  2. 应用程序代码位置部分中:

    • 对于 Amazon S3 存储桶,请选择之前为应用程序代码创建的存储桶。选择浏览并选择正确的存储桶,然后选择选择。请勿单击存储桶名称。

    • Amazon S3 对象的路径中,输入 amazon-msf-java-stream-app-1.0.jar

  3. 对于访问权限,请选择创建/更新具有必要策略的 IAM 角色 kinesis-analytics-MyApplication-us-east-1

  4. 运行时属性部分中,添加以下属性。

  5. 选择添加新项目并添加以下每个参数:

    组 ID
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. 请勿修改任何其他部分。

  7. 选择保存更改

注意

在选择启用 Amazon CloudWatch 日志记录时,Managed Service for Apache Flink 将为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

应用程序现已完成配置,准备好运行。

运行应用程序
  1. 在 Amazon Managed Service for Apache Flink 的控制台上,选择我的应用程序,然后选择运行

  2. 在下一页的“应用程序还原配置”页面上,选择使用最新快照运行,然后选择运行

    应用程序详细信息中的状态会从 Ready 转换到 Starting,然后在应用程序启动时转换到 Running

当应用程序处于 Running 状态时,您现在可以打开 Flink 控制面板。

打开控制面板
  1. 选择打开 Apache Flink 控制面板。控制面板将在新页面上打开。

  2. 正在运行的作业列表中,选择您可以看到的单个作业。

    注意

    如果您错误设置运行时属性或编辑 IAM 策略,则应用程序状态可能会变为 Running,但是 Flink 控制面板显示任务正在持续重新启动。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障场景。

    发生这种情况时,请检查 Flink 控制面板中的异常选项卡以查看问题的原因。

观察运行中应用程序的指标

我的应用程序页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序的一些基本指标。

查看指标
  1. 刷新按钮旁边,从下拉列表中选择 10 秒

  2. 当应用程序运行且运行状况良好时,您可以看到正常运行时间指标不断增加。

  3. 完全重新启动指标应为零。如果该指标增加,则配置可能存在问题。要调查问题,请审查 Flink 控制面板上的异常选项卡。

  4. 在运行状况良好的应用程序中,失败的检查点数指标应为零。

    注意

    此控制面板显示一组固定的指标,且粒度为 5 分钟。您可以在 CloudWatch 控制面板中创建包含任何指标的自定义应用程序控制面板。

观察 Kinesis 流中的输出数据

确保您仍在使用 Python 脚本或 Kinesis Data Generator 将数据发布到输入中。

现在,您可以使用 https://console.aws.amazon.com/kinesis/ 中的数据查看器来观察在 Managed Service for Apache Flink 上运行的应用程序的输出,类似于之前执行的操作。

查看输出
  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 确认该区域与您运行本教程时使用的区域相同。默认情况下,区域为 US-East-1US 东部(弗吉尼亚北部)。如有必要,可以更改区域。

  3. 选择数据流

  4. 选择您要观察的流。在本教程中,请使用 ExampleOutputStream

  5. 选择数据查看器选项卡。

  6. 选择任意分片,保持最新作为起始位置,然后选择获取记录。您可能会看到“未找到该请求的记录”错误。如果看到此错误,请选择重试获取记录。发布到流显示的最新记录。

  7. 在“数据”列中选择值以检查 JSON 格式的记录内容。

停止应用程序

要停止应用程序,请转至名为 MyApplication 的 Managed Service for Apache Flink 应用程序的控制台页面。

停止应用程序
  1. 操作下拉列表中,选择停止

  2. 应用程序详细信息中的状态会从 Running 转换到 Stopping,然后在应用程序完全停止时转换到 Ready

    注意

    请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。

后续步骤

清理 Amazon资源