Managed Service for Apache Flink API 示例代码 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Managed Service for Apache Flink API 示例代码

本主题包含Managed Service for Apache Flink操作的示例请求块。

要在 Amazon Command Line Interface (Amazon CLI) 中将 JSON 作为一个操作的输入,请将请求保存在 JSON 文件中。然后,使用 --cli-input-json 参数将文件名传递给该操作。

以下示例说明了如何将 JSON 文件与操作一起使用。

$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json

有关将 JSON 与Amazon CLI一起使用的更多信息,请参阅Amazon Command Line Interface《用户指南》中的生成 CLI 框架和 CLI 输入 JSON 参数

AddApplicationCloudWatchLoggingOption

AddApplicationCloudWatchLoggingOption 操作的以下示例请求代码将 Amazon CloudWatch 日志记录选项添加到Managed Service for Apache Flink的应用程序:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOption": { "LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" }, "CurrentApplicationVersionId": 2 }

AddApplicationInput

以下是 AddApplicationInput 操作请求代码示例,将应用程序输入添加到Managed Service for Apache Flink的应用程序:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Input": { "InputParallelism": { "Count": 2 }, "InputSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(50)" }, { "SqlType": "REAL", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" } }, "KinesisStreamsInput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" } } }

AddApplicationInputProcessingConfiguration

以下是 AddApplicationInputProcessingConfiguration 操作的请求代码示例,用于向Managed Service for Apache Flink的应用程序添加应用程序输入处理配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "InputId": "2.1", "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } } }

AddApplicationOutput

以下是 AddApplicationOutput 操作请求代码示例,将 Kinesis 数据流作为应用程序输出添加到Managed Service for Apache Flink的应用程序:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Output": { "DestinationSchema": { "RecordFormatType": "JSON" }, "KinesisStreamsOutput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" }, "Name": "DESTINATION_SQL_STREAM" } }

AddApplicationReferenceDataSource

以下是 AddApplicationReferenceDataSource 操作请求代码示例,将 CSV 应用程序参考数源据添加到Managed Service for Apache Flink的应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER", "SqlType": "VARCHAR(4)" }, { "Mapping": "$.COMPANYNAME", "Name": "COMPANY_NAME", "SqlType": "VARCHAR(40)" }, ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": " ", "RecordRowDelimiter": "\r\n" } }, "RecordFormatType": "CSV" } }, "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::MyS3Bucket", "FileKey": "TickerReference.csv" }, "TableName": "string" } }

AddApplicationVpcConfiguration

AddApplicationVpcConfiguration 操作的以下示例请求代码将 VPC 配置添加到现有应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfiguration": { "SecurityGroupIds": [ "sg-0123456789abcdef0" ], "SubnetIds": [ "subnet-0123456789abcdef0" ] } }

创建应用程序

以下是 创建应用程序 操作的示例请求代码,用于创建Managed Service for Apache Flink的应用程序:

{ "ApplicationName":"MyApplication", "ApplicationDescription":"My-Application-Description", "RuntimeEnvironment":"FLINK-1_15", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "CloudWatchLoggingOptions":[ { "LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" } ], "ApplicationConfiguration": { "EnvironmentProperties": {"PropertyGroups": [ {"PropertyGroupId": "ConsumerConfigProperties", "PropertyMap": {"aws.region": "us-east-1", "flink.stream.initpos": "LATEST"} }, {"PropertyGroupId": "ProducerConfigProperties", "PropertyMap": {"aws.region": "us-east-1"} }, ] }, "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "CodeContentType":"ZIPFILE" }, "FlinkApplicationConfiguration":{ "ParallelismConfiguration":{ "ConfigurationType":"CUSTOM", "Parallelism":2, "ParallelismPerKPU":1, "AutoScalingEnabled":true } } } }

创建应用程序 Snapshot

创建应用程序 Snapshot 操作的以下示例请求代码创建应用程序状态快照:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DeleteApplication

以下是 DeleteApplication 操作的示例请求代码,用于删除Managed Service for Apache Flink的应用程序:

{"ApplicationName": "MyApplication", "CreateTimestamp": 12345678912}

DeleteApplicationCloudWatchLoggingOption

以下是 DeleteApplicationCloudWatchLoggingOption 操作的示例请求代码,用于从Managed Service for Apache Flink的应用程序中删除 Amazon CloudWatch 日志记录选项:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOptionId": "3.1" "CurrentApplicationVersionId": 3 }

DeleteApplicationInputProcessingConfiguration

以下是 DeleteApplicationInputProcessingConfiguration 操作的请求代码示例,用于从Managed Service for Apache Flink的应用程序中删除输入处理配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "InputId": "2.1" }

DeleteApplicationOutput

以下是 DeleteApplicationOutput 操作的请求代码示例,用于从Managed Service for Apache Flink的应用程序中删除应用程序输出:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "OutputId": "4.1" }

DeleteApplicationReferenceDataSource

以下是 DeleteApplicationReferenceDataSource 操作的请求代码示例,用于从Managed Service for Apache Flink的应用程序中删除应用程序参考数据源:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceId": "5.1" }

DeleteApplicationSnapshot

DeleteApplicationSnapshot 操作的以下示例请求代码删除应用程序状态快照:

{ "ApplicationName": "MyApplication", "SnapshotCreationTimestamp": 12345678912, "SnapshotName": "MySnapshot" }

DeleteApplicationVpcConfiguration

DeleteApplicationVpcConfiguration 操作的以下示例请求代码从应用程序中删除现有的 VPC 配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfigurationId": "1.1" }

DescribeApplication

以下是 DescribeApplication 操作的请求代码示例,用于返回Managed Service for Apache Flink的应用程序的详细信息:

{"ApplicationName": "MyApplication"}

DescribeApplicationSnapshot

DescribeApplicationSnapshot 操作的以下示例请求代码返回有关应用程序状态快照的详细信息:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DiscoverInputSchema

DiscoverInputSchema 操作的以下示例请求代码从流式传输源中生成架构:

{ "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } }, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream", "S3Configuration": { "BucketARN": "string", "FileKey": "string" }, "ServiceExecutionRole": "string" }

DiscoverInputSchema 操作的以下示例请求代码从引用源中生成架构:

{ "S3Configuration": { "BucketARN": "arn:aws:s3:::mybucket", "FileKey": "TickerReference.csv" }, "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole" }

ListApplications

以下是 ListApplications 操作的请求代码示例,用于返回您账户中Managed Service for Apache Flink 的应用程序列表:

{ "ExclusiveStartApplicationName": "MyApplication", "Limit": 50 }

ListApplicationSnapshots

ListApplicationSnapshots 操作的以下示例请求代码返回应用程序状态快照列表:

{"ApplicationName": "MyApplication", "Limit": 50, "NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123" }

StartApplication

以下是 开始申请 操作的请求代码示例,用于启动Managed Service for Apache Flink的应用程序,并从最新快照(如有)中加载应用程序状态:

{ "ApplicationName": "MyApplication", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }

StopApplication

以下是 API_停止应用程序 操作的示例请求代码,用于停止Managed Service for Apache Flink的应用程序:

{"ApplicationName": "MyApplication"}

UpdateApplication

以下是 更新应用程序 操作的请求代码示例,用于更新Managed Service for Apache Flink的应用程序,以更改应用程序代码的位置:

{"ApplicationName": "MyApplication", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "ZIPFILE", "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::my_new_bucket", "FileKeyUpdate": "my_new_code.zip", "ObjectVersionUpdate": "2" } } } }