Managed Service for Apache Flink 中的运行时属性 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Managed Service for Apache Flink 中的运行时属性

您可以使用运行时属性配置应用程序,而无需重新编译应用程序代码。

在控制台中使用运行时属性

您可以使用控制台在 Managed Service for Apache Flink 中添加、更新或删除运行时属性。

注意

在 Managed Service for Apache Flink 的控制台中创建应用程序时,您无法添加运行时属性。

更新 Managed Service for Apache Flink 应用程序的运行时属性
  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink

  2. 选择您的 Managed Service for Apache Flink应用程序 选择 Application details (应用程序详细信息)

  3. 在应用程序页面上,选择 Configure (配置)

  4. 展开 Properties (属性) 部分。

  5. 使用 Properties (属性) 部分中的控件,以键值对形式定义一个属性组。可以使用这些控件添加、更新或删除属性组和运行时属性。

  6. 选择更新

在 CLI 中使用运行时属性

您可以使用 Amazon CLI 添加、更新或删除运行时属性。

本节包含为应用程序配置运行时属性的 API 操作的示例请求。有关如何将 JSON 文件用于 API 操作输入的信息,请参阅 Managed Service for Apache Flink API 示例代码

注意

将以下示例中的示例账户 ID (012345678901) 替换为您的账户 ID。

在创建应用程序时添加运行时属性

CreateApplication 操作的以下示例请求在创建应用程序时添加两个运行时属性组(ProducerConfigPropertiesConsumerConfigProperties):

{ "ApplicationName": "MyApplication", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }

在现有应用程序中添加和更新运行时属性

UpdateApplication 操作的以下示例请求为现有应用程序添加或更新运行时属性:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
注意

如果您使用的键在属性组中没有相应的运行时属性,则 Managed Service for Apache Flink将键值对添加为新属性。如果将一个键用于属性组中的现有运行时属性,Managed Service for Apache Flink将更新属性值。

删除运行时属性

UpdateApplication 操作的以下示例请求从现有应用程序中删除所有运行时属性和属性组:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 3, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [] } } }
重要

如果省略现有的属性组或属性组中的现有属性键,则会删除该属性组或属性。

访问 Managed Service for Apache Flink 应用程序中的运行时属性

您可以使用静态 KinesisAnalyticsRuntime.getApplicationProperties() 方法在 Java 应用程序代码中检索运行时属性,该方法返回一个 Map<String, Properties> 对象。

以下 Java 代码示例检索应用程序的运行时属性:

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

您按如下方式检索一个属性组(作为 Java.Util.Properties 对象):

Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");

通常,您传入 Properties 对象以配置 Apache Flink 源或接收器,而无需检索各个属性。以下代码示例说明了如何传入从运行时属性中检索的 Properties 对象以创建 Flink 源:

private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; }

有关使用运行时属性的完整代码示例,请参阅入门指南 (DataStream API)。入门应用程序的源代码可在 Managed Service for Apache Flink Java 示例 GitHub 存储库中的入门中找到。