Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Managed Service for Apache Flink API 示例代码
本主题包含Managed Service for Apache Flink操作的示例请求块。
要在 Amazon Command Line Interface (Amazon CLI) 中将 JSON 作为一个操作的输入,请将请求保存在 JSON 文件中。然后,使用 --cli-input-json
参数将文件名传递给该操作。
以下示例说明了如何将 JSON 文件与操作一起使用。
$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json
有关将 JSON 与Amazon CLI一起使用的更多信息,请参阅Amazon Command Line Interface《用户指南》中的生成 CLI 框架和 CLI 输入 JSON 参数。
主题
- AddApplicationCloudWatchLoggingOption
- AddApplicationInput
- AddApplicationInputProcessingConfiguration
- AddApplicationOutput
- AddApplicationReferenceDataSource
- AddApplicationVpcConfiguration
- 创建应用程序
- 创建应用程序 Snapshot
- DeleteApplication
- DeleteApplicationCloudWatchLoggingOption
- DeleteApplicationInputProcessingConfiguration
- DeleteApplicationOutput
- DeleteApplicationReferenceDataSource
- DeleteApplicationSnapshot
- DeleteApplicationVpcConfiguration
- DescribeApplication
- DescribeApplicationSnapshot
- DiscoverInputSchema
- ListApplications
- ListApplicationSnapshots
- StartApplication
- StopApplication
- UpdateApplication
AddApplicationCloudWatchLoggingOption
AddApplicationCloudWatchLoggingOption 操作的以下示例请求代码将 Amazon CloudWatch 日志记录选项添加到Managed Service for Apache Flink的应用程序:
{ "ApplicationName": "MyApplication", "CloudWatchLoggingOption": { "LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" }, "CurrentApplicationVersionId": 2 }
AddApplicationInput
以下是 AddApplicationInput 操作请求代码示例,将应用程序输入添加到Managed Service for Apache Flink的应用程序:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Input": { "InputParallelism": { "Count": 2 }, "InputSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(50)" }, { "SqlType": "REAL", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" } }, "KinesisStreamsInput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" } } }
AddApplicationInputProcessingConfiguration
以下是 AddApplicationInputProcessingConfiguration 操作的请求代码示例,用于向Managed Service for Apache Flink的应用程序添加应用程序输入处理配置:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "InputId": "2.1", "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } } }
AddApplicationOutput
以下是 AddApplicationOutput 操作请求代码示例,将 Kinesis 数据流作为应用程序输出添加到Managed Service for Apache Flink的应用程序:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Output": { "DestinationSchema": { "RecordFormatType": "JSON" }, "KinesisStreamsOutput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" }, "Name": "DESTINATION_SQL_STREAM" } }
AddApplicationReferenceDataSource
以下是 AddApplicationReferenceDataSource 操作请求代码示例,将 CSV 应用程序参考数源据添加到Managed Service for Apache Flink的应用程序中:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER", "SqlType": "VARCHAR(4)" }, { "Mapping": "$.COMPANYNAME", "Name": "COMPANY_NAME", "SqlType": "VARCHAR(40)" }, ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": " ", "RecordRowDelimiter": "\r\n" } }, "RecordFormatType": "CSV" } }, "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::MyS3Bucket", "FileKey": "TickerReference.csv" }, "TableName": "string" } }
AddApplicationVpcConfiguration
AddApplicationVpcConfiguration 操作的以下示例请求代码将 VPC 配置添加到现有应用程序中:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfiguration": { "SecurityGroupIds": [ "sg-0123456789abcdef0" ], "SubnetIds": [ "subnet-0123456789abcdef0" ] } }
创建应用程序
以下是 创建应用程序 操作的示例请求代码,用于创建Managed Service for Apache Flink的应用程序:
{ "ApplicationName":"MyApplication", "ApplicationDescription":"My-Application-Description", "RuntimeEnvironment":"FLINK-1_15", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "CloudWatchLoggingOptions":[ { "LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" } ], "ApplicationConfiguration": { "EnvironmentProperties": {"PropertyGroups": [ {"PropertyGroupId": "ConsumerConfigProperties", "PropertyMap": {"aws.region": "us-east-1", "flink.stream.initpos": "LATEST"} }, {"PropertyGroupId": "ProducerConfigProperties", "PropertyMap": {"aws.region": "us-east-1"} }, ] }, "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "CodeContentType":"ZIPFILE" }, "FlinkApplicationConfiguration":{ "ParallelismConfiguration":{ "ConfigurationType":"CUSTOM", "Parallelism":2, "ParallelismPerKPU":1, "AutoScalingEnabled":true } } } }
创建应用程序 Snapshot
创建应用程序 Snapshot 操作的以下示例请求代码创建应用程序状态快照:
{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }
DeleteApplication
以下是 DeleteApplication 操作的示例请求代码,用于删除Managed Service for Apache Flink的应用程序:
{"ApplicationName": "MyApplication", "CreateTimestamp": 12345678912}
DeleteApplicationCloudWatchLoggingOption
以下是 DeleteApplicationCloudWatchLoggingOption 操作的示例请求代码,用于从Managed Service for Apache Flink的应用程序中删除 Amazon CloudWatch 日志记录选项:
{ "ApplicationName": "MyApplication", "CloudWatchLoggingOptionId": "3.1" "CurrentApplicationVersionId": 3 }
DeleteApplicationInputProcessingConfiguration
以下是 DeleteApplicationInputProcessingConfiguration 操作的请求代码示例,用于从Managed Service for Apache Flink的应用程序中删除输入处理配置:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "InputId": "2.1" }
DeleteApplicationOutput
以下是 DeleteApplicationOutput 操作的请求代码示例,用于从Managed Service for Apache Flink的应用程序中删除应用程序输出:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "OutputId": "4.1" }
DeleteApplicationReferenceDataSource
以下是 DeleteApplicationReferenceDataSource 操作的请求代码示例,用于从Managed Service for Apache Flink的应用程序中删除应用程序参考数据源:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceId": "5.1" }
DeleteApplicationSnapshot
DeleteApplicationSnapshot 操作的以下示例请求代码删除应用程序状态快照:
{ "ApplicationName": "MyApplication", "SnapshotCreationTimestamp": 12345678912, "SnapshotName": "MySnapshot" }
DeleteApplicationVpcConfiguration
DeleteApplicationVpcConfiguration 操作的以下示例请求代码从应用程序中删除现有的 VPC 配置:
{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfigurationId": "1.1" }
DescribeApplication
以下是 DescribeApplication 操作的请求代码示例,用于返回Managed Service for Apache Flink的应用程序的详细信息:
{"ApplicationName": "MyApplication"}
DescribeApplicationSnapshot
DescribeApplicationSnapshot 操作的以下示例请求代码返回有关应用程序状态快照的详细信息:
{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }
DiscoverInputSchema
DiscoverInputSchema 操作的以下示例请求代码从流式传输源中生成架构:
{ "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } }, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream", "S3Configuration": { "BucketARN": "string", "FileKey": "string" }, "ServiceExecutionRole": "string" }
DiscoverInputSchema 操作的以下示例请求代码从引用源中生成架构:
{ "S3Configuration": { "BucketARN": "arn:aws:s3:::mybucket", "FileKey": "TickerReference.csv" }, "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole" }
ListApplications
以下是 ListApplications 操作的请求代码示例,用于返回您账户中Managed Service for Apache Flink 的应用程序列表:
{ "ExclusiveStartApplicationName": "MyApplication", "Limit": 50 }
ListApplicationSnapshots
ListApplicationSnapshots 操作的以下示例请求代码返回应用程序状态快照列表:
{"ApplicationName": "MyApplication", "Limit": 50, "NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123" }
StartApplication
以下是 开始申请 操作的请求代码示例,用于启动Managed Service for Apache Flink的应用程序,并从最新快照(如有)中加载应用程序状态:
{ "ApplicationName": "MyApplication", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
StopApplication
以下是 API_停止应用程序 操作的示例请求代码,用于停止Managed Service for Apache Flink的应用程序:
{"ApplicationName": "MyApplication"}
UpdateApplication
以下是 更新应用程序 操作的请求代码示例,用于更新Managed Service for Apache Flink的应用程序,以更改应用程序代码的位置:
{"ApplicationName": "MyApplication", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "ZIPFILE", "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "
arn:aws:s3:::my_new_bucket
", "FileKeyUpdate": "my_new_code.zip
", "ObjectVersionUpdate": "2" } } } }