适用于 Apache 的托管服务 Flink API 示例代码 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Apache 的托管服务 Flink API 示例代码

本主题包含Managed Service for Apache Flink操作的示例请求块。

要使用 JSON 作为 Amazon Command Line Interface (Amazon CLI) 操作的输入,请将请求保存在 JSON 文件中。然后,使用 --cli-input-json 参数将文件名传递给该操作。

以下示例说明了如何将 JSON 文件与操作一起使用。

$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json

有关将 JSON 与一起使用的更多信息 Amazon CLI,请参阅Amazon Command Line Interface 用户指南中的生成 CLI 框架和 CLI 输入 JSON 参数

AddApplicationCloudWatchLoggingOption

以下AddApplicationCloudWatchLoggingOption操作的示例请求代码为适用于 Apache Flink 的托管服务应用程序添加了 Amazon CloudWatch 日志选项:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOption": { "LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" }, "CurrentApplicationVersionId": 2 }

AddApplicationInput

以下AddApplicationInput操作的示例请求代码将应用程序输入添加到 Apache Flink 托管服务应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Input": { "InputParallelism": { "Count": 2 }, "InputSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(50)" }, { "SqlType": "REAL", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" } }, "KinesisStreamsInput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" } } }

AddApplicationInputProcessingConfiguration

以下AddApplicationInputProcessingConfiguration操作的示例请求代码将应用程序输入处理配置添加到适用于 Apache Flink 的托管服务应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "InputId": "2.1", "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } } }

AddApplicationOutput

以下AddApplicationOutput操作的示例请求代码将 Kinesis 数据流作为应用程序输出添加到 Apache Flink 托管服务应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Output": { "DestinationSchema": { "RecordFormatType": "JSON" }, "KinesisStreamsOutput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" }, "Name": "DESTINATION_SQL_STREAM" } }

AddApplicationReferenceDataSource

以下AddApplicationReferenceDataSource操作的示例请求代码将 CSV 应用程序参考数据源添加到适用于 Apache Flink 的托管服务应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER", "SqlType": "VARCHAR(4)" }, { "Mapping": "$.COMPANYNAME", "Name": "COMPANY_NAME", "SqlType": "VARCHAR(40)" }, ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": " ", "RecordRowDelimiter": "\r\n" } }, "RecordFormatType": "CSV" } }, "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::MyS3Bucket", "FileKey": "TickerReference.csv" }, "TableName": "string" } }

AddApplicationVpcConfiguration

以下AddApplicationVpcConfiguration操作的示例请求代码将 VPC 配置添加到现有应用程序:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfiguration": { "SecurityGroupIds": [ "sg-0123456789abcdef0" ], "SubnetIds": [ "subnet-0123456789abcdef0" ] } }

CreateApplication

以下CreateApplication操作的示例请求代码为 Apache Flink 应用程序创建了一个托管服务:

{ "ApplicationName":"MyApplication", "ApplicationDescription":"My-Application-Description", "RuntimeEnvironment":"FLINK-1_15", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "CloudWatchLoggingOptions":[ { "LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" } ], "ApplicationConfiguration": { "EnvironmentProperties": {"PropertyGroups": [ {"PropertyGroupId": "ConsumerConfigProperties", "PropertyMap": {"aws.region": "us-east-1", "flink.stream.initpos": "LATEST"} }, {"PropertyGroupId": "ProducerConfigProperties", "PropertyMap": {"aws.region": "us-east-1"} }, ] }, "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "CodeContentType":"ZIPFILE" }, "FlinkApplicationConfiguration":{ "ParallelismConfiguration":{ "ConfigurationType":"CUSTOM", "Parallelism":2, "ParallelismPerKPU":1, "AutoScalingEnabled":true } } } }

CreateApplicationSnapshot

以下CreateApplicationSnapshot操作的示例请求代码创建了应用程序状态的快照:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DeleteApplication

以下DeleteApplication操作请求代码示例删除了 Apache Flink 应用程序的托管服务:

{"ApplicationName": "MyApplication", "CreateTimestamp": 12345678912}

DeleteApplicationCloudWatchLoggingOption

以下DeleteApplicationCloudWatchLoggingOption操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中删除亚马逊 CloudWatch日志选项:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOptionId": "3.1" "CurrentApplicationVersionId": 3 }

DeleteApplicationInputProcessingConfiguration

以下DeleteApplicationInputProcessingConfiguration操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中删除了输入处理配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "InputId": "2.1" }

DeleteApplicationOutput

以下DeleteApplicationOutput操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中删除应用程序输出:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "OutputId": "4.1" }

DeleteApplicationReferenceDataSource

以下DeleteApplicationReferenceDataSource操作的示例请求代码从适用于 Apache Flink 的托管服务应用程序中移除应用程序参考数据源:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceId": "5.1" }

DeleteApplicationSnapshot

以下DeleteApplicationSnapshot操作的示例请求代码删除了应用程序状态的快照:

{ "ApplicationName": "MyApplication", "SnapshotCreationTimestamp": 12345678912, "SnapshotName": "MySnapshot" }

DeleteApplicationVpcConfiguration

以下DeleteApplicationVpcConfiguration操作的示例请求代码将从应用程序中移除现有 VPC 配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfigurationId": "1.1" }

DescribeApplication

以下DescribeApplication操作的示例请求代码返回有关适用于 Apache Flink 的托管服务应用程序的详细信息:

{"ApplicationName": "MyApplication"}

DescribeApplicationSnapshot

以下DescribeApplicationSnapshot操作的示例请求代码返回有关应用程序状态快照的详细信息:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DiscoverInputSchema

以下DiscoverInputSchema操作的示例请求代码从流媒体源生成架构:

{ "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } }, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream", "S3Configuration": { "BucketARN": "string", "FileKey": "string" }, "ServiceExecutionRole": "string" }

以下DiscoverInputSchema操作的示例请求代码从参考来源生成架构:

{ "S3Configuration": { "BucketARN": "arn:aws:s3:::mybucket", "FileKey": "TickerReference.csv" }, "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole" }

ListApplications

以下ListApplications操作的示例请求代码会返回您账户中适用于 Apache Flink 应用程序的托管服务列表:

{ "ExclusiveStartApplicationName": "MyApplication", "Limit": 50 }

ListApplicationSnapshots

以下ListApplicationSnapshots操作的示例请求代码返回了应用程序状态快照的列表:

{"ApplicationName": "MyApplication", "Limit": 50, "NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123" }

StartApplication

以下StartApplication操作请求代码示例启动适用于 Apache Flink 的托管服务应用程序,并从最新的快照(如果有)加载应用程序状态:

{ "ApplicationName": "MyApplication", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }

StopApplication

以下 API_ StopApplication 操作的示例请求代码用于停止 Apache Flink 应用程序的托管服务:

{"ApplicationName": "MyApplication"}

UpdateApplication

以下UpdateApplication操作的示例请求代码更新了适用于 Apache Flink 的托管服务应用程序,以更改应用程序代码的位置:

{"ApplicationName": "MyApplication", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "ZIPFILE", "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::my_new_bucket", "FileKeyUpdate": "my_new_code.zip", "ObjectVersionUpdate": "2" } } } }