在 Kinesis Data Firehose 中转换输入记录格式
在将数据存储在 Amazon S3 之前,Amazon Kinesis Data Firehose 可以将输入数据的格式从 JSON 转换为 Apache Parquet
记录格式转换要求
Kinesis Data Firehose 需要以下三个元素才能转换记录数据的格式:
-
用于读取输入数据的 JSON 的解串器(您可以选择以下两种类型的解串器之一):Apache Hive JSON SerDe
或 OpenX JSON SerDe 。 注意
如要将多个 JSON 文档合并到同一记录中,请确保您的输入仍以支持的 JSON 格式显示。JSON 文档数组不是有效输入。
例如,这是正确的输入:
{"a":1}{"a":2}
而这是错误的输入:
[{"a":1}, {"a":2}]
-
用于确定如何解释该数据的架构:使用 Amazon Glue 在 Amazon Glue Data Catalog 中创建架构。然后,Kinesis Data Firehose 会引用该架构并使用其解释您的输入数据。您可以使用同一架构来配置 Kinesis Data Firehose 和分析软件。有关更多信息,请参阅《Amazon Glue 开发人员指南》中的填充 Amazon Glue 数据目录。
注意
在 Amazon Glue Data Catalog 中创建的架构应该与输入数据结构相匹配。否则,转换后的数据将不会包含架构中未指定的属性。如果您使用嵌套 JSON,请在架构中使用可镜像 JSON 数据结构的 STRUCT 类型。有关如何使用 STRUCT 类型处理嵌套 JSON 的信息,请参阅本例。
-
用于将数据转换为目标列式存储格式(Parquet 或 ORC)的串行化器(您可以选择以下两种类型的串行器之一):ORC SerDe
或 Parquet SerDe 。
重要
如果您启用记录格式转换,则无法将 Kinesis Data Firehose 目标设置为 Amazon OpenSearch Service、Amazon Redshift 或 Splunk。启用格式转换后,Amazon S3 就是您可用于 Kinesis Data Firehose 传输流的唯一目标。
即使您在将记录发送到 Kinesis Data Firehose 之前聚合了记录,也可以转换数据的格式。
选择 JSON 解串器
如果您的输入 JSON 包含采用以下格式的时间戳,请选择 OpenX JSON SerDe
-
yyyy-MM-dd'T'HH:mm:ss[.S]'Z',其中小数最多有 9 位,例如:
2017-02-07T15:13:01.39256Z
。 -
yyyy-[M]M-[d]d HH:mm:ss[.S],其中小数最多有 9 位,例如:
2017-02-07 15:13:01.14
。 -
秒,以纪元格式表示,例如:
1518033528
。 -
毫秒,以纪元格式表示,例如:
1518033528123
。 -
浮点秒,以纪元格式表示,例如:
1518033528.123
。
OpenX JSON SerDe 可将句点 (.
) 转换为下划线 (_
)。它还可以在对 JSON 键进行反串行化前将其转换为小写。有关此解串器通过 Kinesis Data Firehose 提供的选项的更多信息,请参阅 OpenXJsonSerDe。
如果您不确定要选择哪个解串器,请使用 OpenX JSON SerDe,除非您有其不支持的时间戳。
如果您有之前列出的格式以外的时间戳,请使用 Apache Hive JSON SerDeDateTimeFormat
格式字符串的模式语法。有关更多信息,请参阅 Class DateTimeFormat
您还可以使用特殊值 millis
来解析时间戳(毫秒,以纪元格式表示)。如果您不指定格式,Kinesis Data Firehose 将默认使用 java.sql.Timestamp::valueOf
。
Hive JSON SerDe 不允许以下内容:
-
列名称中的句点 (
.
)。 -
类型为
uniontype
的字段。 -
架构中具有数字类型但属于 JSON 中的字符串的字段。例如,如果架构为 (an int),而且 JSON 为
{"a":"123"}
,则 Hive SerDe 会出现错误。
Hive SerDe 不将嵌套 JSON 转换为字符串。例如,如果您有 {"a":{"inner":1}}
,它不会将 {"inner":1}
视为字符串。
选择串行化器
选择的串行化器取决于您的业务需求。要了解有关两个串行化器选项的更多信息,请参阅 ORC SerDe
转换输入记录格式(控制台)
您可以在创建或更新 Kinesis 传输流时在控制台上启用数据格式转换。启用数据格式转换后,Amazon S3 就是您可为传输流进行配置的唯一目标。此外,启用格式转换时,系统将禁用 Amazon S3 压缩。但是,Snappy 压缩会作为自动转换过程的一部分自动进行。在这种情况下,Kinesis Data Firehose 使用的 Snappy 的构造格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 构造格式,请参阅 BlockCompressorStream.java
对数据传输流启用数据格式转换
-
访问 https://console.aws.amazon.com/firehose/
,登录到 Amazon Web Services Management Console 并打开 Kinesis Data Firehose 控制台。 -
选择要更新的 Kinesis Data Firehose 传输流,或按照 创建 Amazon Kinesis Data Firehose 传输流 中的步骤创建新的传输流。
-
在转换记录格式下,将记录格式转换设置为已启用。
-
选择所需的输出格式。有关这两个选项的更多信息,请参阅 Apache Parquet
和 Apache ORC 。 -
选择 Amazon Glue 表来指定源记录的架构。设置区域、数据库、表和表版本。
转换输入记录格式 (API)
如果您需要 Kinesis Data Firehose 将输入数据的格式从 JSON 转换为 Parquet 或 ORC,请在 ExtendedS3DestinationConfiguration 或 ExtendedS3DestinationUpdate 中指定可选的 DataFormatConversionConfiguration 元素。如果指定 DataFormatConversionConfiguration,则以下限制将适用:
-
在 BufferingHints 中,如果启用记录格式转换,则不能将
SizeInMBs
设置为小于 64 的值。此外,如果未启用格式转换,则默认值为 5。在启用格式转换后,该值将变为 128。 -
您必须将 ExtendedS3DestinationConfiguration 或 ExtendedS3DestinationUpdate 中的
CompressionFormat
设置为UNCOMPRESSED
。CompressionFormat
的默认值为UNCOMPRESSED
。因此,您还可以在 ExtendedS3DestinationConfiguration 中对其不进行指定。默认情况下,数据将使用 Snappy 压缩来作为串行化过程的一部分得到压缩。在这种情况下,Kinesis Data Firehose 使用的 Snappy 的构造格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 构造格式,请参阅 BlockCompressorStream.java。当配置串行化器时,您可以选择其他类型的压缩。
记录格式转换错误处理
当 Kinesis Data Firehose 无法解析或解串记录时(例如,当数据与架构不匹配时),会将记录写入 Amazon S3,且带有错误前缀。如果此写入失败,Kinesis Data Firehose 会一直重试,同时阻止进一步传输。对于每条失败的记录,Kinesis Data Firehose 会使用以下架构写入 JSON 文档:
{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }
记录格式转换示例
有关如何使用 Amazon CloudFormation 设置记录格式转换的示例,请参阅 Amazon::KinesisFirehose::DeliveryStream。