Amazon Data Firehose 数据传输 - 亚马逊 Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Data Firehose 数据传输

将数据发送到传输流后,它们自动传输到您选择的目标。

重要

如果使用 Kinesis Producer Library(KPL)将数据写入 Kinesis 数据流,则可以使用聚合来合并写入该 Kinesis 数据流的记录。如果您随后使用该数据流作为 Firehose 交付流的来源,Firehose 会在将记录传送到目标之前对其进行解聚处理。如果您将传输流配置为转换数据,Firehose 会在将记录传送到之前取消聚合。Amazon Lambda有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的使用 Kinesis Producer Library 开发 Amazon Kinesis Data Streams Producer进行聚合

数据传输格式

为了向亚马逊简单存储服务 (Amazon S3) Simple S3 Service 传输数据,Firehose 会根据传输流的缓冲配置连接多条传入记录。然后,将记录作为 Amazon S3 对象传输到 Amazon S3。默认情况下,Firehose 连接的数据时不带任何分隔符。如果您想在记录之间使用新的行分隔符,则可以通过在 Firehose 控制台配置或 API 参数中启用该功能来添加新的行分隔符。

为了向 Amazon Redshift 传输数据,Firehose 首先按照前面描述的格式将传入的数据传送到你的 S3 存储桶。然后,Firehose 会发出 Amazon COPY Redshift 命令,将数据从 S3 存储桶加载到亚马逊 Redshift 预配置集群或亚马逊 Redshift 无服务器工作组。确保在 Firehose 将多条传入记录连接到 Amazon S3 对象之后,可以将亚马逊 S3 对象复制到您的 Amazon Redshift 预配置集群或 Amazon Redshift 无服务器工作组。有关更多信息,请参阅 Amazon Redshift COPY 命令数据格式参数

为了向 OpenSearch 服务和 OpenSearch 无服务器传输数据,Firehose 会根据传输流的缓冲配置来缓冲传入的记录。然后,它会生成一个 OpenSearch 服务或 OpenSearch 无服务器批量请求,将多条记录索引到您的 OpenSearch 服务集群或 OpenSearch 无服务器集合。在将记录发送到 Firehose 之前,请确保您的记录已经 UTF-8 编码并扁平化为单行 JSON 对象。此外,必须将 OpenSearch 服务集群的rest.action.multi.allow_explicit_index选项设置为 true(默认),才能使用为每条记录设置的显式索引来接受批量请求。有关更多信息,请参阅《Amazon OpenSearch 服务开发者指南》中的 OpenSearch 服务配置高级选项

为了向 Splunk 传输数据,Firehose 会连接你发送的字节。如果要在您的数据中使用分隔符(如换行符),您必须自行插入这些分隔符。确保将 Splunk 配置为解析任何此类分隔符。

在向受支持的第三方服务提供商拥有的 HTTP 端点传输数据时,您可以使用集成的 Amazon Lambda 服务创建一个函数,将传入记录转换为与服务提供商集成预期的格式匹配的格式。请联系您选择其 HTTP 端点作为目标的第三方服务提供商,了解有关他们接受的记录格式的更多信息。

数据传输频率

每个 Firehose 目标都有自己的数据传输频率。有关更多信息,请参阅 缓冲提示

Data Delivery Failure Handling

每个 Firehose 目标都有自己的数据传输失败处理方法。

Amazon S3

到 S3 存储桶的数据传输可能会由于某些原因而失败。例如,存储桶可能已不存在,Firehose 担任的 IAM 角色可能无法访问存储桶、网络出现故障或类似事件。在这种情况下,Firehose 会持续重试长达 24 小时,直到交付成功。Firehose 的最大数据存储时间为 24 小时。如果数据传输失败超过 24 小时,数据将丢失。

Amazon Redshift

对于 Amazon Redshift 目标,您可以在创建传输流时指定重试持续时间(0–7200 秒)。

