Kinesis Data Analytics API 示例代码 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Data Analytics API 示例代码

本主题包含 Kinesis Data Analytics 操作的示例请求块。

要在 Amazon Command Line Interface (Amazon CLI) 中将 JSON 作为一个操作的输入,请将请求保存在 JSON 文件中。然后,使用 --cli-input-json 参数将文件名传递给该操作。

以下示例说明了如何将 JSON 文件与操作一起使用。

$ aws kinesisanalyticsv2 start-application --cli-input-json file://start.json

有关使用 JSON 的更多信息,请参阅Amazon CLI,请参阅生成 CLI 框架和 CLI 输入 JSON 参数中的Amazon Command Line Interface用户指南.

AddApplicationCloudWatchLoggingOption

请求代码的以下示例请求代码:AddApplicationCloudWatchLoggingOption行动添加了亚马逊CloudWatchKinesis Data Analytics 应用程序的日志记录选项:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOption": { "LogStreamARN": "arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" }, "CurrentApplicationVersionId": 2 }

AddApplication输入

请求代码的以下示例请求代码:AddApplication输入操作将应用程序输入添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Input": { "InputParallelism": { "Count": 2 }, "InputSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(50)" }, { "SqlType": "REAL", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" } }, "KinesisStreamsInput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" } } }

AddApplicationInputProcessing配置

请求代码的以下示例请求代码:AddApplicationInputProcessing配置操作将应用程序输入处理配置添加到 Kinesis Data Analytics 应用程序:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "InputId": "2.1", "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } } }

AddApplication输出

请求代码的以下示例请求代码:AddApplication输出操作将 Kinesis 数据流作为应用程序输出添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "Output": { "DestinationSchema": { "RecordFormatType": "JSON" }, "KinesisStreamsOutput": { "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" }, "Name": "DESTINATION_SQL_STREAM" } }

AddApplicationReferenceData源

请求代码的以下示例请求代码:AddApplicationReferenceData源操作将 CSV 应用程序引用数据源添加到 Kinesis Data Analytics 应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "Mapping": "$.TICKER", "Name": "TICKER", "SqlType": "VARCHAR(4)" }, { "Mapping": "$.COMPANYNAME", "Name": "COMPANY_NAME", "SqlType": "VARCHAR(40)" }, ], "RecordEncoding": "UTF-8", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": " ", "RecordRowDelimiter": "\r\n" } }, "RecordFormatType": "CSV" } }, "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::MyS3Bucket", "FileKey": "TickerReference.csv" }, "TableName": "string" } }

AddApplicationVpcConfiguration

AddApplicationVpcConfiguration 操作的以下示例请求代码将 VPC 配置添加到现有应用程序中:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfiguration": { "SecurityGroupIds": [ "sg-0123456789abcdef0" ], "SubnetIds": [ "subnet-0123456789abcdef0" ] } }

CreateApplication

请求代码的以下示例请求代码:CreateApplication操作创建 Kinesis Data Analytics 应用程序:

{ "ApplicationName":"MyApplication", "ApplicationDescription":"My-Application-Description", "RuntimeEnvironment":"FLINK-1_13", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "CloudWatchLoggingOptions":[ { "LogStreamARN":"arn:aws:logs:us-east-1:123456789123:log-group:my-log-group:log-stream:My-LogStream" } ], "ApplicationConfiguration": { "EnvironmentProperties": {"PropertyGroups": [ {"PropertyGroupId": "ConsumerConfigProperties", "PropertyMap": {"aws.region": "us-east-1", "flink.stream.initpos": "LATEST"} }, {"PropertyGroupId": "ProducerConfigProperties", "PropertyMap": {"aws.region": "us-east-1"} }, ] }, "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "CodeContentType":"ZIPFILE" }, "FlinkApplicationConfiguration":{ "ParallelismConfiguration":{ "ConfigurationType":"CUSTOM", "Parallelism":2, "ParallelismPerKPU":1, "AutoScalingEnabled":true } } } }

CreateApplication快照

请求代码的以下示例请求代码:CreateApplication快照操作创建应用程序状态快照:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DeleteApplication

请求代码的以下示例请求代码:DeleteApplication操作删除 Kinesis Data Analytics 应用程序:

{"ApplicationName": "MyApplication", "CreateTimestamp": 12345678912}

DeleteApplicationCloudWatchLoggingOption

请求代码的以下示例请求代码:DeleteApplicationCloudWatchLoggingOption操作删除亚马逊CloudWatchKinesis Data Analytics 应用程序的日志记录选项:

{ "ApplicationName": "MyApplication", "CloudWatchLoggingOptionId": "3.1" "CurrentApplicationVersionId": 3 }

DeleteApplicationInputProcessing配置

请求代码的以下示例请求代码:DeleteApplicationInputProcessing配置操作从 Kinesis Data Analytics 应用程序中删除输入处理配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "InputId": "2.1" }

DeleteApplication输出

请求代码的以下示例请求代码:DeleteApplication输出操作从 Kinesis Data Analytics 应用程序中删除应用程序输出:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 4, "OutputId": "4.1" }

