示例:在 Scala 中创建滚动窗口 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:在 Scala 中创建滚动窗口

注意

从 Flink 1.15 版本开始,Scala 免费。应用程序现在可以使用任何 Scala 版本的 Java API。Flink 仍然在内部的一些关键组件中使用 Scala,但没有将 Scala 暴露到用户代码类加载器中。因此,用户需要将 Scala 从属项添加到其 jar 存档中。

有关 Flink 1.15 中 Scala 变更的更多信息,请参阅 1.15 Scala 免费

在本练习中,您将创建一个使用 Scala 3.2.0 和 Flink 的 Java API 的简单流媒体应用程序。 DataStream 该应用程序从 Kinesis 流中读取数据,使用滑动窗口对其进行聚合,并将结果写入输出 Kinesis 流。

注意

要为本练习设置所需的先决条件,请先完成入门 (Scala) 练习。

下载并检查应用程序代码

此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 导航到 amazon-kinesis-data-analytics-java-examples/scala/TumblingWindow 目录。

请注意有关应用程序代码的以下信息:

  • build.sbt 文件包含有关应用程序的配置和从属项的信息,包括Managed Service for Apache Flink的库。

  • BasicStreamingJob.scala 文件包含定义应用程序功能的主要方法。

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    该应用程序还使用 Kinesis 接收器写入结果流。以下代码段创建 Kinesis 接收器:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 应用程序使用窗口操作符在 5 秒的滚动窗口中查找每个股票代号的值计数。以下代码创建操作符,并将聚合的数据发送到新的 Kinesis Data Streams 接收器:

    environment.addSource(createSource) .map { value => val jsonNode = jsonParser.readValue(value, classOf[JsonNode]) new Tuple2[String, Int](jsonNode.get("ticker").toString, 1) } .returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(v => v.f0) // Logically partition the stream for each ticker .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .sum(1) // Sum the number of tickers per partition .map { value => value.f0 + "," + value.f1.toString + "\n" } .sinkTo(createSink)
  • 应用程序创建源连接器和接收器连接器,以使用 StreamExecutionEnvironment 对象访问外部资源。

  • 该应用程序将使用动态应用程序属性创建源和接收连接器。读取应用程序的运行时系统属性来配置连接器。有关运行时系统属性的更多信息,请参阅运行时系统属性

编译并上传应用程序代码

在本节中,您将编译应用程序代码并将其上传到 Amazon S3 存储桶。

编译应用程序代码

使用 SBT 构建工具为应用程序构建 Scala 代码。要安装 SBT,请参阅使用 cs 安装程序安装 sbt。您还需要安装 Java 开发工具包 (JDK)。参阅完成练习的先决条件

  1. 要使用您的应用程序代码,您将其编译和打包成 JAR 文件。您可以用 SBT 编译和打包代码:

    sbt assembly
  2. 如果应用程序成功编译,则创建以下文件:

    target/scala-3.2.0/tumbling-window-scala-1.0.jar
上传 Apache Flink 流式处理 Scala 代码

在本节中,创建 Amazon S3 存储桶并上传应用程序代码。

  1. 打开 Amazon S3 控制台,网址为:https://console.aws.amazon.com/s3/

  2. 选择创建存储桶

  3. 存储桶名称 字段中输入 ka-app-code-<username>。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 下一步

  4. 配置选项中,让设置保持原样,然后选择下一步

  5. 设置权限中,让设置保持原样,然后选择下一步

  6. 请选择创建存储桶

  7. 选择 ka-app-code-<username> 存储桶,然后选择上传

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 tumbling-window-scala-1.0.jar 文件。

  9. 您无需更改该对象的任何设置,因此,请选择上传

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建并运行应用程序(控制台)

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建应用程序

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink

  2. 在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序

  3. Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 应用程序名称 ,输入 MyApplication

    • 对于描述,输入 My Scala test app

    • 对于 运行时,请选择 Apache Flink

    • 将版本保留为 Apache Flink 版本 1.15.2(建议的版本)

  4. 对于访问权限,请选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesisanalytics-MyApplication-us-west-2

配置应用程序

请使用以下过程来配置应用程序。

配置应用程序
  1. MyApplication页面上,选择配置

  2. 配置应用程序 页面上,提供 代码位置

    • 对于Amazon S3 存储桶,请输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入tumbling-window-scala-1.0.jar

  3. 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

  4. 属性下面,选择添加组

  5. 输入以下信息:

    组 ID
    ConsumerConfigProperties input.stream.name ExampleInputStream
    ConsumerConfigProperties aws.region us-west-2
    ConsumerConfigProperties flink.stream.initpos LATEST

    选择保存

  6. 属性下面,再次选择添加组

  7. 输入以下信息:

    组 ID
    ProducerConfigProperties output.stream.name ExampleOutputStream
    ProducerConfigProperties aws.region us-west-2
  8. 监控 下,确保 监控指标级别 设置为 应用程序

  9. 要进行CloudWatch 日志记录,请选中 “启用” 复选框。

  10. 选择更新

注意

当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

编辑 IAM policy

编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。

编辑 IAM policy 以添加 S3 存储桶权限
  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/tumbling-window-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

运行应用程序

可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。

停止应用程序

要停止应用程序,请在MyApplication页面上选择停止。确认该操作。

创建并运行应用程序 (CLI)

在本节中,您将使用 Amazon Command Line Interface 创建和运行 Managed Service for Apache Flink 应用程序。使用 kinesisanalyticsv2 Amazon CLI命令创建 Managed Service for Apache Flink 应用程序并与之交互。

创建权限策略

注意

您必须为应用程序创建一个权限策略和角色。如果未创建这些 IAM 资源,应用程序将无法访问其数据和日志流。

首先,使用两个语句创建权限策略:一个语句授予对源流执行读取操作的权限,另一个语句授予对接收器流执行写入操作的权限。然后,将策略附加到 IAM 角色(下一部分中将创建此角色)。因此,在 Managed Service for Apache Flink代入该角色时,服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 AKReadSourceStreamWriteSinkStream 权限策略。将 username 替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARN) 中的账户 ID (012345678901) 替换为您的账户 ID。MF-stream-rw-role服务执行角色应根据客户的特定角色量身定制。

