对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
配置应用程序输入
您的 Amazon Kinesis Data Analytics 应用程序可以接收来自单个流媒体源的输入,也可以选择使用一个参考数据源。有关更多信息,请参阅适用于 SQL 应用程序的Amazon Kinesis Data Analytics:工作原理:本主题的此部分介绍了应用程序输入源。
主题
配置流式传输源
当您创建应用程序时,可以指定流式传输源。您也可以在创建应用程序后修改输入。Amazon Kinesis Data Analytics 支持您的应用程序的以下流媒体源:
-
Kinesis 数据流
-
Kinesis Data Firehose 传输流
注意
如果 Kinesis 数据流已加密,Kinesis Data Analytics 无需进一步配置即可无缝访问加密流中的数据。Kinesis Data Analytics 不存储从 Kinesis Data Streams 读取的未加密数据。有关更多信息,请参阅什么是 Kinesis 数据流的服务器端加密?。
Kinesis Data Analytics 持续轮询流媒体源以获取新数据,并根据输入配置将其提取到应用程序内流中。
注意
添加 Kinesis Stream 作为应用程序的输入不会影响流中的数据。如果其他资源,例如 Kinesis Data Firehose 传输流,也访问了相同的 Kinesis 流,则 Kinesis Data Firehose 传输流和 Kinesis Data Analytics 应用程序都将收到相同的数据。但是,吞吐量和限制可能会受到影响。
您的应用程序代码可以查询应用程序内部流。作为输入配置的一部分,您需要提供以下内容:
-
流的-您提供该传输流的 Amazon 资源名称(ARN)以及 Kinesis Data Analytics 可以代您访问该流的 IAM 角色。
-
应用程序内流名称前缀 — 启动应用程序时,Kinesis Data Analytics 会创建指定的应用程序内流。在您的应用程序代码中,可以使用此名称访问应用程序内部流。
您可以选择将一个流式传输源映射到多个应用程序内部流。有关更多信息,请参阅Limits:在这种情况下,Amazon Kinesis Data Analytics 会创建指定数量的应用程序内流,其名称如下:
前
缀
和_001
_002
、前缀前缀
_003
。默认情况下,Kinesis Data Analytics 将流媒体源映射到一个名为p
refix 的应用程序内流_001
。在应用程序内部流中插入行时有速度限制。因此,Kinesis Data Analytics 支持多个这样的应用程序内流,因此您可以以更快的速度将记录导入应用程序。如果发现应用程序无法及时处理流式传输源中的数据,您可以添加并行度单元以提高性能。
-
映射架构 — 您可以描述流媒体源上的记录格式(JSON、CSV)。您还描述流上的每个记录如何映射到创建的应用程序内部流中的列。您可以在此处提供列名和数据类型。
注意
在创建应用程序内输入流时,Kinesis Data Analytics 在标识符(流名称和列名)两侧添加引号。在查询该流和列时,您必须在引号内使用相同的大小写指定它们 (小写和大写字母完全匹配)。有关标识符的更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的标识符。
您可以在 Amazon Kinesis Data Analytics 控制台中创建应用程序和配置输入。然后,控制台执行必需的 API 调用。创建新应用程序 API 或者将输入配置添加到现有应用程序时,可以配置应用程序输入。有关更多信息,请参阅 CreateApplication 和 AddApplicationInput。下面是 Createapplication
API 请求正文的输入配置部分:
"Inputs": [ { "InputSchema": { "RecordColumns": [ { "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "KinesisFirehoseInput": { "ResourceARN": "string", "RoleARN": "string" }, "KinesisStreamsInput": { "ResourceARN": "string", "RoleARN": "string" }, "Name": "string" } ]
配置引用源
您还可以选择将引用数据源添加到现有应用程序中,以便扩充来自流式传输源的数据。您必须将引用数据作为对象存储在 Amazon S3 存储桶。应用程序启动时,Amazon Kinesis Data Analytics 读取 Amazon S3 对象并创建应用程序内部参考表。然后,您的应用程序代码可以将其与应用程序内部流联接。
您使用支持的格式(CSV、JSON)将引用数据存储在 Amazon S3 对象中。例如,假设您的应用程序对股票订单执行分析。对流式传输源采用以下记录格式:
Ticker, SalePrice, OrderId AMZN $700 1003 XYZ $250 1004 ...
在这种情况下,您可能考虑维护引用数据源,以提供有关每个股票行情机的详细信息,如公司名称。
Ticker, Company AMZN, Amazon XYZ, SomeCompany ...
您可以使用 API 或控制台添加应用程序参考数据源。Amazon Kinesis Data Analytics 提供以下 API 操作来管理参考数据源:
有关使用控制台添加引用数据的信息,请参阅示例:向 Kinesis Data Analytics 应用程序添加参考数据。
请注意以下几点:
-
如果应用程序正在运行,Kinesis Data Analytics 会创建应用程序内参考表,然后立即加载参考数据。
-
如果应用程序未运行(例如,它处于就绪状态),Kinesis Data Analytics 仅保存更新的输入配置。当应用程序开始运行时,Kinesis Data Analytics 会以表的形式加载应用程序中的参考数据。
假设你想在 Kinesis Data Analytics 创建应用程序内参考表后刷新数据。也许你更新了 Amazon S3 对象,或者你想使用不同的 Amazon S3 对象。在这种情况下,可以在控制台中显式调用UpdateApplication,也可以选择 “操作”、“同步参考数据表”。Kinesis Data Analytics 不会自动刷新应用程序内的参考表。
您可以创建作为参考数据源的 Amazon S3 对象的大小有限制。有关更多信息,请参阅Limits:如果对象大小超过限制,Kinesis Data Analytics 将无法加载数据。应用程序状态显示为正在运行,但是不读取数据。
当添加引用数据源时,您需要提供以下信息:
-
S3 存储段和对象密钥名称 — 除了存储段名称和对象密钥外,您还提供一个 IAM 角色,Kinesis Data Analytics 可以假定该角色代表您读取对象。
-
应用程序内参考表名称 — Kinesis Data Analytics 创建此应用程序内表并通过读取 Amazon S3 对象来填充该表。这是您在应用程序代码中指定的表名称。
-
映射架构 — 您可以描述存储在 Amazon S3 对象中的数据的记录格式(JSON、CSV)和编码。您还可以描述每个对象元素如何映射到应用程序内部引用表中的列。
下面显示 AddApplicationReferenceDataSource
API 请求中的请求正文。
{ "applicationName": "string", "CurrentapplicationVersionId": number, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "IsDropped": boolean, "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "S3ReferenceDataSource": { "BucketARN": "string", "FileKey": "string", "ReferenceRoleARN": "string" }, "TableName": "string" } }