本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Apache Flink 的 Kinesis Data Analytics 中的运行时属性
您可以使用运行时属性配置应用程序,而无需重新编译应用程序代码。
在控制台中使用运行时属性
您可以使用控制台在 Kinesis Data Analytics 应用程序中添加、更新或删除运行时属性。
在 Kinesis Data Analytics 控制台中创建应用程序时,您无法添加运行时属性。
更新 Kinesis Data Analytics 应用程序的运行时属性
打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics
. 选择 Kinesis Data Analytics 应用程序。选择 Application details (应用程序详细信息)。
在应用程序页面上,选择 Configure (配置)。
展开 Properties (属性) 部分。
使用 Properties (属性) 部分中的控件,以键值对形式定义一个属性组。可以使用这些控件添加、更新或删除属性组和运行时属性。
选择 Update(更新)。
在 CLI 中使用运行时属性
您可以使用 Amazon CLI 添加、更新或删除运行时属性。
本节包含为应用程序配置运行时属性的 API 操作的示例请求。有关如何将 JSON 文件用于 API 操作输入的信息,请参阅 Kinesis Data Analytics API 示例代码。
将以下示例中的示例账户 ID (
) 替换为您的账户 ID。012345678901
在创建应用程序时添加运行时属性
CreateApplication
操作的以下示例请求在创建应用程序时添加两个运行时属性组(ProducerConfigProperties
和 ConsumerConfigProperties
):
{ "ApplicationName": "MyApplication", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_13", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
在现有应用程序中添加和更新运行时属性
UpdateApplication
操作的以下示例请求为现有应用程序添加或更新运行时属性:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
如果您使用的键在属性组中没有相应的运行时属性,Kinesis Data Analytics 将键值对添加为新属性。如果将键用于属性组中的现有运行时属性,Kinesis Data Analytics 将更新属性值。
删除运行时属性
UpdateApplication
操作的以下示例请求从现有应用程序中删除所有运行时属性和属性组:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 3, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [] } } }
如果省略现有的属性组或属性组中的现有属性键,则会删除该属性组或属性。
在 Kinesis Data Analytics 应用程序中访问运行时属性
您可以使用静态 KinesisAnalyticsRuntime.getApplicationProperties()
方法在 Java 应用程序代码中检索运行时属性,该方法返回一个 Map<String, Properties>
对象。
以下 Java 代码示例检索应用程序的运行时属性:
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
您按如下方式检索一个属性组(作为 Java.Util.Properties
对象):
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
您通常,您传入Properties
对象,而无需检索各个属性。以下代码示例说明了如何传入从运行时属性中检索的 Properties
对象以创建 Flink 源:
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; }
有关使用运行时属性的完整代码示例,请参阅入门 (DataStreamAPI)。入门应用程序的源代码可在以下位置找到开始使用