Kinesis Data Analytics API 示例代码 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

Kinesis Data Analytics API 示例代码

本主题包含 Kinesis Data Analytics 操作的示例请求块。

要在 AWS Command Line Interface (AWS CLI) 中将 JSON 作为一个操作的输入,请将请求保存在 JSON 文件中。然后,使用 --cli-input-json 参数将文件名传递给该操作。

以下示例说明了如何将 JSON 文件与操作一起使用。

$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json

有关将 JSON 与 AWS CLI 一起使用的更多信息,请参阅 AWS Command Line Interface 用户指南 中的生成 CLI 框架和 CLI 输入 JSON 参数

AddApplicationCloudWatchLoggingOption

AddApplicationCloudWatchLoggingOption 操作的以下示例请求代码将 Amazon CloudWatch 日志记录选项添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOption": { "LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" }, "CurrentApplicationVersionId": 2 }

AddApplicationInput

AddApplicationInput 操作的以下示例请求代码将应用程序输入添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Input": { "InputParallelism": { "Count": 2 }, "InputSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(50)" }, { "SqlType": "REAL", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" } }, "KinesisStreamsInput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" } } }

AddApplicationInputProcessingConfiguration

AddApplicationInputProcessingConfiguration 操作的以下示例请求代码将应用程序输入处理配置添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "InputId": "2.1", "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } } }

AddApplicationOutput

AddApplicationOutput 操作的以下示例请求代码将 Kinesis 数据流作为应用程序输出添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Output": { "DestinationSchema": { "RecordFormatType": "JSON" }, "KinesisStreamsOutput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" }, "Name": "DESTINATION_SQL_STREAM" } }

AddApplicationReferenceDataSource

AddApplicationReferenceDataSource 操作的以下示例请求代码将 CSV 应用程序引用数据源添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER", "SqlType": "VARCHAR(4)" }, { "Mapping": "$.COMPANYNAME", "Name": "COMPANY_NAME", "SqlType": "VARCHAR(40)" }, ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": " ", "RecordRowDelimiter": "\r\n" } }, "RecordFormatType": "CSV" } }, "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::MyS3Bucket", "FileKey": "TickerReference.csv" }, "TableName": "string" } }

AddApplicationVpcConfiguration

AddApplicationVpcConfiguration 操作的以下示例请求代码将 VPC 配置添加到现有应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfiguration": { "SecurityGroupIds": [ "sg-0123456789abcdef0" ], "SubnetIds": [ "subnet-0123456789abcdef0" ] } }

CreateApplication

CreateApplication 操作的以下示例请求代码创建 Kinesis Data Analytics 应用程序:

{ "ApplicationName":"MyApplication", "ApplicationDescription":"My-Application-Description", "RuntimeEnvironment":"FLINK-1_8", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "CloudWatchLoggingOptions":[ { "LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" } ], "ApplicationConfiguration": { "EnvironmentProperties": {"PropertyGroups": [ {"PropertyGroupId": "ConsumerConfigProperties", "PropertyMap": {"aws.region": "us-east-1", "flink.stream.initpos": "LATEST"} }, {"PropertyGroupId": "ProducerConfigProperties", "PropertyMap": {"aws.region": "us-east-1"} }, ] }, "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "CodeContentType":"ZIPFILE" }, "FlinkApplicationConfiguration":{ "ParallelismConfiguration":{ "ConfigurationType":"CUSTOM", "Parallelism":2, "ParallelismPerKPU":1, "AutoScalingEnabled":true } } } }

CreateApplicationSnapshot

CreateApplicationSnapshot 操作的以下示例请求代码创建应用程序状态快照:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DeleteApplication

DeleteApplication 操作的以下示例请求代码删除 Kinesis Data Analytics 应用程序:

{"ApplicationName": "MyApplication", "CreateTimestamp": 12345678912}

DeleteApplicationCloudWatchLoggingOption

DeleteApplicationCloudWatchLoggingOption 操作的以下示例请求代码从 Kinesis Data Analytics 应用程序中删除 Amazon CloudWatch 日志记录选项:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOptionId": "3.1" "CurrentApplicationVersionId": 3 }

DeleteApplicationInputProcessingConfiguration

DeleteApplicationInputProcessingConfiguration 操作的以下示例请求代码从 Kinesis Data Analytics 应用程序中删除输入处理配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "InputId": "2.1" }

DeleteApplicationOutput

DeleteApplicationOutput 操作的以下示例请求代码从 Kinesis Data Analytics 应用程序中删除应用程序输出:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "OutputId": "4.1" }

DeleteApplicationReferenceDataSource

DeleteApplicationReferenceDataSource 操作的以下示例请求代码从 Kinesis Data Analytics 应用程序中删除应用程序引用数据源:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceId": "5.1" }

DeleteApplicationSnapshot

DeleteApplicationSnapshot 操作的以下示例请求代码删除应用程序状态快照:

{ "ApplicationName": "MyApplication", "SnapshotCreationTimestamp": 12345678912, "SnapshotName": "MySnapshot" }

DeleteApplicationVpcConfiguration

DeleteApplicationVpcConfiguration 操作的以下示例请求代码从应用程序中删除现有的 VPC 配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfigurationId": "1.1" }

DescribeApplication

DescribeApplication 操作的以下示例请求代码返回有关 Kinesis Data Analytics 应用程序的详细信息:

{"ApplicationName": "MyApplication"}

DescribeApplicationSnapshot

DescribeApplicationSnapshot 操作的以下示例请求代码返回有关应用程序状态快照的详细信息:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DiscoverInputSchema

DiscoverInputSchema 操作的以下示例请求代码从流式传输源中生成架构:

{ "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } }, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream", "S3Configuration": { "BucketARN": "string", "FileKey": "string" }, "ServiceExecutionRole": "string" }

DiscoverInputSchema 操作的以下示例请求代码从引用源中生成架构:

{ "S3Configuration": { "BucketARN": "arn:aws:s3:::mybucket", "FileKey": "TickerReference.csv" }, "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole" }

ListApplications

ListApplications 操作的以下示例请求代码返回您的账户中的 Kinesis Data Analytics 应用程序列表:

{ "ExclusiveStartApplicationName": "MyApplication", "Limit": 50 }

ListApplicationSnapshots

ListApplicationSnapshots 操作的以下示例请求代码返回应用程序状态快照列表:

{"ApplicationName": "MyApplication", "Limit": 50, "NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123" }

StartApplication

StartApplication 操作的以下示例请求代码启动 Kinesis Data Analytics 应用程序,并从最新快照(如有)中加载应用程序状态:

{ "ApplicationName": "MyApplication", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }

StopApplication

API_StopApplication 操作的以下示例请求代码停止 Kinesis Data Analytics 应用程序:

{"ApplicationName": "MyApplication"}

UpdateApplication

UpdateApplication 操作的以下示例请求代码更新 Kinesis Data Analytics 应用程序以更改应用程序代码的位置:

{"ApplicationName": "MyApplication", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "ZIPFILE", "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::my_new_bucket", "FileKeyUpdate": "my_new_code.zip", "ObjectVersionUpdate": "2" } } } }