配置应用程序输入 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

配置应用程序输入

您的 Amazon Kinesis Data Analytics 应用程序可以从单个流式传输源中接收输入,并且可以选择使用一个引用数据源。有关更多信息,请参阅 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics :工作方式。) 本主题的此部分介绍了应用程序输入源。

配置流式传输源

当您创建应用程序时,可以指定流式传输源。您也可以在创建应用程序后修改输入。Amazon Kinesis Data Analytics 在应用程序中支持以下流式传输源:

  • Kinesis 数据流

  • Kinesis Data Firehose 传输流

注意

如果 Kinesis 数据流是加密的,Kinesis Data Analytics 会无缝地访问加密流中的数据,无需进一步配置。Kinesis Data Analytics 不存储从 Kinesis Data Streams 读取的未加密数据。有关更多信息,请参阅什么是 Kinesis 数据流的服务器端加密?

Kinesis Data Analytics 持续轮询流式传输源以查找新数据,并根据输入配置在应用程序内部流中提取该数据。

注意

添加 Kinesis 流作为应用程序的输入不会影响流中的数据。如果另一资源(如 Kinesis Data Firehose 交付流)也访问了同一 Kinesis 流,则 Kinesis Data Firehose 交付流和 Kinesis Data Analytics 应用程序将收到相同的数据。但是,吞吐量和限制可能会受到影响。

您的应用程序代码可以查询应用程序内部流。作为输入配置的一部分,您需要提供以下内容:

  • 流式传输源 – 您提供流的 Amazon 资源名称 (ARN) 以及 Kinesis Data Analytics 代表您访问流时代入的 IAM 角色。

  • 应用程序内部流名称前缀 – 在启动应用程序时,Kinesis Data Analytics 创建指定的应用程序内部流。在您的应用程序代码中,可以使用此名称访问应用程序内部流。

    您可以选择将一个流式传输源映射到多个应用程序内部流。有关更多信息,请参阅 Limits。) 在这种情况下, Amazon Kinesis Data Analytics 按如下名称创建指定的应用内流数量: prefix_001, prefix_002$and prefix_003...默认情况下, Kinesis Data Analytics 将流源映射到名为 prefix_001.

    在应用程序内部流中插入行时有速度限制。因此,Kinesis Data Analytics 支持多个此类应用程序内部流,以便以快得多的速度将记录添加到应用程序中。如果发现应用程序无法及时处理流式传输源中的数据,您可以添加并行度单元以提高性能。

  • 映射架构 – 您描述流式传输源上的记录格式(JSON、CSV)。您还描述流上的每个记录如何映射到创建的应用程序内部流中的列。您可以在此处提供列名和数据类型。

注意

在创建输入应用程序内部流时,Kinesis Data Analytics 使用引号将标识符(流名称和列名称)引起来。在查询该流和列时,您必须在引号内使用相同的大小写指定它们 (小写和大写字母完全匹配)。有关标识符的更多信息,请参阅 https://docs.amazonaws.cn/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html 中的Amazon Kinesis Data Analytics SQL 参考标识符

您可以在 Amazon Kinesis Data Analytics 控制台中创建一个应用程序并配置输入。然后,控制台执行必需的 API 调用。创建新应用程序 API 或者将输入配置添加到现有应用程序时,可以配置应用程序输入。有关更多信息,请参阅 CreateApplicationAddApplicationInput。下面是 Createapplication API 请求正文的输入配置部分:

"Inputs": [ { "InputSchema": { "RecordColumns": [ { "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "KinesisFirehoseInput": { "ResourceARN": "string", "RoleARN": "string" }, "KinesisStreamsInput": { "ResourceARN": "string", "RoleARN": "string" }, "Name": "string" } ]

配置引用源

您还可以选择将引用数据源添加到现有应用程序中,以便扩充来自流式传输源的数据。您必须将引用数据作为对象存储在 Amazon S3 存储桶中。在应用程序启动时,Amazon Kinesis Data Analytics 读取 Amazon S3 对象并创建应用程序内部引用表。然后,您的应用程序代码可以将其与应用程序内部流联接。

您可以使用支持的格式(CSV、JSON)在 Amazon S3 对象中存储引用数据。例如,假设您的应用程序对股票订单执行分析。对流式传输源采用以下记录格式:

Ticker, SalePrice, OrderId AMZN $700 1003 XYZ $250 1004 ...

在这种情况下,您可能考虑维护引用数据源,以提供有关每个股票行情机的详细信息,如公司名称。

Ticker, Company AMZN, Amazon XYZ, SomeCompany ...

您可以使用 API 或控制台添加应用程序引用数据源。Amazon Kinesis Data Analytics 提供了以下 API 操作以管理引用数据源:

有关使用控制台添加引用数据的信息,请参阅示例:将参考数据添加到A Kinesis Data Analytics 应用

请注意以下几点。

  • 如果应用程序正在运行,则 Kinesis Data Analytics 创建一个应用程序内部引用表,然后立即加载引用数据。

  • 如果应用程序未运行(例如,处于就绪状态),则 Kinesis Data Analytics 仅保存更新的输入配置。在应用程序开始运行时,Kinesis Data Analytics 在应用程序中将引用数据作为表进行加载。

假设您希望在 Kinesis Data Analytics 创建应用程序内部引用表后刷新数据。您可能更新了 Amazon S3 对象,或者要使用不同的 Amazon S3 对象。在这种情况下,您可以显式调用 UpdateApplication,或者在控制台中选择 Actions (操作) > Synchronize reference data table (同步引用数据表)。Kinesis Data Analytics 不会自动刷新应用程序内部引用表。

可作为引用数据源创建的 Amazon S3 对象具有大小限制。有关更多信息,请参阅 Limits。) 如果对象大小超出该限制,则 Kinesis Data Analytics 无法加载数据。应用程序状态显示为正在运行,但是不读取数据。

当添加引用数据源时,您需要提供以下信息:

  • S3 存储桶和对象键名称 – 除了存储桶名称和对象键以外,您还需要提供 Kinesis Data Analytics 代表您读取对象时代入的 IAM 角色。

  • 应用程序内部引用表名称 – Kinesis Data Analytics 创建该应用程序内部表并读取 Amazon S3 对象以填充该表。这是您在应用程序代码中指定的表名称。

  • 映射架构 – 您描述记录格式(JSON、CSV),即 Amazon S3 对象中存储的数据的编码。您还可以描述每个对象元素如何映射到应用程序内部引用表中的列。

下面显示 AddApplicationReferenceDataSource API 请求中的请求正文。

{ "applicationName": "string", "CurrentapplicationVersionId": number, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "IsDropped": boolean, "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "S3ReferenceDataSource": { "BucketARN": "string", "FileKey": "string", "ReferenceRoleARN": "string" }, "TableName": "string" } }