将流式传输源元素映射到 SQL 输入列 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

将流式传输源元素映射到 SQL 输入列

通过使用 Amazon Kinesis Data Analytics,您可以使用标准 SQL 处理和分析 JSON 或 CSV 格式的流数据。

  • 要处理和分析流 CSV 数据,您可以为输入流的列分配列名称和数据类型。您的应用程序将按顺序从每个列定义的输入流中导入一个列。

    您不需要在应用程序输入流中包含所有列,但您不能跳过源流中的列。例如,您可以从包含五个列的一个输入流中导入前三个列,但不能进导入列 1、2 和 4。

  • 要处理和分析流 JSON 数据,您可以使用 JSONPath 表达式将流式传输源中的 JSON 元素映射到输入流中的 SQL 列。有关将 JSONPath 与 Amazon Kinesis Data Analytics 结合使用的更多信息,请参阅使用 JSONPath。SQL 表中的列具有从 JSON 类型中映射的数据类型。有关支持的数据类型,请参阅数据类型。有关将 JSON 数据转换成 SQL 数据的详细信息,请参阅将 JSON 数据类型映射到 SQL 数据类型

有关如何配置输入流的详细信息,请参阅配置应用程序输入

将 JSON 数据映射到 SQL 列

您可以使用 AWS 管理控制台或 Kinesis Data Analytics API 将 JSON 元素映射到输入列。

  • 要使用控制台将元素映射到列,请参阅使用架构编辑器

  • 要使用 Kinesis Data Analytics API 将元素映射到列,请参阅以下部分。

要将 JSON 元素映射到应用程序内部输入流中的列,您需要使用在每个列中包含以下信息的架构:

  • 源表达式: 标识列数据位置的JSONPath表达式。

  • 列名称。 SQL查询用于引用数据的名称。

  • 数据类型 列的SQL数据类型。

使用 API

要将流式传输源中的元素映射到输入列,您可以使用 Kinesis Data Analytics API CreateApplication 操作。要创建应用程序内部流,请指定一个架构以将您的数据转换为 SQL 中使用的架构化版本。CreateApplication 操作会将您的应用程序配置为接收来自单个流式传输源的输入。要将 JSON 元素或 CSV 列映射到 SQL 列,您应在 SourceSchema RecordColumns 数组中创建一个 RecordColumn 对象。RecordColumn 对象具有以下架构:

{ "Mapping": "String", "Name": "String", "SqlType": "String" }

RecordColumn 对象中的字段具有以下值:

  • Mapping: :用于识别列输入源记录中数据的位置的 JSONPath 表达式。采用 CSV 格式的源流的输入架构不存在此值。

  • Name: :应用程序内 SQL 数据流中的列名称。

  • SqlType: :应用程序内 SQL 数据流中的数据的数据类型。

JSON 输入架构示例

以下示例展示了 JSON 架构的 InputSchema 值的格式。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(4)", "Name": "TICKER_SYMBOL", "Mapping": "$.TICKER_SYMBOL" }, { "SqlType": "VARCHAR(16)", "Name": "SECTOR", "Mapping": "$.SECTOR" }, { "SqlType": "TINYINT", "Name": "CHANGE", "Mapping": "$.CHANGE" }, { "SqlType": "DECIMAL(5,2)", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" }, "RecordEncoding": "UTF-8" }

CSV 输入架构示例

以下示例展示了采用逗号分隔值 (CSV) 格式的架构的 InputSchema 值的格式。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(16)", "Name": "LastName" }, { "SqlType": "VARCHAR(16)", "Name": "FirstName" }, { "SqlType": "INTEGER", "Name": "CustomerId" } ], "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": ",", "RecordRowDelimiter": "\n" } }, "RecordFormatType": "CSV" }, "RecordEncoding": "UTF-8" }

将 JSON 数据类型映射到 SQL 数据类型

JSON 数据类型将根据应用程序的输入架构转换为相应的 SQL 输入类型。有关支持的 SQL 数据类型的信息,请参阅数据类型。Amazon Kinesis Data Analytics 根据以下规则将 JSON 数据类型转换为 SQL 数据类型。

Null 文本

无论目标数据类型如何,JSON 输入流中的 null 文本 (即,"City":null) 都将转换为 SQL null。

布尔文本

JSON 输入流中的布尔文本 (即 "Contacted":true) 将按以下形式转换为 SQL 数据:

  • 数值(十进制数、国际数据库等): true 转换为1; false 转换为0。

  • 二进制 (BINARY 或 VARBINARY):

    • true: :结果已设置最低位并清除剩余位。

    • false: :结果已清除所有位。

    转换为 VARBINARY 将产生长度为 1 个字节的值。

  • BOOLEAN 转换为相应的SQL布尔值。

  • 字符(字符或Varchar): 转换为相应字符串值(truefalse)。值将被截断以适应字段的长度。

  • 日期时间(日期、时间或时间戳): 转换失败,并且将密码错误写入错误流。

Number

JSON 输入流中的数字文本 (即 "CustomerId":67321) 将按以下形式转换为 SQL 数据:

  • 数值(十进制数、国际数据库等): 直接转换。如果转换后的值超过目标数据类型 (即,将 123.4 转换为 INT) 的大小或精度,转换将失败,并且一个强制转换错误将写入到错误流。

  • 二进制 (BINARY 或 VARBINARY):转换失败,并且将密码错误写入错误流。

  • BOOLEAN

    • 0: 转换为 false.

    • 所有其他数字: 转换为 true.

  • 字符(字符或Varchar): 转换为编号的字符串表示。

  • 日期时间(日期、时间或时间戳): 转换失败,并且将密码错误写入错误流。

String

JSON 输入流中的字符串值(即 "CustomerName":"John Doe")将按以下形式转换为 SQL 数据:

  • 数字(DECIMAL、INT 等):Amazon Kinesis Data Analytics 尝试将值转换为目标数据类型。如果该值无法转换,则转换将失败,并且一个强制转换错误将写入到错误流。

  • 二进制 (BINARY 或 VARBINARY):如果源字符串是有效的二进制文字(即, X'3F67A23A',使用偶数f),值将转换为目标数据类型。否则,转换将失败,并且一个强制转换错误将写入到错误流。

  • BOOLEAN 如果源字符串为 "true",转换为 true。此比较不区分大小写。否则,转换为 false

  • 字符(字符或Varchar): 转换为输入中的字符串值。如果该值长于目标数据类型,那么它将被截断,并且任何错误都不会将写入到错误流。

  • 日期时间(日期、时间或时间戳): 如果源字符串的格式可以转换为目标值,则值将转换。否则,转换将失败,并且一个强制转换错误将写入到错误流。

    有效的日期时间格式包括:

    • “1992-02-14”

    • “1992-02-14 18:35:44.0”

数组或对象

JSON 输入流中的数组或对象将按以下形式转换为 SQL 数据:

  • 字符(字符或Varchar): 转换为阵列或对象的源文本。请参阅访问数组

  • 所有其他数据类型: 转换失败,并且将密码错误写入错误流。

有关 JSON 数组的示例,请参阅使用 JSONPath

相关主题