Amazon Kinesis Data Firehose
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

Amazon Kinesis Data Firehose 数据传输

将数据发送到传输流后,它们自动传输到您选择的目标。

重要

如果使用 Kinesis 创建器库 (KPL) 将数据写入到 Kinesis 数据流,您可以使用聚合整合您写入 Kinesis 数据流的记录。如果随后将该数据流用作 Kinesis Data Firehose 传输流的来源,Kinesis Data Firehose 将在记录传送到目标之前取消聚合。如果配置传输流来转换数据,Kinesis Data Firehose 将在记录传输到 AWS Lambda 之前取消聚合。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南 中的使用 Kinesis 创建器库开发 Amazon Kinesis Data Streams 创建器聚合

数据传输格式

对于到 Amazon Simple Storage Service (Amazon S3) 的数据传输,Kinesis Data Firehose 根据传输流的缓冲配置连接多个传入记录。然后,它将记录作为 Amazon S3 对象传输到 Amazon S3。在将记录发送到 Kinesis Data Firehose 之前,可能需要在每条记录的末尾添加记录分隔符,以便之后将传输完毕的 Amazon S3 对象分割成独立的记录。

对于到 Amazon Redshift 的数据传输,Kinesis Data Firehose 先按上述格式将传入数据传输到您的 S3 存储桶。然后,Kinesis Data Firehose 发出 Amazon Redshift COPY 命令,将数据从 S3 存储桶加载到 Amazon Redshift 集群中。确保在 Kinesis Data Firehose 将多条传入记录串联成一个 Amazon S3 对象后,该 Amazon S3 对象能够复制到 Amazon Redshift 集群。有关更多信息,请参阅 Amazon Redshift COPY 命令数据格式参数

对于到 Amazon ES 的数据传输,Kinesis Data Firehose 根据传输流的缓冲配置缓冲传入记录。然后,它生成一个 Elasticsearch 批量请求以将多条记录索引到 Elasticsearch 集群中。在将记录发送到 Kinesis Data Firehose 之前,确保记录采用 UTF-8 编码并平展为单行 JSON 对象。此外,必须将 Elasticsearch 集群的 rest.action.multi.allow_explicit_index 选项设置为 true(默认值),才能通过为每条记录设置的显式索引接受批量请求。有关更多信息,请参阅 Amazon Elasticsearch Service 开发人员指南 中的 Amazon ES 配置高级选项

对于到 Splunk 的数据传输,Kinesis Data Firehose 串联您发送的字节。如果要在您的数据中使用分隔符 (如换行符),您必须自行插入这些分隔符。确保将 Splunk 配置为解析任何此类分隔符。

数据传输频率

每个 Kinesis Data Firehose 目标都有自己的数据传输频率。

Amazon S3

以 Amazon S3 为目标的数据传输频率由您为传输流配置的 Amazon S3 Buffer size (缓冲区大小)Buffer interval (缓冲间隔) 值确定。Kinesis Data Firehose 缓冲传入数据,然后将其传输到 Amazon S3。您可以配置 Amazon S3 Buffer size (缓冲区大小) 的值 (1–128 MB) 或 Buffer interval (缓冲间隔) 的值(60–900 秒)。先满足的条件将触发到 Amazon S3 的数据传输操作。如果将数据传输到目标的速度落后于将数据写入到传输流的速度,Kinesis Data Firehose 将动态增加缓冲区大小以保持同步。之后,它将跟上并确保所有数据都传输到目标。

Amazon Redshift

从 Amazon S3 到 Amazon Redshift 的数据 COPY 操作频率由您的 Amazon Redshift 集群完成 COPY 命令的快慢决定。如果仍有要复制的数据,Kinesis Data Firehose 会在 Amazon Redshift 成功完成上一条 COPY 命令后立即发出新的 COPY 命令。

Amazon Elasticsearch Service

以 Amazon ES 为目标的数据传输频率由您为传输流配置的 Elasticsearch Buffer size (缓冲区大小) 和 Buffer interval (缓冲间隔) 值确定。Kinesis Data Firehose 缓冲传入数据,然后将其传输到 Amazon ES。您可以配置 Elasticsearch Buffer size (缓冲区大小) 值 (1–100 MB) 或 Buffer interval (缓冲间隔) 值 (60–900 秒),先满足的条件将触发到 Amazon ES 的数据传输。

Splunk

Kinesis Data Firehose 缓冲传入数据,然后将其传输到 Splunk。缓冲区大小为 5 MB,缓冲间隔为 60 秒。先满足的条件将触发到 Splunk 的数据传输操作。无法配置缓冲区大小和间隔。这些数字是最佳的。

数据传输故障处理流程

每个 Kinesis Data Firehose 目标都有自己的数据传输故障处理流程。

Amazon S3

到 S3 存储桶的数据传输可能会由于某些原因而失败。例如,存储桶不再存在、Kinesis Data Firehose 担任的 IAM 角色没有存储桶的访问权限、网络发生故障或发生类似的事件。在此类情况下,Kinesis Data Firehose 会保持重试 24 小时,直到传输成功为止。Kinesis Data Firehose 的最长数据存储时间为 24 小时。如果数据传输失败超过 24 小时,数据将丢失。

Amazon Redshift

对于 Amazon Redshift 目标,您可以在创建传输流时指定重试持续时间(0–7200 秒)。

到 Amazon Redshift 集群的数据传输可能会由于某些原因而失败。例如,传输流的集群配置不正确、正在维护集群或网络发生故障。在此类情况下,Kinesis Data Firehose 会重试指定的持续时间,然后跳过该特定的 Amazon S3 对象批次。跳过的对象的信息会以清单文件的形式传输到您的 S3 存储桶中的 errors/ 文件夹内,您可以利用该清单文件进行手动回填。有关如何利用清单文件手动 COPY 数据的信息,请参阅使用清单指定数据文件

Amazon Elasticsearch Service

对于 Amazon ES 目标,您可以在创建传输流时指定重试时长(0–7200 秒)。

到 Amazon ES 集群的数据传输可能会由于某些原因而失败。例如,传输流的 Amazon ES 集群配置不正确、正在维护 Amazon ES 集群、网络发生故障或发生类似的事件。在此类情况下,Kinesis Data Firehose 会重试指定的持续时间,然后跳过该特定的索引请求。跳过的文档会传输到您的 S3 存储桶中的 elasticsearch_failed/ 文件夹内,您可以利用它进行手动回填。每个文档均具有以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by Elasticsearch)", "errorMessage": "(error message returned by Elasticsearch)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "esDocumentId": "(intended Elasticsearch document ID)", "esIndexName": "(intended Elasticsearch index name)", "esTypeName": "(intended Elasticsearch type name)", "rawData": "(base64-encoded document data)" }
Splunk

当 Kinesis Data Firehose 向 Splunk 发送数据时,会等待 Splunk 确认。如果出现错误或在确认超时期限内没有收到确认,Kinesis Data Firehose 将启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。然后,Kinesis Data Firehose 将其视为数据传输失败,并将数据备份到 Amazon S3 存储桶中。

每次 Kinesis Data Firehose 将数据发送到 Splunk 时,无论是初始尝试还是重试,都会重新启动确认超时计数器。然后,等待 Splunk 的确认。即使重试持续时间到期,Kinesis Data Firehose 仍会等待确认,直到它收到确认或到达确认超时。如果确认超时,Kinesis Data Firehose 将进行检查以确定在重试计数器中是否有剩余时间。如果有剩余时间,它将再次重试并重复该逻辑,直到收到确认或确定重试时间已到期。

未收到确认并不是可能出现的唯一一类数据传输错误。有关其他类型的数据传输错误的信息,请参阅 Splunk 数据传输错误。如果重试持续时间大于 0,任何数据传输错误都会触发重试逻辑。

下面是一个错误记录示例。

{ "attemptsMade": 0, "arrivalTimestamp": 1506035354675, "errorCode": "Splunk.AckTimeout", "errorMessage": "Did not receive an acknowledgement from HEC before the HEC acknowledgement timeout expired. Despite the acknowledgement timeout, it's possible the data was indexed successfully in Splunk. Kinesis Firehose backs up in Amazon S3 data for which the acknowledgement timeout expired.", "attemptEndingTimestamp": 13626284715507, "rawData": "MiAyNTE2MjAyNzIyMDkgZW5pLTA1ZjMyMmQ1IDIxOC45Mi4xODguMjE0IDE3Mi4xNi4xLjE2NyAyNTIzMyAxNDMzIDYgMSA0MCAxNTA2MDM0NzM0IDE1MDYwMzQ3OTQgUkVKRUNUIE9LCg==", "EventId": "49577193928114147339600778471082492393164139877200035842.0" }

Amazon S3 对象名称格式

在将对象写入 Amazon S3 之前,Kinesis Data Firehose 会为其添加 YYYY/MM/DD/HH 格式的 UTC 时间前缀。此前缀在存储桶中创建一个逻辑层级结构,其中,每个正斜杠 (/) 均在该层级结构中创建一个层级。您可以通过指定自定义前缀来修改此结构。有关如何指定自定义前缀的信息,请参阅Amazon S3 对象的自定义前缀

Amazon S3 对象名称采用 DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString 模式,其中,DeliveryStreamVersion1 开头,并在每次 Kinesis Data Firehose 传输流配置发生改变时递增 1。您可以更改传输流配置(例如,S3 存储桶名称、缓冲提示、压缩以及加密)。您可以使用 Kinesis Data Firehose 控制台或 UpdateDestination API 操作达成这一目的。

针对 Amazon ES 目标的索引轮换

对于 Amazon ES 目标,您可以指定以下五个基于时间的索引轮换选项之一:NoRotationOneHourOneDayOneWeekOneMonth

根据您选择的轮换选项,Kinesis Data Firehose 在指定的索引名称后面附加 UTC 到达时间戳的一部分,并相应地轮换附加的时间戳。以下示例说明了在 Amazon ES 中为每个索引轮换选项生成的索引名称,其中的特定索引名称为 myindex,到达时间戳为 2016-02-25T13:00:00Z

RotationPeriod IndexName
NoRotation myindex
OneHour myindex-2016-02-25-13
OneDay myindex-2016-02-25
OneWeek myindex-2016-w08
OneMonth myindex-2016-02