将数据传输到 Amazon Redshift 预置集群或 Amazon Redshift Serverless 工作组时,可能会因为以下几个原因而失败。例如,传输流的集群配置不正确、集群或工作组正在维护或网络故障。在这些条件下,Firehose 会在指定的时间段内重试并跳过该特定批次的 Amazon S3 对象。跳过的对象的信息会以清单文件的形式传输到您的 S3 存储桶中的 errors/ 文件夹内,您可以利用该清单文件进行手动回填。有关如何使用清单文件手动 COPY 数据的信息,请参阅使用清单指定数据文件

Amazon OpenSearch 服务和 OpenSearch无服务器

对于 OpenSearch 服务和 OpenSearch 无服务器目标,您可以在创建传输流时指定重试持续时间(0—7200 秒)。

由于多种原因,向您的 OpenSearch 服务集群或 OpenSearch 无服务器集合传输数据可能会失败。例如,您的交付流的 OpenSearch 服务集群或 OpenSearch 无服务器集合配置可能不正确、 OpenSearch 服务集群或 OpenSearch 无服务器集合处于维护状态、网络故障或类似事件。在这些条件下,Firehose 会在指定的持续时间内重试,然后跳过该特定的索引请求。跳过的文档会传输到您的 S3 存储桶中的 AmazonOpenSearchService_failed/ 文件夹内,您可以利用它进行手动回填。

对于 OpenSearch 服务,每个文档都具有以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by OpenSearch Service)", "errorMessage": "(error message returned by OpenSearch Service)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "esDocumentId": "(intended OpenSearch Service document ID)", "esIndexName": "(intended OpenSearch Service index name)", "esTypeName": "(intended OpenSearch Service type name)", "rawData": "(base64-encoded document data)" }

对于 OpenSearch 无服务器,每个文档都采用以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by OpenSearch Serverless)", "errorMessage": "(error message returned by OpenSearch Serverless)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "osDocumentId": "(intended OpenSearch Serverless document ID)", "osIndexName": "(intended OpenSearch Serverless index name)", "rawData": "(base64-encoded document data)" }
Splunk

当 Firehose 向 Splunk 发送数据时,它会等待 Splunk 的确认。如果发生错误,或者确认未在确认超时时间内到达,Firehose 会启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。之后,Firehose 认为这是数据传输失败,并将数据备份到您的 Amazon S3 存储桶中。

每次 Firehose 向 Splunk 发送数据时,无论是初次尝试还是重试,都会重新启动确认超时计数器。然后,等待 Splunk 的确认。即使重试时长到期,Firehose 仍会等待确认,直到收到确认或达到确认超时为止。如果确认超时,Firehose 会进行检查以确定重试计数器中是否还有剩余时间。如果有剩余时间,它将再次重试并重复该逻辑,直到收到确认或确定重试时间已到期。

未收到确认并不是可能出现的唯一一类数据传输错误。有关其他类型的数据传输错误的信息,请参阅 Splunk 数据传输错误。如果重试持续时间大于 0,任何数据传输错误都会触发重试逻辑。

下面是一个错误记录示例。

{ "attemptsMade": 0, "arrivalTimestamp": 1506035354675, "errorCode": "Splunk.AckTimeout", "errorMessage": "Did not receive an acknowledgement from HEC before the HEC acknowledgement timeout expired. Despite the acknowledgement timeout, it's possible the data was indexed successfully in Splunk. Kinesis Firehose backs up in Amazon S3 data for which the acknowledgement timeout expired.", "attemptEndingTimestamp": 13626284715507, "rawData": "MiAyNTE2MjAyNzIyMDkgZW5pLTA1ZjMyMmQ1IDIxOC45Mi4xODguMjE0IDE3Mi4xNi4xLjE2NyAyNTIzMyAxNDMzIDYgMSA0MCAxNTA2MDM0NzM0IDE1MDYwMzQ3OTQgUkVKRUNUIE9LCg==", "EventId": "49577193928114147339600778471082492393164139877200035842.0" }
HTTP 端点目标

