创建并运行应用程序 (CLI) - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建并运行应用程序 (CLI)

在本节中,您将使用 Amazon Command Line Interface 创建和运行 Managed Service for Apache Flink 应用程序。使用 kinesisanalyticsv2 Amazon CLI命令创建 Managed Service for Apache Flink 应用程序并与之交互。

创建权限策略

注意

您必须为应用程序创建一个权限策略和角色。如果未创建这些 IAM 资源,应用程序将无法访问其数据和日志流。

首先,使用两个语句创建权限策略:一个语句授予对源流执行读取操作的权限,另一个语句授予对接收器流执行写入操作的权限。然后,将策略附加到 IAM 角色(下一节中将创建此角色)。因此,在 Managed Service for Apache Flink代入该角色时,服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 AKReadSourceStreamWriteSinkStream 权限策略。将 username 替换为您用于创建 Amazon S3 存储桶以存储应用程序代码的用户名。将 Amazon 资源名称 (ARN) 中的账户 ID (012345678901) 替换为您的账户 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

有关创建权限策略的分步说明,请参阅《IAM 用户指南》中的教程:创建和附加您的第一个客户管理型策略

创建 IAM policy

在本节中,您将创建一个 IAM 角色,应用程序的 Managed Service for Apache Flink可以代入此角色来读取源流和写入接收器流。

权限不足时,Managed Service for Apache Flink 无法访问您的串流。您通过 IAM 角色授予这些权限。每个 IAM 角色附加了两种策略。此信任策略授予 Managed Service for Apache Flink代入该角色的权限,权限策略确定 Managed Service for Apache Flink代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色
  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航窗格中选择 角色,然后选择 创建角色

  3. 选择受信任实体的类型 下,选择 Amazon 服务

  4. Choose the service that will use this role (选择将使用此角色的服务) 下,选择 Kinesis

  5. 选择您的用例部分,选择Managed Service for Apache Flink

  6. 选择Next: Permissions(下一步: 权限)

  7. Attach permissions policies 页面上,选择 Next: Review。在创建角色后,您可以附加权限策略。

  8. 创建角色 页面上,输入MF-stream-rw-role作为角色名称。选择 Create role(创建角色)。

    现在,您已经创建了一个名为 MF-stream-rw-role 的新 IAM 角色。接下来,您更新角色的信任和权限策略。

  9. 将权限策略附加到角色。

    注意

    对于本练习,Managed Service for Apache Flink代入此角色,以便同时从 Kinesis 数据流(源)读取数据和将输出写入另一个 Kinesis 数据流。因此,您附加在上一步“创建权限策略”中创建的策略。

    1. Summary (摘要) 页上,选择 Permissions (权限) 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 AKReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择AKReadSourceStreamWriteSinkStream策略,然后选择附加策略

现在,您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。

有关创建角色的分步说明,请参阅《IAM 用户指南》中的在您的中创建 IAM 角色(控制台)

创建应用程序

将以下 JSON 代码保存到名为 create_request.json 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀 (用户名) 替换为在前一部分中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

{ "ApplicationName": "getting_started", "ApplicationDescription": "Scala getting started application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "getting-started-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }

使用以下请求执行 创建应用程序 以创建应用程序

aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您使用 开始申请 操作来启动应用程序。

启动应用程序
  1. 将以下 JSON 代码保存到名为 start_request.json 的文件中。

    { "ApplicationName": "getting_started", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在 Amazon CloudWatch 控制台上查看Managed Service for Apache Flink的指标,以验证应用程序是否正常运行。

停止应用程序

在本节中,您使用 停止应用程序 操作来停止应用程序。

停止应用程序
  1. 将以下 JSON 代码保存到名为 stop_request.json 的文件中。

    { "ApplicationName": "s3_sink" }
  2. 使用上述请求执行 StopApplication 操作来停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

添加 CloudWatch 日志记录选项

您可以使用 Amazon CLI 将 Amazon CloudWatch 日志流添加到应用程序中。有关在应用程序中使用 CloudWatch Logs 的信息,请参阅设置应用程序日志记录。

更新环境属性

在本节中,您使用 U更新应用程序 操作更改应用程序的环境属性,而无需重新编译应用程序代码。在该示例中,您更改源流和目标流的区域。

更新应用程序的环境属性
  1. 将以下 JSON 代码保存到名为 update_properties_request.json 的文件中。

    { "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }
  2. 使用前面的请求执行 UpdateApplication 操作以更新环境属性:

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

更新应用程序代码

在您需要使用新版本的代码包更新应用程序代码时,您可以使用 更新应用程序 CLI 操作。

注意

要使用相同的文件名加载新版本的应用程序代码,您必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息,请参阅启用或禁用版本控制

要使用 Amazon CLI,请从 Amazon S3 存储桶中删除以前的代码包,上传新版本,然后调用 UpdateApplication 并指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。

以下示例 UpdateApplication 操作请求重新加载应用程序代码并重新启动应用程序。将 CurrentApplicationVersionId 更新为当前的应用程序版本。您可以使用 ListApplicationsDescribeApplication 操作检查当前的应用程序版本。将存储桶名称后缀 (<用户名>) 更新为在创建相关资源一节中选择的后缀。

{{ "ApplicationName": "getting_started", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>", "FileKeyUpdate": "getting-started-scala-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }