Amazon Kinesis Data Firehose - Amazon Kinesis Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Kinesis Data Firehose

将数据发送到传输流后,它们自动传输到您选择的目标。

重要

如果您使用 Kinesis Producer Library (KPL) 将数据写入 Kinesis 数据流,则可以使用聚合来合并写入该 Kinesis 数据流的记录。如果您随后使用该数据流作为 Kinesis Data Firehose 交付流的来源,则 Kinesis Data Firehose 会在将记录传送到目标之前对其进行解聚处理。如果您将传输流配置为转换数据,则 Kinesis Data Firehose 会在将记录传送到之前对其进行解聚处理Amazon Lambda。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的使用Amazon Kinesis Data Streams 生成器

数据传输格式

为了将数据传输到 Amazon S3 传输数据,Kinesis Data Firehose 根据传输流的缓冲配置连接多个传入记录。然后它将记录以 Amazon S3 对象传输到 Amazon S3 对象。在将每条记录发送到 Kinesis Data Firehose 之前,您可能需要在每条记录的末尾添加一个记录分隔符。然后,您可以将已交付的 Amazon S3 对象划分为单个记录。

为了将数据传输到 Amazon Redshift,Kinesis Data Firehose 首先以前面描述的格式将传入的数据传输到你的 S3 存储桶。然后,Kinesis Data Firehose 发出 Amazon RedshiftCOPY 命令,将数据从你的 S3 存储桶加载到你的 Amazon Redshift 集群。确保在 Kinesis Data Firehose 将多个传入记录连接到 Amazon S3 对象之后,Amazon S3 对象可以复制到你的 Amazon Redshift 集群。有关更多信息,请参阅 Amazon Redshift COPY 命令数据格式参数

为了将数据传输到 Serv OpenSearch ice 和 OpenSearch Serverless,Kinesis Data Firehose 会根据传输流的缓冲配置来缓冲传入的记录。然后,它会生成 OpenSearch 服务或 OpenSearch 无服务器批量请求,将多条记录索引到您的 OpenSearch 服务集群或 OpenSearch Serverless 集合。在将记录发送到 Kinesis Data Firehose 之前,请确保您的记录采用 UTF-8 编码并将其展平为单行 JSON 对象。此外,必须将 OpenSearch 服务集群的rest.action.multi.allow_explicit_index选项设置为 true(默认),才能使用每条记录设置的显式索引来接受批量请求。有关更多信息,请参阅《亚马逊OpenSearch 服务开发者指南》中的 OpenSearch 服务配置高级选项

为了向 Splunk 传输数据,Kinesis Data Firehose 会将你发送的字节串联起来。如果要在您的数据中使用分隔符 (如换行符),您必须自行插入这些分隔符。确保将 Splunk 配置为解析任何此类分隔符。

向受支持的第三方服务提供商拥有的 HTTP 终端节点传输数据时,您可以使用集成的 Amazon Lambda 服务创建一个函数,将传入记录转换为与服务提供商集成期望的格式相匹配的格式。请联系您为目的地选择了 HTTP 端点的第三方服务提供商,详细了解他们接受的记录格式。

数据传输频率

每个 Kinesis Data Firehose 目的地都有自己的数据传输频率。

Amazon S3

向 Amazon S3 传输数据的频率由您为传输流配置的 Amazon S3 缓冲区大小和缓冲间隔值决定。Kinesis Data Firehose 在将传入数据传输到 Amazon S3 之前缓冲该数据。您可以配置 Amazon S3 缓冲区大小(1—128 MB)或缓冲间隔(60—900 秒)的值。满足的条件首先会触发数据传输到 Amazon S3。当向目标传输的数据落后于向传输流写入数据时,Kinesis Data Firehose 会动态增加缓冲区大小。之后,它将跟上并确保所有数据都传输到目标。

Amazon Redshift

从 Amazon S3 到 Amazon Redshift 的数据COPY操作频率取决于您的 Amazon Redshift 集群完成COPY命令的速度。如果还有数据需要复制,则 Kinesis Data Firehose 将在上一个COPY命令由 Amazon Redshift 成功完成后立即COPY发出新命令。

亚马逊 OpenSearch 服务

向 OpenSearch 服务传输数据的频率由您为传输流配置的 OpenSearch 服务缓冲区大小和缓冲间隔值决定。Kinesis Data Firehose 在将传入数据传输到 OpenSearch S3 之前缓冲该数据。您可以配置 OpenSearch 服务缓冲区大小(1—100 MB)或缓冲间隔(60—900 秒)的值,满足的条件会首先触发向 OpenSearch 服务传输数据。

亚马逊 OpenSearch 无服务器

向 OpenSearch Serverless 传输数据的频率由您为传输流配置的 OpenSearch 无服务器缓冲区大小和缓冲间隔值决定。Kinesis Data Firehose 在将传入数据传输到 OpenSearch Serverlese 之前缓冲该数据。您可以配置 OpenSearch无服务器缓冲区大小(1—100 MB)或缓冲间隔(60—900 秒)的值,满足的条件会首先触发向 OpenSearch 无服务器传输数据。

Splunk

Kinesis Data Firehose 在将传入数据传输到 Splunk 之前缓冲该数据。缓冲区大小为 5 MB,缓冲间隔为 60 秒。先满足的条件将触发到 Splunk 的数据传输操作。无法配置缓冲区大小和间隔。这些数字是最佳的。

HTTP 端点目标

Kinesis Data Firehose 在将传入数据传输到指定的 HTTP 端点目标之前缓冲该数据。目标的推荐缓冲区大小因服务提供商而异。例如,Datadog 的推荐缓冲区大小为 4,New Relic MiBs 和 Sumo Logic 的推荐缓冲区大小为 1 MiB。有关其推荐缓冲区大小的更多信息,请联系您选择其终端节点作为数据目的地的第三方服务提供商。

数据传输故障处理流程

每个 Kinesis Data Firehose 目的地都有自己的数据传输故障处理机制。

Amazon S3

到 S3 存储桶的数据传输可能会由于某些原因而失败。例如,存储桶可能已不存在,Kinesis Data Firehose 担任的 IAM 角色可能无法访问存储桶、网络故障或类似事件。在这种情况下,Kinesis Data Firehose 会持续重试长达 24 小时,直到交付成功。Kinesis Data Firehose 的最长数据存储时间为 24 小时。如果数据传输失败超过 24 小时,数据将丢失。

Amazon Redshift

对于 Amazon Redshift 目的地,您可以在创建传送流时指定重试持续时间(0—7200 秒)。

传输到 Amazon Redshift 集群可能出于几个原因而失败。例如,传输流的集群配置不正确、正在维护集群或网络发生故障。在这些情况下,Kinesis Data Firehose 会在指定的持续时间内重试,并跳过该特定批次的 Amazon S3 对象。跳过的对象的信息会以清单文件的形式传输到您的 S3 存储桶中的 errors/ 文件夹内,您可以利用该清单文件进行手动回填。有关如何使用清单文件手动复制数据的信息,请参阅使用清单指定数据文件

亚马逊 OpenSearch 服务和 OpenSearch无服务器

对于 OpenSearch 服务和 OpenSearch 无服务器目标,可以在创建传输流时指定重试持续时间(0—7200 秒)。

由于多种原因,向 OpenSearch 服务集群或 OpenSearch 无服务器集合传输数据可能会失败。例如,您的交付流的 OpenSearch 服务集群或 OpenSearch 无服务器集合配置可能不正确, OpenSearch 服务群集或 OpenSearch 无服务器集合处于维护状态,网络故障或类似事件。在这些情况下,Kinesis Data Firehose 会在指定的持续时间内重试,然后跳过该特定的索引请求。跳过的文档会传输到您的 S3 存储桶中的 AmazonOpenSearchService_failed/ 文件夹内,您可以利用它进行手动回填。

对于 OpenSearch 服务,每个文档具有以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by OpenSearch Service)", "errorMessage": "(error message returned by OpenSearch Service)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "esDocumentId": "(intended OpenSearch Service document ID)", "esIndexName": "(intended OpenSearch Service index name)", "esTypeName": "(intended OpenSearch Service type name)", "rawData": "(base64-encoded document data)" }

对于 OpenSearch 无服务器,每个文档具有以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by OpenSearch Serverless)", "errorMessage": "(error message returned by OpenSearch Serverless)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "osDocumentId": "(intended OpenSearch Serverless document ID)", "osIndexName": "(intended OpenSearch Serverless index name)", "rawData": "(base64-encoded document data)" }
Splunk

当 Kinesis Data Firehose 向 Splunk 发送数据时,它会等待 Splunk 的确认。如果发生错误,或者确认未在确认超时期限内到达,Kinesis Data Firehose 将启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。之后,Kinesis Data Firehose 认为这是数据传输失败,并将数据备份到你的 Amazon S3 存储桶中。

每当 Kinesis Data Firehose 向 Splunk 发送数据时,无论是初次尝试还是重试,它都会重新启动确认超时计数器。然后,等待 Splunk 的确认。即使重试时间到期,Kinesis Data Firehose 仍会等待确认,直到收到确认或达到确认超时为止。如果确认超时,Kinesis Data Firehose 将检查以确定重试计数器中是否还有时间。如果有剩余时间,它将再次重试并重复该逻辑,直到收到确认或确定重试时间已到期。

未收到确认并不是可能出现的唯一一类数据传输错误。有关其他类型的数据传输错误的信息,请参阅 Splunk 数据传输错误。如果重试持续时间大于 0,任何数据传输错误都会触发重试逻辑。

下面是一个错误记录示例。

{ "attemptsMade": 0, "arrivalTimestamp": 1506035354675, "errorCode": "Splunk.AckTimeout", "errorMessage": "Did not receive an acknowledgement from HEC before the HEC acknowledgement timeout expired. Despite the acknowledgement timeout, it's possible the data was indexed successfully in Splunk. Kinesis Firehose backs up in Amazon S3 data for which the acknowledgement timeout expired.", "attemptEndingTimestamp": 13626284715507, "rawData": "MiAyNTE2MjAyNzIyMDkgZW5pLTA1ZjMyMmQ1IDIxOC45Mi4xODguMjE0IDE3Mi4xNi4xLjE2NyAyNTIzMyAxNDMzIDYgMSA0MCAxNTA2MDM0NzM0IDE1MDYwMzQ3OTQgUkVKRUNUIE9LCg==", "EventId": "49577193928114147339600778471082492393164139877200035842.0" }
HTTP 端点目标

