创建并运行应用程序 (CLI) - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建并运行应用程序 (CLI)

在本节中,您将使用创建和运行适用 Amazon Command Line Interface 于 Apache Flink 的托管服务应用程序。使用 k inesisanalyticsv2 Amazon CLI 命令为 Apache Flink 应用程序创建托管服务并与之交互。

创建权限策略

注意

您必须为应用程序创建一个权限策略和角色。如果您不创建这些IAM资源,则您的应用程序将无法访问其数据和日志流。

首先,使用两个语句创建权限策略:一个语句授予对源流执行读取操作的权限,另一个语句授予对接收器流执行写入操作的权限。然后,将该策略附加到一个IAM角色(将在下一节中创建)。因此,在 Managed Service for Apache Flink代入该角色时,服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 AKReadSourceStreamWriteSinkStream 权限策略。将 username 替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARNs) 中的账户 ID (012345678901) 替换为您的账户 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

有关创建权限策略的 step-by-step 说明,请参阅IAM用户指南中的教程:创建并附加您的第一个客户托管策略

创建IAM策略

在本节中,您将创建一个IAM角色,适用于 Apache Flink 的托管服务应用程序可以代入该角色来读取源流并写入接收流。

权限不足时,Managed Service for Apache Flink 无法访问您的串流。您可以通过IAM角色授予这些权限。每个IAM角色都有两个附加的策略。此信任策略授予 Managed Service for Apache Flink代入该角色的权限,权限策略确定 Managed Service for Apache Flink代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色
  1. 打开IAM控制台,网址为https://console.aws.amazon.com/iam/

  2. 在导航窗格中选择角色,然后选择创建角色

  3. 选择受信任实体的类型 下,选择 Amazon 服务

  4. 选择将使用此角色的服务 下,选择 Kinesis

  5. 选择您的用例部分,选择Managed Service for Apache Flink

  6. 选择下一步: 权限

  7. 附加权限策略 页面上,选择 下一步: 审核。在创建角色后,您可以附加权限策略。

  8. 创建角色 页面上,输入MF-stream-rw-role作为角色名称。选择 Create role(创建角色)。

    现在,您已经创建了一个名为的新IAM角色MF-stream-rw-role。接下来,您更新角色的信任和权限策略。

  9. 将权限策略附加到角色。

    注意

    对于本练习,Managed Service for Apache Flink代入此角色,以便同时从 Kinesis 数据流(源)读取数据和将输出写入另一个 Kinesis 数据流。因此,您附加在上一步创建权限策略中创建的策略。

    1. 摘要 页上,选择 权限 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 AKReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择AKReadSourceStreamWriteSinkStream策略,然后选择附加策略

现在,您已经创建了应用程序用来访问资源的服务执行角色。记下ARN新角色。

有关创建角色的 step-by-step 说明,请参阅《IAM用户指南》中的创建IAM角色(控制台)

创建应用程序

将以下JSON代码保存到名为的文件中create_request.json。将示例角色ARN替换为您之前创建的角色所ARN对应的角色。将存储桶ARN后缀(用户名)替换为您在上一节中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

CreateApplication使用以下请求执行以创建应用程序:

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您将使用StartApplication操作来启动应用程序。

启动应用程序
  1. 将以下JSON代码保存到名为的文件中start_request.json

    { "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看托管服务的 Apache Flink 指标,以验证应用程序是否正常运行。

停止应用程序

在本节中,您将使用StopApplication操作来停止应用程序。

停止应用程序
  1. 将以下JSON代码保存到名为的文件中stop_request.json

    { "ApplicationName": "s3_sink" }
  2. 使用上述请求执行 StopApplication 操作来停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

添加 CloudWatch 日志选项

您可以使用将 Amazon CloudWatch 日志流 Amazon CLI 添加到您的应用程序中。有关在应用程序中使用 CloudWatch 日志的信息,请参阅设置应用程序日志记录

更新环境属性

在本节中,您将使用UpdateApplication操作来更改应用程序的环境属性,而无需重新编译应用程序代码。在该示例中,您更改源流和目标流的区域。

更新应用程序的环境属性
  1. 将以下JSON代码保存到名为的文件中update_properties_request.json

    { "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
  2. 使用前面的请求执行 UpdateApplication 操作以更新环境属性:

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

更新应用程序代码

当您需要使用新版本的代码包更新应用程序代码时,可以使用UpdateApplicationCLI操作。

注意

要使用相同的文件名加载新版本的应用程序代码,您必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制

要使用 Amazon CLI,请从 Amazon S3 存储桶中删除之前的代码包,上传新版本,然后调用UpdateApplication,指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。

以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplicationsDescribeApplication 操作检查当前的应用程序版本。将存储桶名称后缀 (<用户名>) 更新为在创建依赖资源一节中选择的后缀。

{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }