亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Firehose 中转换您的输入记录格式
在将数据存储到亚马逊 S3 之前,Amazon Data Firehose 可以将您的输入数据的格式从 JSON 转换为 Apache Parq
记录格式转换要求
Amazon Data Firehose 需要以下三个元素来转换记录数据的格式:
-
用于读取输入数据 JSON 的反序列化器 — 您可以从两种类型的反序列化器中选择一种:Apache Hive
JSON 或 OpenX JSON。 SerDe SerDe 注意
如要将多个 JSON 文档合并到同一记录中,请确保您的输入仍以支持的 JSON 格式显示。JSON 文档数组不是有效输入。
例如,这是正确的输入:
{"a":1}{"a":2}
而这是错误的输入:
[{"a":1}, {"a":2}]
-
用于确定如何解释该数据的架构:使用 Amazon Glue 在 Amazon Glue Data Catalog 中创建架构。然后,Amazon Data Firehose 会引用该架构并使用它来解释您的输入数据。您可以使用相同的架构来配置 Amazon Data Firehose 和您的分析软件。有关更多信息,请参阅《Amazon Glue 开发人员指南》中的填充 Amazon Glue 数据目录。
注意
在 Amazon Glue Data Catalog 中创建的架构应该与输入数据结构相匹配。否则,转换后的数据将不会包含架构中未指定的属性。如果您使用嵌套 JSON,请在架构中使用可镜像 JSON 数据结构的 STRUCT 类型。有关如何使用 STRUCT 类型处理嵌套 JSON 的信息,请参阅本例。
-
用于将@@ 数据转换为目标列式存储格式(Parquet 或 ORC)的序列化器 — 您可以选择两种类型的序列化器之一:ORC 或 Parquet。 SerDe SerDe
重要
如果您启用记录格式转换,则无法将亚马逊数据 Firehose 目标设置为亚马逊 OpenSearch 服务、亚马逊 Redshift 或 Splunk。启用格式转换后,Amazon S3 是您的 Firehose 直播可以使用的唯一目的地。
即使您在将记录发送到 Amazon Data Firehose 之前汇总了这些记录,也可以转换数据的格式。
选择 JSON 解串器
SerDe如果您的输入 JSO N 包含以下格式的时间戳,请选择 OpenX
-
yyyy-MM-dd'T'HH:mm:ss[.S]'Z',其中小数最多有 9 位,例如:
2017-02-07T15:13:01.39256Z
。 -
yyyy-[M]M-[d]d HH:mm:ss[.S],其中小数最多有 9 位,例如:
2017-02-07 15:13:01.14
。 -
秒,以纪元格式表示,例如:
1518033528
。 -
毫秒,以纪元格式表示,例如:
1518033528123
。 -
浮点秒,以纪元格式表示,例如:
1518033528.123
。
OpenX JSON SerDe 可以将句点 (.
) 转换为下划线 (_
)。它还可以在对 JSON 键进行反串行化前将其转换为小写。有关此反序列化器通过 Amazon Data Firehose 提供的选项的更多信息,请参阅 OpenX。JsonSerDe
如果你不确定要选择哪个反序列化器,请使用 OpenX JSON SerDe,除非你有它不支持的时间戳。
如果您的时间戳格式与之前列出的格式不同,请使用 Apache HivDateTimeFormat
格式字符串的模式语法。有关更多信息,请参阅类 DateTimeFormat
您还可以使用特殊值 millis
来解析时间戳(毫秒,以纪元格式表示)。如果您未指定格式,Amazon Data Firehose 会java.sql.Timestamp::valueOf
默认使用该格式。
Hive JSON SerDe 不允许执行以下操作:
-
列名称中的句点 (
.
)。 -
类型为
uniontype
的字段。 -
架构中具有数字类型但属于 JSON 中的字符串的字段。例如,如果架构是(整数),而 JSON 是
{"a":"123"}
,则 Hive SerDe 会给出错误。
Hive SerDe 不会将嵌套的 JSON 转换为字符串。例如,如果您有 {"a":{"inner":1}}
,它不会将 {"inner":1}
视为字符串。
选择串行化器
选择的串行化器取决于您的业务需求。要了解有关这两个序列化器选项的更多信息,请参阅 ORC SerDe 和 P
转换输入记录格式(控制台)
创建或更新 Firehose 直播时,可以在控制台上启用数据格式转换。启用数据格式转换后,Amazon S3 是您可以为 Firehose 数据流配置的唯一目标。此外,启用格式转换时,系统将禁用 Amazon S3 压缩。但是,Snappy 压缩会作为自动转换过程的一部分自动进行。Amazon Data Firehose 在本例中使用的 Snappy 框架格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 取景格式,请参阅.java。BlockCompressorStream
要为数据 Firehose 流启用数据格式转换
-
选择要更新的 Firehose 直播,或者按照中的步骤创建新的 Firehose 直播。创建 Firehose 直播
-
在转换记录格式下,将记录格式转换设置为已启用。
-
选择所需的输出格式。有关这两个选项的更多信息,请参阅 Apache Parquet
和 Apache ORC 。 -
选择 Amazon Glue 表来指定源记录的架构。设置区域、数据库、表和表版本。
转换输入记录格式 (API)
如果你想让 Amazon Data Firehose 将你的输入数据格式从 JSON 转换为 Parquet 或 ORC,请在 extendedS3 或 Extended DestinationConfiguration S DataFormatConversionConfiguration3 中指定可选元素。DestinationUpdate如果您指定 DataFormatConversionConfiguration,则适用以下限制:
-
在中 BufferingHints,如果启用记录格式转换,则不能
SizeInMBs
将值设置为小于 64。此外,如果未启用格式转换,则默认值为 5。在启用格式转换后,该值将变为 128。 -
你必须
CompressionFormat
在 extendedS3 DestinationConfiguration 或 extendes3 中将设置为。DestinationUpdateUNCOMPRESSED
CompressionFormat
的默认值为UNCOMPRESSED
。因此,您也可以在 ext en DestinationConfiguration dedS3 中将其保留为未指定。默认情况下,数据将使用 Snappy 压缩来作为串行化过程的一部分得到压缩。Amazon Data Firehose 在本例中使用的 Snappy 框架格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 取景格式,请参阅.java。BlockCompressorStream当配置串行化器时,您可以选择其他类型的压缩。
记录格式转换错误处理
当 Amazon Data Firehose 无法解析或反序列化记录时(例如,当数据与架构不匹配时),它会使用错误前缀将其写入 Amazon S3。如果此写入失败,Amazon Data Firehose 将永远重试,从而阻止进一步交付。对于每条失败的记录,Amazon Data Firehose 都会使用以下架构编写一个 JSON 文档:
{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }
记录格式转换示例
有关如何使用设置记录格式转换的示例Amazon CloudFormation,请参阅Amazon::DataFirehose:: DeliveryStream。