DeleteApplicationReferenceData源

请求代码的以下示例请求代码:DeleteApplicationReferenceData源操作从 Kinesis Data Analytics 应用程序中删除应用程序引用数据源:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 5, "ReferenceId": "5.1" }

DeleteApplication快照

请求代码的以下示例请求代码:DeleteApplication快照操作删除应用程序状态快照:

{ "ApplicationName": "MyApplication", "SnapshotCreationTimestamp": 12345678912, "SnapshotName": "MySnapshot" }

DeleteApplicationVpcConfiguration

DeleteApplicationVpcConfiguration 操作的以下示例请求代码从应用程序中删除现有的 VPC 配置:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 9, "VpcConfigurationId": "1.1" }

DescribeApplication

请求代码的以下示例请求代码:DescribeApplication操作返回 Kinesis Data Analytics 应用程序的详细信息:

{"ApplicationName": "MyApplication"}

DescribeApplication快照

请求代码的以下示例请求代码:DescribeApplication快照操作返回有关应用程序状态快照的详细信息:

{ "ApplicationName": "MyApplication", "SnapshotName": "MySnapshot" }

DiscoverInput架构

请求代码的以下示例请求代码:DiscoverInput架构操作从流式传输源中生成架构:

{ "InputProcessingConfiguration": { "InputLambdaProcessor": { "ResourceARN": "arn:aws:lambda:us-east-1:012345678901:function:MyLambdaFunction" } }, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, "ResourceARN": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream", "S3Configuration": { "BucketARN": "string", "FileKey": "string" }, "ServiceExecutionRole": "string" }

请求代码的以下示例请求代码:DiscoverInput架构操作从引用源中生成架构:

{ "S3Configuration": { "BucketARN": "arn:aws:s3:::mybucket", "FileKey": "TickerReference.csv" }, "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole" }

ListApplications

请求代码的以下示例请求代码:ListApplications操作返回您账户中 Kinesis Data Analytics 应用程序的列表:

{ "ExclusiveStartApplicationName": "MyApplication", "Limit": 50 }

ListApplication快照

请求代码的以下示例请求代码:ListApplication快照操作返回应用程序状态快照列表:

{"ApplicationName": "MyApplication", "Limit": 50, "NextToken": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123" }

StartApplication

请求代码的以下示例请求代码:StartApplication操作启动 Kinesis Data Analytics 应用程序,并从最新快照(如有)中加载应用程序状态:

{ "ApplicationName": "MyApplication", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }

StopApplication

请求代码的以下示例请求代码:API_StopApplication操作停止 Kinesis Data Analytics 应用程序:

{"ApplicationName": "MyApplication"}

UpdateApplication

请求代码的以下示例请求代码:UpdateApplication操作更新 Kinesis Data Analytics 应用程序以更改应用程序代码的位置:

{"ApplicationName": "MyApplication", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "ZIPFILE", "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::my_new_bucket", "FileKeyUpdate": "my_new_code.zip", "ObjectVersionUpdate": "2" } } } }