当 Firehose 向 HTTP 终端节点目标发送数据时,它会等待来自该目标的响应。如果发生错误,或者响应未在响应超时时间内到达,Firehose 会启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。之后,Firehose 认为这是数据传输失败,并将数据备份到您的 Amazon S3 存储桶中。

每次 Firehose 向 HTTP 端点目标发送数据时,无论是初次尝试还是重试,都会重新启动响应超时计数器。然后,等待来自 HTTP 端点目标的响应。即使重试持续时间到期,Firehose 仍会等待响应,直到收到响应或达到响应超时为止。如果响应超时,Firehose 会进行检查以确定重试计数器中是否还有剩余时间。如果还有时间,会再次重试并重复逻辑,直到收到响应或确定重试时间已过期。

未收到响应并不是唯一可能发生的数据传输错误类型。有关其他类型的数据传输错误的信息,请参阅 HTTP 端点数据传输错误

下面是一个错误记录示例。

{ "attemptsMade":5, "arrivalTimestamp":1594265943615, "errorCode":"HttpEndpoint.DestinationException", "errorMessage":"Received the following response from the endpoint destination. {"requestId": "109777ac-8f9b-4082-8e8d-b4f12b5fc17b", "timestamp": 1594266081268, "errorMessage": "Unauthorized"}", "attemptEndingTimestamp":1594266081318, "rawData":"c2FtcGxlIHJhdyBkYXRh", "subsequenceNumber":0, "dataId":"49607357361271740811418664280693044274821622880012337186.0" }

Amazon S3 对象名称格式

在将对象写入 Amazon S3 之前,Firehose 会以该格式添加一个 UTC 时间YYYY/MM/dd/HH前缀。此前缀在存储桶中创建一个逻辑层级结构,其中,每个正斜杠 (/) 均在该层级结构中创建一个层级。您可以通过指定自定义前缀来修改此结构。有关如何指定自定义前缀的信息,请参阅Amazon S3 对象的自定义前缀

Amazon S3 对象名称遵循以下模式DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-dd-HH-MM-SS-RandomString,即DeliveryStreamVersion以 Firehose 传输流的每一次配置更改开头1并增加 1。您可以更改传输流配置(例如,S3 存储桶名称、缓冲提示、压缩以及加密)。您可以使用 Firehose 控制台或 UpdateDestinationAPI 操作来实现此目的。

OpenSearch 服务目标的索引轮换

对于 OpenSearch 服务目标,您可以从以下五个选项之一中指定基于时间的索引轮换选项:NoRotationOneHourOneDayOneWeek、或OneMonth

根据您选择的轮换选项,Firehose 会将 UTC 到达时间戳的一部分附加到您指定的索引名称中。并相应地轮换附加的时间戳。以下示例显示了 S OpenSearch ervice 中每个索引轮换选项生成的索引名称,其中指定的索引名称为myindex,到达时间戳为2016-02-25T13:00:00Z

RotationPeriod IndexName
NoRotation myindex
OneHour myindex-2016-02-25-13
OneDay myindex-2016-02-25
OneWeek myindex-2016-w08
OneMonth myindex-2016-02
注意

使用 OneWeek 选项时,Data Firehose 会采用 <YEAR>-w<WEEK NUMBER> 格式(例如 2020-w33)自动创建索引,其中周数是使用 UTC 时间并根据以下美国惯例计算的:

  • 一周从星期日开始

  • 一年的第一周是指这一年中包含星期六的第一周

针对 HTTP 端点目标的跨 Amazon 账户和跨 Amazon 区域传输

Kinesis Data Firehose 支持跨 Amazon 账户将数据传输到 HTTP 端点目标。Kinesis Data Firehose 传输流和您选择作为目标的 HTTP 端点可以位于不同的 Amazon 账户。

