Amazon Kinesis Data Firehose
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

在 Kinesis Data Firehose 中转换输入记录格式

Amazon Kinesis Data Firehose 可以在将输入数据存储在 Amazon S3 之前将数据格式从 JSON 转换为 Apache ParquetApache ORC。Parquet 和 ORC 是列式数据格式,与 JSON 等行式格式相比,前者可节省空间并更快地启用查询。如果您想要转换 JSON 以外的输入格式,如逗号分隔值 (CSV) 或结构化文本,您可以首先使用 AWS Lambda 来将其转换为 JSON。有关更多信息,请参阅Amazon Kinesis Data Firehose 数据转换

记录格式转换要求

Kinesis Data Firehose 需要以下三个元素才能转换记录数据的格式:

  • 用于读取输入数据的 JSON 的解串器 – 您可以选择两种类型的解串器之一:Apache Hive JSON SerDeOpenX JSON SerDe

  • 用于确定如何解释该数据的架构 – 使用 AWS Glue 在 AWS Glue 数据目录中创建架构。然后,Kinesis Data Firehose 引用该架构并使用它来解释您的输入数据。您可以使用同一架构来配置 Kinesis Data Firehose 和您的分析软件。有关更多信息,请参阅 AWS Glue 开发人员指南 中的填充 AWS Glue 数据目录

  • 用于将数据转换为目标列式存储格式 (Parquet 或 ORC) 的串行器 – 您可以选择两种类型的串行器之一:ORC SerDeParquet SerDe

重要

如果启用记录格式转换,您不能将 Kinesis Data Firehose 目标设置为 Amazon Elasticsearch Service (Amazon ES)、Amazon Redshift 或 Splunk。启用格式转换后,Amazon S3 是您可用于 Kinesis Data Firehose 传输流的唯一目标。

即使您在将记录发送到 Kinesis Data Firehose 之前聚合记录,也可以转换数据的格式。

选择 JSON 解串器

如果您的输入 JSON 包含采用以下格式的时间戳,请选择 OpenX JSON SerDe

  • yyyy-MM-dd'T'HH:mm:ss[.S]'Z',其中小数最多有 9 位 – 例如,2017-02-07T15:13:01.39256Z

  • yyyy-[M]M-[d]d HH:mm:ss[.S],其中小数最多有 9 位 – 例如,2017-02-07 15:13:01.14

  • 秒,以纪元格式表示 – 例如,1518033528

  • 毫秒,以纪元格式表示 – 例如,1518033528123

  • 浮点秒,以纪元格式表示 – 例如,1518033528.123

OpenX JSON SerDe 可将句点 (.) 转换为下划线 (_)。它还可以在对 JSON 键进行反串行化前将其转换为小写。有关通过 Kinesis Data Firehose 适用于此解串器的选项的更多信息,请参阅 OpenXJsonSerDe

如果您不确定要选择的解串器,请使用 OpenX JSON SerDe,除非您有其不支持的时间戳。

如果您有之前列出的格式以外的时间戳,请使用 Apache Hive JSON SerDe。选择此解串器后,您可以指定要使用的时间戳格式。为此,请遵循 Joda-Time DateTimeFormat 格式字符串的模式语法。有关更多信息,请参阅 Class DateTimeFormat

您还可以使用特殊值 millis 来解析时间戳 (毫秒,以纪元格式表示)。如果您未指定格式,Kinesis Data Firehose 默认使用 java.sql.Timestamp::valueOf

Hive JSON SerDe 不允许以下内容:

  • 列名称中的句点 (.)。

  • 类型为 uniontype 的字段。

  • 架构中具有数字类型但属于 JSON 中的字符串的字段。例如,如果架构为 (an int),而且 JSON 为 {"a":"123"},则 Hive SerDe 会出现错误。

Hive SerDe 不将嵌套 JSON 转换为字符串。例如,如果您有 {"a":{"inner":1}},它不会将 {"inner":1} 视为字符串。

选择串行器

选择的串行器取决于您的业务需求。要了解有关两个串行器选项的更多信息,请参阅 ORC SerDeParquet SerDe

转换输入记录格式 (控制台)

您可以在创建或更新 Kinesis 传输流时在控制台上启用数据格式转换。启用数据格式转换后,Amazon S3 是您可配置传输流的唯一目标。此外,启用格式转换时,将禁用 Amazon S3 压缩。但是,Snappy 压缩会作为自动转换过程的一部分自动进行。此示例中 Kinesis Data Firehose 使用的 Snappy 的构造格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中查询此数据。有关 Hadoop 所依赖的 Snappy 构造格式,请参阅 BlockCompressorStream.java

对数据传输流启用数据格式转换

  1. 通过以下网址登录 AWS 管理控制台并打开 Kinesis Data Firehose 控制台:https://console.amazonaws.cn/firehose/

  2. 选择要更新的 Kinesis Data Firehose 传输流,或按照创建 Amazon Kinesis Data Firehose 传输流中的步骤创建新的传输流。

  3. 转换记录格式下,将 记录格式转换设置为已启用

  4. 选择想要的输出格式。有关这两个选项的更多信息,请参阅 Apache ParquetApache ORC

  5. 选择 AWS Glue 表来指定源记录的架构。设置区域、数据库、表和表版本。

转换输入记录格式 (API)

如果您希望 Kinesis Data Firehose 将输入数据的格式从 JSON 转换为 Parquet 或 ORC,请指定 ExtendedS3DestinationConfigurationExtendedS3DestinationUpdate 中可选的 DataFormatConversionConfiguration 元素。如果指定 DataFormatConversionConfiguration,则以下限制将适用:

  • BufferingHints 中,如果启用记录格式转换,则不能将 SizeInMBs 设置为小于 64 的值。此外,如果未启用格式转换,则默认值为 5。在启用格式转换时该值将变为 128。

  • 您必须将 ExtendedS3DestinationConfigurationExtendedS3DestinationUpdate 中的 CompressionFormat 设置为 UNCOMPRESSEDCompressionFormat 的默认值为 UNCOMPRESSED。因此,您还可以将其在 ExtendedS3DestinationConfiguration 中不进行指定。默认情况下,数据将使用 Snappy 压缩来作为串行化过程的一部分得到压缩。此示例中 Kinesis Data Firehose 使用的 Snappy 的构造格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中查询此数据。有关 Hadoop 所依赖的 Snappy 构造格式,请参阅 BlockCompressorStream.java。当配置串行器时,您可以选择其他类型的压缩。

记录格式转换错误处理

当 Kinesis Data Firehose 无法解析或反串行化记录时 (例如,当数据与架构不匹配时),它会将其写入 Amazon S3,且带有错误前缀。如果此写入失败,Kinesis Data Firehose 会一直重试,同时阻止进一步传输。对于每个失败的记录,Kinesis Data Firehose 会使用以下架构写入 JSON 文档:

{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }