将流式传输源元素映射到 SQL 输入列 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将流式传输源元素映射到 SQL 输入列

注意

2023 年 9 月 12 日之后,如果您尚未使用适用于 SQL 的 Kinesis Data Analytics,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息,请参阅限制

通过使用 Amazon Kinesis Data Analytics,您可以使用标准 SQL 处理和分析 JSON 或 CSV 格式的流数据。

  • 要处理和分析流 CSV 数据,您可以为输入流的列分配列名称和数据类型。您的应用程序将按顺序从每个列定义的输入流中导入一个列。

    您不需要在应用程序输入流中包含所有列,但您不能跳过源流中的列。例如,您可以从包含五个列的一个输入流中导入前三个列,但不能进导入列 1、2 和 4。

  • 要处理和分析流 JSON 数据,您可以使用 JSONPath 表达式将流式传输源中的 JSON 元素映射到输入流中的 SQL 列。有关通过 Amazon Kinesis Data Analytics 使用 JSONPath 的更多信息,请参阅 使用 JSONPath。SQL 表中的列具有从 JSON 类型中映射的数据类型。有关支持的数据类型,请参阅数据类型。有关将 JSON 数据转换成 SQL 数据的详细信息,请参阅将 JSON 数据类型映射到 SQL 数据类型

有关如何配置输入流的详细信息,请参阅配置应用程序输入

将 JSON 数据映射到 SQL 列

您可以使用 Amazon Web Services Management Console 或 Kinesis Data Analytics API 将 JSON 元素映射到输入列。

  • 要使用控制台将元素映射到列,请参阅使用架构编辑器

  • 如需使用 Kinesis Data Analytics API 将元素映射到列,请参阅以下部分。

要将 JSON 元素映射到应用程序内部输入流中的列,您需要使用在每个列中包含以下信息的架构:

  • 源表达式:指定列的数据位置的 JSONPath 表达式。

  • 列名称:您的 SQL 查询用于引用数据的名称。

  • 数据类型:列的 SQL 数据类型。

使用 API

如需将流式源中的元素映射到输入列,您可以使用 Kinesis Data Analytics API CreateApplication 操作。要创建应用程序内部流,请指定一个架构以将您的数据转换为 SQL 中使用的架构化版本。CreateApplication 操作会将您的应用程序配置为接收来自单个流式传输源的输入。要将 JSON 元素或 CSV 列映射到 SQL 列,您应在 SourceSchema RecordColumns 数组中创建一个 RecordColumn 对象。RecordColumn 对象具有以下架构:

{ "Mapping": "String", "Name": "String", "SqlType": "String" }

RecordColumn 对象中的字段具有以下值:

  • Mapping:用于识别列输入源记录中数据的位置的 JSONPath 表达式。采用 CSV 格式的源流的输入架构不存在此值。

  • Name:应用程序内 SQL 数据流中的列名称。

  • SqlType:应用程序内 SQL 数据流中的数据的数据类型。

JSON 输入架构示例

以下示例展示了 JSON 架构的 InputSchema 值的格式。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(4)", "Name": "TICKER_SYMBOL", "Mapping": "$.TICKER_SYMBOL" }, { "SqlType": "VARCHAR(16)", "Name": "SECTOR", "Mapping": "$.SECTOR" }, { "SqlType": "TINYINT", "Name": "CHANGE", "Mapping": "$.CHANGE" }, { "SqlType": "DECIMAL(5,2)", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" }, "RecordEncoding": "UTF-8" }

CSV 输入架构示例

以下示例展示了采用逗号分隔值 (CSV) 格式的架构的 InputSchema 值的格式。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(16)", "Name": "LastName" }, { "SqlType": "VARCHAR(16)", "Name": "FirstName" }, { "SqlType": "INTEGER", "Name": "CustomerId" } ], "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": ",", "RecordRowDelimiter": "\n" } }, "RecordFormatType": "CSV" }, "RecordEncoding": "UTF-8" }

将 JSON 数据类型映射到 SQL 数据类型

JSON 数据类型将根据应用程序的输入架构转换为相应的 SQL 输入类型。有关支持的 SQL 数据类型的信息,请参阅数据类型。Amazon Kinesis Data Analytics 将根据以下规则将 JSON 数据类型转换为 SQL 数据类型。

Null 文本

无论目标数据类型如何,JSON 输入流中的 null 文本 (即,"City":null) 都将转换为 SQL null。

布尔文本

JSON 输入流中的布尔文本 (即 "Contacted":true) 将按以下形式转换为 SQL 数据:

  • 数值 (DECIMAL、INT 等):true 转换为 1;false 转换为 0。

  • 二进制 (BINARY 或 VARBINARY):

    • true:结果已设置最低位并清除剩余位。

    • false:结果已清除所有位。

    转换为 VARBINARY 将产生长度为 1 个字节的值。

  • 布尔值:转换为相应的 SQL 布尔值。

  • 字符 (CHAR 或 VARCHAR):转换为相应的字符串值 (truefalse)。值将被截断以适应字段的长度。

  • 日期时间 (DATE、TIME 或 TIMESTAMP):转换将失败,并且一个强制转换错误将写入到错误流。

数字

JSON 输入流中的数字文本 (即 "CustomerId":67321) 将按以下形式转换为 SQL 数据:

  • 数值 (DECIMAL、INT 等):直接转换。如果转换后的值超过目标数据类型 (即,将 123.4 转换为 INT) 的大小或精度,转换将失败,并且一个强制转换错误将写入到错误流。

  • 二进制 (BINARY 或 VARBINARY):转换将失败,并且一个强制转换错误将写入到错误流。

  • BOOLEAN:

    • 0:转换为 false

    • 所有其他数字:转换为 true

  • 字符 (CHAR 或 VARCHAR):转换为数字的字符串表示形式。

  • 日期时间 (DATE、TIME 或 TIMESTAMP):转换将失败,并且一个强制转换错误将写入到错误流。

字符串

JSON 输入流中的字符串值(即 "CustomerName":"John Doe")将按以下形式转换为 SQL 数据:

  • 数字 (DECIMAL、INT 等):Amazon Kinesis Data Analytics 尝试将值转换为目标数据类型。如果该值无法转换,则转换将失败,并且一个强制转换错误将写入到错误流。

  • 二进制 (BINARY 或 VARBINARY):如果源字符串是有效的二进制文本 (即包含偶数数量的 f 的 X'3F67A23A'),则该值将转换为目标数据类型。否则,转换将失败,并且一个强制转换错误将写入到错误流。

  • 布尔值:如果源字符串为 "true",则转换为 true。此比较不区分大小写。否则,转换为 false

  • 字符 (CHAR 或 VARCHAR):转换为输入中的字符串值。如果该值长于目标数据类型,那么它将被截断,并且任何错误都不会将写入到错误流。

  • 日期时间 (DATE、TIME 或 TIMESTAMP):如果源字符串采用了可转换为目标值的格式,则转换该值。否则,转换将失败,并且一个强制转换错误将写入到错误流。

    有效的日期时间格式包括:

    • “1992-02-14”

    • “1992-02-14 18:35:44.0”

数组或对象

JSON 输入流中的数组或对象将按以下形式转换为 SQL 数据:

  • 字符 (CHAR 或 VARCHAR):转换为数组或对象的源文本。请参阅 访问数组

  • 所有其他数据类型:转换将失败,并且一个强制转换错误将写入到错误流。

有关 JSON 数组的示例,请参阅使用 JSONPath

相关主题