Kinesis Data Firehose 还支持跨 Amazon 区域将数据传输到 HTTP 端点目标。您可以将数据从一个 Amazon 区域中的传输流传输到另一 Amazon 区域中的 HTTP 端点。您还可以将数据从传输流传输到 Amazon 区域之外的 HTTP 端点目标,例如将 HTTP 端点 URL 设置为所需的目标,将数据传输到您自己的本地服务器。在这些情况下,您的传输成本将增加额外的传输费用。有关更多信息,请参阅“按需定价”页面中的数据传输部分。

重复的记录

Kinesis Data at-least-once Firehose 使用语义进行数据传输。在某些情况下(比如数据传输超时),如果原始数据传输请求最终通过,Kinesis Data Firehose 的传输重试可能会导致重复。这适用于 Kinesis Data Firehose 支持的所有目标类型。

如何暂停和恢复 Firehose 直播直播

在 Firehose 中设置传送流后,流源中的可用数据将持续传送到目标。如果遇到流目标暂时不可用的情况(例如,在计划维护操作期间),您可能需要暂时暂停数据传输,并在目标位置再次可用时恢复。下面几节介绍了操作方法:

重要

当您使用下述方法暂停和恢复直播时,在恢复直播之后,您会看到很少有记录被传送到 Amazon S3 中的错误存储桶,而其余数据流继续传送到目的地。这是该方法的已知局限性,之所以发生这种情况,是因为有少量记录在多次重试后无法传送到目的地,被追踪为失败。

了解 Firehose 如何处理交付失败

在 Firehose 中设置传输流时,对于许多目的地 OpenSearch,例如 Splunk 和 HTTP 终端节点,您还可以设置一个 S3 存储桶,用于备份未能交付的数据。如需详细了解 Firehose 如何在交付失败时备份数据,请参阅数据传输失败处理。有关如何授予访问权限以备份未能交付的数据的 S3 存储桶的更多信息,请参阅授予 Firehose 对 Amazon S3 目标的访问权限。当 Firehose (a) 无法将数据传送到流目标,以及 (b) 由于传输失败而无法将数据写入备份 S3 存储桶时,它实际上会暂停流传输,直到数据可以传输到目标或写入备份 S3 位置。

暂停 Firehose 直播直播

要在 Firehose 中暂停直播传输,请先移除 Firehose 在传输失败时写入 S3 备份位置的权限。例如,如果您想暂停带有 OpenSearch 目标的传送流,则可以通过更新权限来实现。有关更多信息,请参阅授予 Firehose 访问公共 OpenSearch 服务目标的权限。

删除 s3:PutObject 操作的 "Effect": "Allow" 权限,并显式添加一条语句,该语句对用于备份失败传输的 S3 存储桶的 s3:PutObject 操作应用 Effect": "Deny" 权限。接下来,关闭直播目标(例如,关闭目标 OpenSearch 网域),或者移除 Firehose 写入目标的权限。要更新其他目的地的权限,请在 “使用 Amazon Firehose 控制访问权限” 中查看您的目的地部分。完成这两个操作后,Firehose 将停止传输直播,你可以使用 Fi rehose 的CloudWatch 指标进行监控。

重要

在 Firehose 中暂停直播时,您需要确保将流源(例如,在 Kinesis Data Streams 或 Kafka 的托管服务中)配置为保留数据,直到恢复流传输并将数据传输到目标为止。如果来源是 DirectPut,Firehose 将保留数据 24 小时。如果您在数据留存期到期之前未恢复流并传输数据,则可能会发生数据丢失。

恢复 Firehose 的传送流

要恢复传送,首先要恢复之前对直播目标所做的更改,方法是打开直播目标并确保 Firehose 有权将直播传送到目的地。接下来,还原之前对应用于 S3 存储桶的权限所做的更改,以备份失败的传输。也就是说,应用 s3:PutObject 操作 "Effect": "Allow" 的权限,并删除 s3:PutObject 操作的 "Effect": "Deny" 权限,后者供 S3 存储桶用于备份失败的传输。最后,使用 Firehose 的CloudWatch 指标进行监控,以确认直播已传送到目的地。要查看错误并对其进行故障排除,请使用适用于 Firehose 的亚马逊 CloudWatch 日志监控