Kinesis Data Analytics for Java Applications 中的运行时属性 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

Kinesis Data Analytics for Java Applications 中的运行时属性

您可以使用运行时属性配置应用程序,而无需重新编译应用程序代码。

在控制台中使用运行时属性

您可以使用控制台在 Kinesis Data Analytics for Java application 中添加、更新或删除运行时属性。

注意

在 Kinesis Data Analytics 控制台中创建应用程序时,您无法添加运行时属性。

更新Kinesis Data Analytics for Java application的运行时属性

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 选择您的Kinesis Data Analytics for Java application。选择 Application details (应用程序详细信息)

  3. 在应用程序页面上,选择 Configure (配置)

  4. 展开 Properties (属性) 部分。

  5. 使用 Properties (属性) 部分中的控件,以键值对形式定义一个属性组。可以使用这些控件添加、更新或删除属性组和运行时属性。

  6. 选择 Update (更新)

在 CLI 中使用运行时属性

您可以使用 AWS CLI 添加、更新或删除运行时属性。

本节包含为应用程序配置运行时属性的 API 操作的示例请求。有关如何将 JSON 文件用于 API 操作输入的信息,请参阅 Kinesis Data Analytics API 示例代码

注意

将以下示例中的示例账户 ID (012345678901) 替换为您的账户 ID。

在创建应用程序时添加运行时属性

CreateApplication 操作的以下示例请求在创建应用程序时添加两个运行时属性组(ProducerConfigPropertiesConsumerConfigProperties):

{ "ApplicationName": "MyApplication", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_8", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }

在现有应用程序中添加和更新运行时属性

UpdateApplication 操作的以下示例请求为现有应用程序添加或更新运行时属性:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
注意

如果您使用的键在属性组中没有相应的运行时属性,则 Kinesis Data Analytics 将键值对添加为新属性。如果将一个键用于属性组中的现有运行时属性,Kinesis Data Analytics 将更新属性值。

删除运行时属性

UpdateApplication 操作的以下示例请求从现有应用程序中删除所有运行时属性和属性组:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 3, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [] } } }
重要

如果省略现有的属性组或属性组中的现有属性键,则会删除该属性组或属性。

在Kinesis Data Analytics for Java application中访问运行时属性

您可以使用静态 KinesisAnalyticsRuntime.getApplicationProperties() 方法在 Java 应用程序代码中检索运行时属性,该方法返回一个 Map<String, Properties> 对象。

以下 Java 代码示例检索应用程序的运行时属性:

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

您按如下方式检索一个属性组(作为 Java.Util.Properties 对象):

Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");

通常,您传入 Properties 对象以配置 Flink 源或接收器,而无需检索各个属性。以下代码示例说明了如何传入从运行时属性中检索的 Properties 对象以创建 Flink 源:

private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<gt;(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; }

有关使用运行时属性的完整代码示例,请参阅入门。在 Kinesis Data Analytics Java 示例 GitHub 存储库的入门中提供了入门应用程序的源代码。