当 Kinesis Data Firehose 将数据发送到 HTTP 端点目标时,它会等待来自该目标的响应。如果发生错误,或者响应未在响应超时期限内到达,Kinesis Data Firehose 将启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。之后,Kinesis Data Firehose 认为这是数据传输失败,并将数据备份到你的 Amazon S3 存储桶中。

每当 Kinesis Data Firehose 向 HTTP 端点目标发送数据时,无论是初次尝试还是重试,它都会重新启动响应超时计数器。然后,它等待来自 HTTP 端点目标的响应。即使重试时间到期,Kinesis Data Firehose 仍会等待响应,直到收到响应或达到响应超时为止。如果响应超时,Kinesis Data Firehose 会进行检查以确定重试计数器中是否还有时间。如果还有时间,它会再次重试并重复逻辑,直到收到响应或确定重试时间已过期。

未能收到响应并不是唯一可能发生的数据传输错误。有关其他类型的数据传输错误的信息,请参阅 HTTP 端点数据传输错误

下面是一个错误记录示例。

{ "attemptsMade":5, "arrivalTimestamp":1594265943615, "errorCode":"HttpEndpoint.DestinationException", "errorMessage":"Received the following response from the endpoint destination. {"requestId": "109777ac-8f9b-4082-8e8d-b4f12b5fc17b", "timestamp": 1594266081268, "errorMessage": "Unauthorized"}", "attemptEndingTimestamp":1594266081318, "rawData":"c2FtcGxlIHJhdyBkYXRh", "subsequenceNumber":0, "dataId":"49607357361271740811418664280693044274821622880012337186.0" }

Amazon S3 对象名称格式

Kinesis Data Firehose 在将对象写入 Amazon S3YYYY/MM/dd/HH 之前以该格式添加了 UTC 时间前缀。此前缀在存储桶中创建一个逻辑层级结构,其中,每个正斜杠 (/) 均在该层级结构中创建一个层级。您可以通过指定自定义前缀来修改此结构。有关如何指定自定义前缀的信息,请参阅Amazon S3 对象的自定义前缀

Amazon S3 对象名称遵循以下模式DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-dd-HH-MM-SS-RandomString,其DeliveryStreamVersion开头为1,每更改 Kinesis Data Firehose 传输流的配置都会增加 1。您可以更改传输流配置(例如,S3 存储桶名称、缓冲提示、压缩以及加密)。你可以使用 Kinesis Data Firehose 控制台或 UpdateDestinationAPI 操作来做到这一点。

OpenSearch 服务目标的索引轮换

对于 OpenSearch 服务目标,您可以从以下五个选项之一中指定基于时间的索引旋转选项:NoRotationOneHourOneDayOneWeek、或OneMonth

根据您选择的轮换选项,Kinesis Data Firehose 附加一部分 UTC 到达时间戳到您指定的索引名称。并相应地轮换附加的时间戳。以下示例显示了每个索引旋转选项在 Serv OpenSearch ice 中生成的索引名称,其中指定的索引名称为myindex,到达时间戳为2016-02-25T13:00:00Z

RotationPeriod IndexName
NoRotation myindex
OneHour myindex-2016-02-25-13
OneDay myindex-2016-02-25
OneWeek myindex-2016-w08
OneMonth myindex-2016-02
注意

使用该OneWeek选项,Data Firehose 会使用 <YEAR>-w 的格式自动创建索引<WEEK NUMBER>(例如,2020-w33),其中周数是使用 UTC 时间和以下美国惯例计算的:

  • 一周从星期日开始

  • 一年的第一周是今年包含星期六的第一周

跨Amazon账户和跨Amazon区域交付 HTTP 终端节点目标

Kinesis Data Firehose 支持跨Amazon账户向 HTTP 端点目标传输数据。Kinesis Data Firehose 传输流和您选择作为目标的 HTTP 端点可能位于不同的Amazon账户中。

Kinesis Data Firehose 还支持将数据传输到跨Amazon区域的 HTTP 端点目的地。您可以将数据从一个区域的传输流传输到另一个AmazonAmazon区域的 HTTP 终端节点。您还可以通过将 HTTP 终端节点 URL 设置为所需目的地,将数据从传输流传输到Amazon区域以外的 HTTP 终端节点目标,例如传输到您自己的本地服务器。在这些情况下,额外的数据传输费用会添加到您的配送成本中。有关更多信息,请参阅 “按需定价” 页面的数据传输部分。

重复的记录

Kinesis Data Firehose 使用 at-least-once 语义进行数据交付。在某些情况下,例如当数据传输超时时,如果原始数据传输请求最终通过,Kinesis Data Firehose 重试传输可能会引入重复内容。这适用于 Kinesis Data Firehose 支持的所有目的地类型。