将流式传输源元素映射到 SQL 输入列 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将流式传输源元素映射到 SQL 输入列

注意

2023 年 9 月 12 日之后,如果您尚未使用 Kinesis Data Analytics for SQL,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息,请参阅限制

通过使用 Amazon Kinesis Data Analytics,您可以使用标准 SQL 处理和分析 JSON 或 CSV 格式的流数据。

  • 要处理和分析流 CSV 数据,您可以为输入流的列分配列名称和数据类型。您的应用程序将按顺序从每个列定义的输入流中导入一个列。

    您不需要在应用程序输入流中包含所有列,但您不能跳过源流中的列。例如,您可以从包含五个列的一个输入流中导入前三个列,但不能进导入列 1、2 和 4。

  • 要处理和分析流 JSON 数据,您可以使用 JSONPath 表达式将流式传输源中的 JSON 元素映射到输入流中的 SQL 列。有关通过 Amazon Kinesis Data Analytics 使用 JSONPath 的更多信息,请参阅 使用 JSONPath。SQL 表中的列具有从 JSON 类型中映射的数据类型。有关支持的数据类型,请参阅数据类型。有关将 JSON 数据转换成 SQL 数据的详细信息,请参阅将 JSON 数据类型映射到 SQL 数据类型

有关如何配置输入流的详细信息,请参阅配置应用程序输入

将 JSON 数据映射到 SQL 列

您可以使用 Amazon Web Services Management Console 或 Kinesis Data Analytics API 将 JSON 元素映射到输入列。

  • 要使用控制台将元素映射到列,请参阅使用架构编辑器

  • 如需使用 Kinesis Data Analytics API 将元素映射到列,请参阅以下部分。

要将 JSON 元素映射到应用程序内部输入流中的列,您需要使用在每个列中包含以下信息的架构:

  • 源表达式:指定列的数据位置的 JSONPath 表达式。

  • 列名称:您的 SQL 查询用于引用数据的名称。

  • 数据类型:列的 SQL 数据类型。

使用 API

如需将流式源中的元素映射到输入列,您可以使用 Kinesis Data Analytics API CreateApplication 操作。要创建应用程序内部流,请指定一个架构以将您的数据转换为 SQL 中使用的架构化版本。CreateApplication 操作会将您的应用程序配置为接收来自单个流式传输源的输入。要将 JSON 元素或 CSV 列映射到 SQL 列,您应在 SourceSchema RecordColumns 数组中创建一个 RecordColumn 对象。RecordColumn 对象具有以下架构:

{ "Mapping": "String", "Name": "String", "SqlType": "String" }

RecordColumn 对象中的字段具有以下值:

  • Mapping:用于识别列输入源记录中数据的位置的 JSONPath 表达式。采用 CSV 格式的源流的输入架构不存在此值。

  • Name:应用程序内 SQL 数据流中的列名称。

  • SqlType:应用程序内 SQL 数据流中的数据的数据类型。

JSON 输入架构示例

以下示例展示了 JSON 架构的 InputSchema 值的格式。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(4)", "Name": "TICKER_SYMBOL", "Mapping": "$.TICKER_SYMBOL" }, { "SqlType": "VARCHAR(16)", "Name": "SECTOR", "Mapping": "$.SECTOR" }, { "SqlType": "TINYINT", "Name": "CHANGE", "Mapping": "$.CHANGE" }, { "SqlType": "DECIMAL(5,2)", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" }, "RecordEncoding": "UTF-8" }

CSV 输入架构示例

以下示例展示了采用逗号分隔值 (CSV) 格式的架构的 InputSchema 值的格式。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(16)", "Name": "LastName" }, { "SqlType": "VARCHAR(16)", "Name": "FirstName" }, { "SqlType": "INTEGER", "Name": "CustomerId" } ], "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": ",", "RecordRowDelimiter": "\n" } }, "RecordFormatType": "CSV" }, "RecordEncoding": "UTF-8" }

将 JSON 数据类型映射到 SQL 数据类型

JSON 数据类型将根据应用程序的输入架构转换为相应的 SQL 输入类型。有关支持的 SQL 数据类型的信息,请参阅数据类型。Amazon Kinesis Data Analytics 将根据以下规则将 JSON 数据类型转换为 SQL 数据类型。

Null 文本

无论目标数据类型如何,JSON 输入流中的 null 文本 (即,"City":null) 都将转换为 SQL null。

布尔文本

JSON 输入流中的布尔文本 (即 "Contacted":true) 将按以下形式转换为 SQL 数据:

  • 数值 (DECIMAL、INT 等):true 转换为 1;false 转换为 0。

  • 二进制 (BINARY 或 VARBINARY):

    • true:结果已设置最低位并清除剩余位。

    • false:结果已清除所有位。

    转换为 VARBINARY 将产生长度为 1 个字节的值。

  • 布尔值:转换为相应的 SQL 布尔值。

  • 字符 (CHAR 或 VARCHAR):转换为相应的字符串值 (truefalse)。值将被截断以适应字段的长度。

  • 日期时间 (DATE、TIME 或 TIMESTAMP):转换将失败,并且一个强制转换错误将写入到错误流。

数字

JSON 输入流中的数字文本 (即 "CustomerId":67321) 将按以下形式转换为 SQL 数据:

  • 数值 (DECIMAL、INT 等):直接转换。如果转换后的值超过目标数据类型 (即,将 123.4 转换为 INT) 的大小或精度,转换将失败,并且一个强制转换错误将写入到错误流。

  • 二进制 (BINARY 或 VARBINARY):转换将失败,并且一个强制转换错误将写入到错误流。

  • BOOLEAN:

    • 0:转换为 false

    • 所有其他数字:转换为 true

  • 字符 (CHAR 或 VARCHAR):转换为数字的字符串表示形式。

  • 日期时间 (DATE、TIME 或 TIMESTAMP):转换将失败,并且一个强制转换错误将写入到错误流。

String

JSON 输入流中的字符串值(即 "CustomerName":"John Doe")将按以下形式转换为 SQL 数据:

  • 数字 (DECIMAL、INT 等):Amazon Kinesis Data Analytics 尝试将值转换为目标数据类型。如果该值无法转换,则转换将失败,并且一个强制转换错误将写入到错误流。

  • 二进制 (BINARY 或 VARBINARY):如果源字符串是有效的二进制文本 (即包含偶数数量的 f 的 X'3F67A23A'),则该值将转换为目标数据类型。否则,转换将失败,并且一个强制转换错误将写入到错误流。

  • 布尔值:如果源字符串为 "true",则转换为 true。此比较不区分大小写。否则,转换为 false

  • 字符 (CHAR 或 VARCHAR):转换为输入中的字符串值。如果该值长于目标数据类型,那么它将被截断,并且任何错误都不会将写入到错误流。

  • 日期时间 (DATE、TIME 或 TIMESTAMP):如果源字符串采用了可转换为目标值的格式,则转换该值。否则,转换将失败,并且一个强制转换错误将写入到错误流。

    有效的日期时间格式包括:

    • “1992-02-14”

    • “1992-02-14 18:35:44.0”

数组或对象

JSON 输入流中的数组或对象将按以下形式转换为 SQL 数据:

  • 字符 (CHAR 或 VARCHAR):转换为数组或对象的源文本。请参阅 访问数组

  • 所有其他数据类型:转换将失败,并且一个强制转换错误将写入到错误流。

有关 JSON 数组的示例,请参阅使用 JSONPath

相关主题