{ "ApplicationName": "tumbling_window", "ApplicationDescription": "Scala tumbling window application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "tumbling-window-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

有关创建权限策略的 step-by-step 说明,请参阅 IAM 用户指南中的教程:创建并附加您的第一个客户托管策略

创建 IAM 角色

在本节中,您将创建一个 IAM 角色,应用程序的 Managed Service for Apache Flink可以代入此角色来读取源流和写入接收器流。

权限不足时,Managed Service for Apache Flink 无法访问您的串流。您通过 IAM 角色授予这些权限。每个 IAM 角色附加了两种策略。此信任策略授予 Managed Service for Apache Flink代入该角色的权限,权限策略确定 Managed Service for Apache Flink代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色
  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 在导航窗格中选择角色,然后选择创建角色

  3. 选择受信任实体的类型 下,选择 Amazon 服务

  4. 选择将使用此角色的服务 下,选择 Kinesis

  5. 选择您的用例部分,选择Managed Service for Apache Flink

  6. 选择下一步: 权限

  7. 附加权限策略 页面上,选择 下一步: 审核。在创建角色后,您可以附加权限策略。

  8. 创建角色 页面上,输入MF-stream-rw-role作为角色名称。选择 创建角色

    现在,您已经创建了一个名为 MF-stream-rw-role 的新 IAM 角色。接下来,您更新角色的信任和权限策略。

  9. 将权限策略附加到角色。

    注意

    对于本练习,Managed Service for Apache Flink代入此角色,以便同时从 Kinesis 数据流(源)读取数据和将输出写入另一个 Kinesis 数据流。因此,您附加在上一步创建权限策略中创建的策略。

    1. 摘要 页上,选择 权限 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 AKReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择AKReadSourceStreamWriteSinkStream策略,然后选择附加策略

现在,您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。

有关创建角色的 step-by-step 说明,请参阅 IAM 用户指南中的创建 IAM 角色(控制台)

创建应用程序

将以下 JSON 代码保存到名为 create_request.json 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀 (用户名) 替换为在前一部分中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。ServiceExecutionRole应包括您在上一节中创建的 IAM 用户角色。

"ApplicationName": "tumbling_window", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "tumbling-window-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

CreateApplication使用以下请求执行以创建应用程序:

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您使用 StartApplication 操作来启动应用程序。

启动应用程序
  1. 将以下 JSON 代码保存到名为 start_request.json 的文件中。

    { "ApplicationName": "tumbling_window", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看托管服务的 Apache Flink 指标,以验证应用程序是否正常运行。

停止应用程序

在本节中,您使用 StopApplication 操作来停止应用程序。

停止应用程序
  1. 将以下 JSON 代码保存到名为 stop_request.json 的文件中。

    { "ApplicationName": "tumbling_window" }
  2. 使用上述请求执行 StopApplication 操作来停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

添加 CloudWatch 日志选项

您可以使用将 Amazon CloudWatch 日志流Amazon CLI添加到您的应用程序中。有关在应用程序中使用 CloudWatch 日志的信息,请参阅设置应用程序日志记录

更新环境属性

在本节中,您使用 UpdateApplication 操作更改应用程序的环境属性,而无需重新编译应用程序代码。在该示例中,您更改源流和目标流的区域。

更新应用程序的环境属性
  1. 将以下 JSON 代码保存到名为 update_properties_request.json 的文件中。

    {"ApplicationName": "tumbling_window", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } } }
  2. 使用前面的请求执行 UpdateApplication 操作以更新环境属性:

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

更新应用程序代码

当您需要使用新版本的代码包更新应用程序代码时,可以使用 UpdateApplicationCLI 操作。

注意

要使用相同的文件名加载新版本的应用程序代码,您必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制

要使用 Amazon CLI,请从 Amazon S3 存储桶中删除以前的代码包,上传新版本,然后调用 UpdateApplication 并指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。

以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplicationsDescribeApplication 操作检查当前的应用程序版本。将存储桶名称后缀 (<用户名>) 更新为在创建相关资源一节中选择的后缀。

{ "ApplicationName": "tumbling_window", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-username", "FileKeyUpdate": "tumbling-window-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }

清理 Amazon 资源

本节包含清理在滚动窗口教程中创建的 Amazon 资源的相关步骤。

删除 Managed Service for Apache Flink 应用程序

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink

  2. 在 Apache Flink 的托管服务面板中,选择。MyApplication

  3. 在应用程序的页面中,选择 删除,然后确认删除。

删除您的 Kinesis 数据流

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,ExampleInputStream选择。

  3. 在该ExampleInputStream页面中,选择 “删除 Kinesis Stream”,然后确认删除。

  4. Kinesis 直播页面中 ExampleOutputStream,选择,选择操作,选择删除,然后确认删除。

删除您的 Amazon S3 对象和存储桶

  1. 打开 Amazon S3 控制台,网址为:https://console.aws.amazon.com/s3/

  2. 选择 ka-app-code- 存储桶。 <username>

  3. 选择 删除,然后输入存储桶名称以确认删除。

删除您的 IAM 资源

  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service--us-MyApplication west-2 策略

  5. 选择 策略操作,然后选择 删除

  6. 在导航栏中,选择 角色

  7. 选择 k inesis-analytics-us-west-2 角色MyApplication

  8. 选择 删除角色,然后确认删除。

删除您的 CloudWatch 资源

  1. 打开 CloudWatch 控制台,网址为 https://console.aws.amazon.com/cloudwatch/

  2. 在导航栏中,选择 日志

  3. 选择 /aws/kinesis-analytics/ 日志组MyApplication

  4. 选择 删除日志组,然后确认删除。