适用于 Apache Flink 的托管服务中的运行时属性 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Apache Flink 的托管服务中的运行时属性

您可以使用运行时系统属性配置应用程序,而无需重新编译应用程序代码。

在控制台中使用运行时属性

您可以使用在 Apache Flink 托管服务应用程序中添加、更新或删除运行时属性。 Amazon Web Services Management Console

注意

如果您使用的是早期支持的 Apache Flink 版本,并且想要将现有应用程序升级到 Apache Flink 1.19.1,则可以使用就地升级 Apache Flink 版本来实现。通过就地版本升级,您可以针对单个 ARN 在 Apache Flink 版本中保持应用程序的可追溯性,包括快照、日志、指标、标签、Flink 配置等。您可以在 and st READY ate 中RUNNING使用此功能。有关更多信息,请参阅 Apache Flink 的就地版本升级

更新 Managed Service for Apache Flink 应用程序的运行时系统属性
  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink

  2. 选择您的 Managed Service for Apache Flink应用程序 选择 Application details (应用程序详细信息)

  3. 在应用程序页面上,选择 Configure (配置)

  4. 展开 Properties (属性) 部分。

  5. 使用 Properties (属性) 部分中的控件,以键值对形式定义一个属性组。可以使用这些控件添加、更新或删除属性组和运行时系统属性。

  6. 选择更新

在 CLI 中使用运行时属性

您可以使用 Amazon CLI 添加、更新或删除运行时系统属性。

本节包含为应用程序配置运行时系统属性的 API 操作的示例请求。有关如何将 JSON 文件用于 API 操作输入的信息,请参阅 适用于 Apache 的托管服务 Flink API 示例代码

注意

将以下示例中的示例账户 ID (012345678901) 替换为您的账户 ID。

在创建应用程序时添加运行时属性

CreateApplication 操作的以下示例请求在创建应用程序时添加两个运行时系统属性组(ProducerConfigPropertiesConsumerConfigProperties):

{ "ApplicationName": "MyApplication", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }

在现有应用程序中添加和更新运行时属性

UpdateApplication 操作的以下示例请求为现有应用程序添加或更新运行时系统属性:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
注意

如果您使用的键在属性组中没有相应的运行时系统属性,则 Managed Service for Apache Flink将键值对添加为新属性。如果将一个键用于属性组中的现有运行时系统属性,Managed Service for Apache Flink将更新属性值。

移除运行时属性

UpdateApplication 操作的以下示例请求从现有应用程序中删除所有运行时系统属性和属性组:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 3, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [] } } }
重要

如果省略现有的属性组或属性组中的现有属性键,则会删除该属性组或属性。

在 Apache Flink 应用程序的托管服务中访问运行时属性

您可以使用静态 KinesisAnalyticsRuntime.getApplicationProperties() 方法在 Java 应用程序代码中检索运行时系统属性,该方法返回一个 Map<String, Properties> 对象。

以下 Java 代码示例检索应用程序的运行时系统属性:

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

您按如下方式检索一个属性组(作为 Java.Util.Properties 对象):

Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");

通常,您传入 Properties 对象以配置 Apache Flink 源或接收器,而无需检索各个属性。以下代码示例说明了如何传入从运行时系统属性中检索的 Properties 对象以创建 Flink 源:

private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; }

有关代码示例,请参阅 Managed Service for Apache Flink:示例