Amazon Data Firehose 数据传输 - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Data Firehose 数据传输

数据发送到您的 Firehose 直播后,会自动传送到您选择的目的地。

重要

如果使用 Kinesis Producer Library(KPL)将数据写入 Kinesis 数据流,则可以使用聚合来合并写入该 Kinesis 数据流的记录。如果您随后使用该数据流作为 Firehose 数据流的来源,Amazon Data Firehose 会在将记录传送到目标之前对其进行解聚处理。如果您将 Firehose 流配置为转换数据,Amazon Data Firehose 会在将记录传送到之前对其进行解聚处理。 Amazon Lambda有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的使用 Kinesis Producer Library 开发 Amazon Kinesis Data Streams Producer进行聚合

数据传输格式

为了向亚马逊简单存储服务 (Amazon S3) Service 传输数据,Firehose 会根据传输流的缓冲配置连接多条传入记录。然后,将记录作为 Amazon S3 对象传输到 Amazon S3。默认情况下,Firehose 连接的数据时不带任何分隔符。如果要在记录之间使用新的行分隔符,则可以通过在 Firehoseconsol e 配置或 API 参数中启用该功能来添加新的行分隔符。

为了向 Amazon Redshift 传输数据,Firehose 首先按照前面描述的格式将传入的数据传送到你的 S3 存储桶。然后,Firehose 会发出 Amazon COPY Redshift 命令,将数据从 S3 存储桶加载到亚马逊 Redshift 预配置集群或亚马逊 Redshift 无服务器工作组。确保在 Amazon Data Firehose 将多个传入记录连接到 Amazon S3 对象之后,可以将亚马逊 S3 对象复制到您的 Amazon Redshift 预配置集群或亚马逊 Redshift 无服务器工作组。有关更多信息,请参阅 Amazon Redshift COPY 命令数据格式参数

为了向 OpenSearch 服务和 OpenSearch 无服务器传输数据,Amazon Data Firehose 会根据您的 Firehose 流的缓冲配置来缓冲传入的记录。然后,它会生成一个 OpenSearch 服务或 OpenSearch 无服务器批量请求,将多条记录索引到您的 OpenSearch 服务集群或 OpenSearch 无服务器集合。在将记录发送到 Amazon Data Firehose 之前,请确保您的记录已经 UTF-8 编码并扁平化为单行 JSON 对象。此外,必须将 OpenSearch 服务集群的rest.action.multi.allow_explicit_index选项设置为 true(默认),才能使用为每条记录设置的显式索引来接受批量请求。有关更多信息,请参阅《Amazon OpenSearch 服务开发者指南》中的 OpenSearch 服务配置高级选项

为了向 Splunk 传输数据,Amazon Data Firehose 会连接您发送的字节。如果要在您的数据中使用分隔符(如换行符),您必须自行插入这些分隔符。确保将 Splunk 配置为解析任何此类分隔符。

在向受支持的第三方服务提供商拥有的 HTTP 端点传输数据时,您可以使用集成的 Amazon Lambda 服务创建一个函数,将传入记录转换为与服务提供商集成预期的格式匹配的格式。请联系您选择其 HTTP 端点作为目标的第三方服务提供商,了解有关他们接受的记录格式的更多信息。

数据传输频率

每个 Firehose 目标都有自己的数据传输频率。有关更多信息,请参阅缓冲提示

Data Delivery Failure Handling

每个 Amazon Data Firehose 目标都有自己的数据传输失败处理方法。

Amazon S3

到 S3 存储桶的数据传输可能会由于某些原因而失败。例如,该存储桶可能已不存在,Amazon Data Firehose 担任的 IAM 角色可能无法访问该存储桶、网络出现故障或类似事件。在这种情况下,Amazon Data Firehose 会持续重试长达 24 小时,直到交付成功。Amazon Data Firehose 的最长数据存储时间为 24 小时。如果数据传输失败超过 24 小时,数据将丢失。

Amazon Redshift

对于亚马逊 Redshift 目标,您可以在创建 Firehose 直播时指定重试时长(0—7200 秒)。

将数据传输到 Amazon Redshift 预置集群或 Amazon Redshift Serverless 工作组时,可能会因为以下几个原因而失败。例如,您的 Firehose 直播的集群配置可能不正确、集群或工作组处于维护状态,或者网络出现故障。在这种情况下,Amazon Data Firehose 会在指定的时间段内重试并跳过该特定批次的 Amazon S3 对象。跳过的对象的信息会以清单文件的形式传输到您的 S3 存储桶中的 errors/ 文件夹内,您可以利用该清单文件进行手动回填。有关如何使用清单文件手动 COPY 数据的信息,请参阅使用清单指定数据文件

Amazon OpenSearch 服务和 OpenSearch无服务器

对于 OpenSearch 服务和 OpenSearch 无服务器目标,您可以在创建传输流时指定重试持续时间(0—7200 秒)。

由于多种原因,向您的 OpenSearch 服务集群或 OpenSearch 无服务器集合传输数据可能会失败。例如,您的 Firehose 流可能存在错误的 OpenSearch 服务集群或 OpenSearch 无服务器集合配置、 OpenSearch 正在维护的服务集群 OpenSearch 或无服务器集合、网络故障或类似事件。在这种情况下,Amazon Data Firehose 会在指定的时间段内重试,然后跳过该特定的索引请求。跳过的文档会传输到您的 S3 存储桶中的 AmazonOpenSearchService_failed/ 文件夹内,您可以利用它进行手动回填。

对于 OpenSearch 服务,每个文档都具有以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by OpenSearch Service)", "errorMessage": "(error message returned by OpenSearch Service)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "esDocumentId": "(intended OpenSearch Service document ID)", "esIndexName": "(intended OpenSearch Service index name)", "esTypeName": "(intended OpenSearch Service type name)", "rawData": "(base64-encoded document data)" }

对于 OpenSearch 无服务器,每个文档都采用以下 JSON 格式:

{ "attemptsMade": "(number of index requests attempted)", "arrivalTimestamp": "(the time when the document was received by Firehose)", "errorCode": "(http error code returned by OpenSearch Serverless)", "errorMessage": "(error message returned by OpenSearch Serverless)", "attemptEndingTimestamp": "(the time when Firehose stopped attempting index request)", "osDocumentId": "(intended OpenSearch Serverless document ID)", "osIndexName": "(intended OpenSearch Serverless index name)", "rawData": "(base64-encoded document data)" }
Splunk

当 Amazon Data Firehose 向 Splunk 发送数据时,它会等待 Splunk 的确认。如果发生错误,或者确认未在确认超时时间内到达,Amazon Data Firehose 会启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。之后,Amazon Data Firehose 认为这是数据传输失败,并将数据备份到您的亚马逊 S3 存储桶。

每次 Amazon Data Firehose 向 Splunk 发送数据时,无论是初次尝试还是重试,都会重新启动确认超时计数器。然后,等待 Splunk 的确认。即使重试持续时间过期,Amazon Data Firehose 仍会等待确认,直到收到确认或达到确认超时为止。如果确认超时,Amazon Data Firehose 会进行检查以确定重试计数器中是否还有剩余时间。如果有剩余时间,它将再次重试并重复该逻辑,直到收到确认或确定重试时间已到期。

未收到确认并不是可能出现的唯一一类数据传输错误。有关其他类型的数据传输错误的信息,请参阅 Splunk 数据传输错误。如果重试持续时间大于 0,任何数据传输错误都会触发重试逻辑。

下面是一个错误记录示例。

{ "attemptsMade": 0, "arrivalTimestamp": 1506035354675, "errorCode": "Splunk.AckTimeout", "errorMessage": "Did not receive an acknowledgement from HEC before the HEC acknowledgement timeout expired. Despite the acknowledgement timeout, it's possible the data was indexed successfully in Splunk. Amazon Data Firehose backs up in Amazon S3 data for which the acknowledgement timeout expired.", "attemptEndingTimestamp": 13626284715507, "rawData": "MiAyNTE2MjAyNzIyMDkgZW5pLTA1ZjMyMmQ1IDIxOC45Mi4xODguMjE0IDE3Mi4xNi4xLjE2NyAyNTIzMyAxNDMzIDYgMSA0MCAxNTA2MDM0NzM0IDE1MDYwMzQ3OTQgUkVKRUNUIE9LCg==", "EventId": "49577193928114147339600778471082492393164139877200035842.0" }
HTTP 端点目标

当 Amazon Data Firehose 向 HTTP 终端节点目标发送数据时,它会等待该目标的响应。如果发生错误,或者响应未在响应超时时间内到达,Amazon Data Firehose 会启动重试持续时间计数器。它将不断重试,直到重试持续时间到期。之后,Amazon Data Firehose 认为这是数据传输失败,并将数据备份到您的亚马逊 S3 存储桶。

每次 Amazon Data Firehose 向 HTTP 终端节点目标发送数据时,无论是初次尝试还是重试,都会重新启动响应超时计数器。然后,等待来自 HTTP 端点目标的响应。即使重试持续时间过期,Amazon Data Firehose 仍会等待响应,直到收到响应或达到响应超时为止。如果响应超时,Amazon Data Firehose 会进行检查以确定重试计数器中是否还有剩余时间。如果还有时间,会再次重试并重复逻辑,直到收到响应或确定重试时间已过期。

未收到响应并不是唯一可能发生的数据传输错误类型。有关其他类型的数据传输错误的信息,请参阅 HTTP 端点数据传输错误

下面是一个错误记录示例。

{ "attemptsMade":5, "arrivalTimestamp":1594265943615, "errorCode":"HttpEndpoint.DestinationException", "errorMessage":"Received the following response from the endpoint destination. {"requestId": "109777ac-8f9b-4082-8e8d-b4f12b5fc17b", "timestamp": 1594266081268, "errorMessage": "Unauthorized"}", "attemptEndingTimestamp":1594266081318, "rawData":"c2FtcGxlIHJhdyBkYXRh", "subsequenceNumber":0, "dataId":"49607357361271740811418664280693044274821622880012337186.0" }

Amazon S3 对象名称格式

<uuid><file extension><delivery stream version>当 Firehose 向 Amazon S3 传输数据时,S3 对象密钥名称遵循以下格式,<evaluated prefix><suffix>其中后缀的格式为 ------------------------------------------------------------------------------- <delivery stream name><delivery stream version><year><month><day><hour><minute><second> 您可以更改传输流配置(例如,S3 存储桶名称、缓冲提示、压缩以及加密)。您可以使用 Firehose 控制台或 UpdateDestinationAPI 操作来实现此目的。

对于<evaluated prefix>,Firehose 在格式中添加了默认的时间前缀。YYYY/MM/dd/HH此前缀在存储桶中创建逻辑层次结构,其中每个正斜杠 (/) 在层次结构中创建一个级别。您可以通过指定包含在运行时计算的表达式的自定义前缀来修改此结构。有关如何指定自定义前缀的信息,请参阅 Amazon 简单存储服务对象的自定义前缀

默认情况下,用于时间前缀的时区为 UTC,但您可以将其更改为自己喜欢的时区。例如,如果您想使用日本标准时间而不是 UTC,则可以在 Amazon Web Services Management Console 或 API 参数设置 (CustomTimeZone) 中将时区配置为亚洲/东京。以下列表包含支持在 Firehose 中配置 S3 前缀的时区:

Africa
Africa/Abidjan Africa/Accra Africa/Addis_Ababa Africa/Algiers Africa/Asmera Africa/Bangui Africa/Banjul Africa/Bissau Africa/Blantyre Africa/Bujumbura Africa/Cairo Africa/Casablanca Africa/Conakry Africa/Dakar Africa/Dar_es_Salaam Africa/Djibouti Africa/Douala Africa/Freetown Africa/Gaborone Africa/Harare Africa/Johannesburg Africa/Kampala Africa/Khartoum Africa/Kigali Africa/Kinshasa Africa/Lagos Africa/Libreville Africa/Lome Africa/Luanda Africa/Lubumbashi Africa/Lusaka Africa/Malabo Africa/Maputo Africa/Maseru Africa/Mbabane Africa/Mogadishu Africa/Monrovia Africa/Nairobi Africa/Ndjamena Africa/Niamey Africa/Nouakchott Africa/Ouagadougou Africa/Porto-Novo Africa/Sao_Tome Africa/Timbuktu Africa/Tripoli Africa/Tunis Africa/Windhoek
America
America/Adak America/Anchorage America/Anguilla America/Antigua America/Aruba America/Asuncion America/Barbados America/Belize America/Bogota America/Buenos_Aires America/Caracas America/Cayenne America/Cayman America/Chicago America/Costa_Rica America/Cuiaba America/Curacao America/Dawson_Creek America/Denver America/Dominica America/Edmonton America/El_Salvador America/Fortaleza America/Godthab America/Grand_Turk America/Grenada America/Guadeloupe America/Guatemala America/Guayaquil America/Guyana America/Halifax America/Havana America/Indianapolis America/Jamaica America/La_Paz America/Lima America/Los_Angeles America/Managua America/Manaus America/Martinique America/Mazatlan America/Mexico_City America/Miquelon America/Montevideo America/Montreal America/Montserrat America/Nassau America/New_York America/Noronha America/Panama America/Paramaribo America/Phoenix America/Port_of_Spain America/Port-au-Prince America/Porto_Acre America/Puerto_Rico America/Regina America/Rio_Branco America/Santiago America/Santo_Domingo America/Sao_Paulo America/Scoresbysund America/St_Johns America/St_Kitts America/St_Lucia America/St_Thomas America/St_Vincent America/Tegucigalpa America/Thule America/Tijuana America/Tortola America/Vancouver America/Winnipeg
Antarctica
Antarctica/Casey Antarctica/DumontDUrville Antarctica/Mawson Antarctica/McMurdo Antarctica/Palmer
Asia
Asia/Aden Asia/Almaty Asia/Amman Asia/Anadyr Asia/Aqtau Asia/Aqtobe Asia/Ashgabat Asia/Ashkhabad Asia/Baghdad Asia/Bahrain Asia/Baku Asia/Bangkok Asia/Beirut Asia/Bishkek Asia/Brunei Asia/Calcutta Asia/Colombo Asia/Dacca Asia/Damascus Asia/Dhaka Asia/Dubai Asia/Dushanbe Asia/Hong_Kong Asia/Irkutsk Asia/Jakarta Asia/Jayapura Asia/Jerusalem Asia/Kabul Asia/Kamchatka Asia/Karachi Asia/Katmandu Asia/Krasnoyarsk Asia/Kuala_Lumpur Asia/Kuwait Asia/Macao Asia/Magadan Asia/Manila Asia/Muscat Asia/Nicosia Asia/Novosibirsk Asia/Phnom_Penh Asia/Pyongyang Asia/Qatar Asia/Rangoon Asia/Riyadh Asia/Saigon Asia/Seoul Asia/Shanghai Asia/Singapore Asia/Taipei Asia/Tashkent Asia/Tbilisi Asia/Tehran Asia/Thimbu Asia/Thimphu Asia/Tokyo Asia/Ujung_Pandang Asia/Ulaanbaatar Asia/Ulan_Bator Asia/Vientiane Asia/Vladivostok Asia/Yakutsk Asia/Yekaterinburg Asia/Yerevan
Atlantic
Atlantic/Azores Atlantic/Bermuda Atlantic/Canary Atlantic/Cape_Verde Atlantic/Faeroe Atlantic/Jan_Mayen Atlantic/Reykjavik Atlantic/South_Georgia Atlantic/St_Helena Atlantic/Stanley
Australia
Australia/Adelaide Australia/Brisbane Australia/Broken_Hill Australia/Darwin Australia/Hobart Australia/Lord_Howe Australia/Perth Australia/Sydney
Europe
Europe/Amsterdam Europe/Andorra Europe/Athens Europe/Belgrade Europe/Berlin Europe/Brussels Europe/Bucharest Europe/Budapest Europe/Chisinau Europe/Copenhagen Europe/Dublin Europe/Gibraltar Europe/Helsinki Europe/Istanbul Europe/Kaliningrad Europe/Kiev Europe/Lisbon Europe/London Europe/Luxembourg Europe/Madrid Europe/Malta Europe/Minsk Europe/Monaco Europe/Moscow Europe/Oslo Europe/Paris Europe/Prague Europe/Riga Europe/Rome Europe/Samara Europe/Simferopol Europe/Sofia Europe/Stockholm Europe/Tallinn Europe/Tirane Europe/Vaduz Europe/Vienna Europe/Vilnius Europe/Warsaw Europe/Zurich
Indian
Indian/Antananarivo Indian/Chagos Indian/Christmas Indian/Cocos Indian/Comoro Indian/Kerguelen Indian/Mahe Indian/Maldives Indian/Mauritius Indian/Mayotte Indian/Reunion
Pacific
Pacific/Apia Pacific/Auckland Pacific/Chatham Pacific/Easter Pacific/Efate Pacific/Enderbury Pacific/Fakaofo Pacific/Fiji Pacific/Funafuti Pacific/Galapagos Pacific/Gambier Pacific/Guadalcanal Pacific/Guam Pacific/Honolulu Pacific/Kiritimati Pacific/Kosrae Pacific/Majuro Pacific/Marquesas Pacific/Nauru Pacific/Niue Pacific/Norfolk Pacific/Noumea Pacific/Pago_Pago Pacific/Palau Pacific/Pitcairn Pacific/Ponape Pacific/Port_Moresby Pacific/Rarotonga Pacific/Saipan Pacific/Tahiti Pacific/Tarawa Pacific/Tongatapu Pacific/Truk Pacific/Wake Pacific/Wallis

<file extension>除此之外,您不能更改后缀字段。启用数据格式转换或压缩后,Firehose 将根据配置附加文件扩展名。下表说明了 Firehose 附加的默认文件扩展名:

配置 文件扩展名
数据格式转换:实木复合地板 . 实木复合地板
数据格式转换:ORC .orc
压缩:Gzip .gz
压缩:Zip .zip
压缩:Snappy .snappy
压缩:Hadoop-Snappy .hsnappy

您还可以在 Firehose 控制台或 API 中指定自己喜欢的文件扩展名。文件扩展名必须以句点 (.) 开头,并且可以包含允许的字符:0-9a-z! -_.*' ()。文件扩展名不能超过 128 个字符。

注意

当您指定文件扩展名时,它将覆盖 Firehose 在启用数据格式转换或压缩时添加的默认文件扩展名。

OpenSearch 服务目标的索引轮换

对于 OpenSearch 服务目标,您可以从以下五个选项之一中指定基于时间的索引轮换选项:NoRotationOneHourOneDayOneWeek、或OneMonth

根据您选择的轮换选项,Amazon Data Firehose 会将UTC到达时间戳的一部分附加到您指定的索引名称中。并相应地轮换附加的时间戳。以下示例显示了 S OpenSearch ervice 中每个索引轮换选项生成的索引名称,其中指定的索引名称为myindex,到达时间戳为2016-02-25T13:00:00Z

RotationPeriod IndexName
NoRotation myindex
OneHour myindex-2016-02-25-13
OneDay myindex-2016-02-25
OneWeek myindex-2016-w08
OneMonth myindex-2016-02
注意

使用 OneWeek 选项时,Data Firehose 会采用 <YEAR>-w<WEEK NUMBER> 格式(例如 2020-w33)自动创建索引,其中周数是使用 UTC 时间并根据以下美国惯例计算的:

  • 一周从星期日开始

  • 一年的第一周是指这一年中包含星期六的第一周

HTTP 终端节点目标跨 Amazon 账户和跨 Amazon 区域交付

Amazon Data Firehose 支持跨 Amazon 账户向 HTTP 终端节点目标传输数据。Amazon Data Firehose Firehose 流和你选择作为目标的 HTTP 终端节点可以位于不同的账户中。 Amazon

Amazon Data Firehose 还支持跨 Amazon 区域向 HTTP 终端节点目标传输数据。您可以将数据从一个区域的 Firehose 流传输到另一个 Amazon Amazon 区域的 HTTP 终端节点。您还可以将数据从 Firehose 流传输到 Amazon 区域以外的 HTTP 终端节点目标,例如通过将 HTTP 端点网址设置为所需目的地,将数据传输到您自己的本地服务器。在这些情况下,您的传输成本将增加额外的传输费用。有关更多信息,请参阅“按需定价”页面中的数据传输部分。

重复的记录

Amazon Data Firehose 使用 at-least-once 语义进行数据传输。在某些情况下,例如当数据传输超时时,如果原始数据传输请求最终通过,Amazon Data Firehose 的传送重试可能会引入重复项。这适用于 Amazon Data Firehose 支持的所有目标类型。

如何暂停和恢复 Firehose 直播直播

在 Firehose 中设置传送流后,流源中的可用数据将持续传送到目标。如果遇到流目标暂时不可用的情况(例如,在计划维护操作期间),您可能需要暂时暂停数据传输,并在目标位置再次可用时恢复。下面几节介绍了操作方法:

重要

当您使用下述方法暂停和恢复直播时,在恢复直播之后,您会看到很少有记录被传送到 Amazon S3 中的错误存储桶,而其余数据流继续传送到目的地。这是该方法的已知局限性,之所以发生这种情况,是因为有少量记录在多次重试后无法传送到目的地,被追踪为失败。

了解 Firehose 如何处理交付失败

在 Firehose 中设置传输流时,对于许多目的地 OpenSearch,例如 Splunk 和 HTTP 终端节点,您还可以设置一个 S3 存储桶,用于备份未能交付的数据。如需详细了解 Firehose 如何在交付失败时备份数据,请参阅数据传输失败处理。有关如何授予访问权限以备份未能交付的数据的 S3 存储桶的更多信息,请参阅授予 Firehose 对 Amazon S3 目标的访问权限。当 Firehose (a) 无法将数据传送到流目标,以及 (b) 由于传输失败而无法将数据写入备份 S3 存储桶时,它实际上会暂停流传输,直到数据可以传输到目标或写入备份 S3 位置。

暂停 Firehose 直播直播

要在 Firehose 中暂停直播传输,请先移除 Firehose 在传输失败时写入 S3 备份位置的权限。例如,如果您想暂停带有 OpenSearch 目标的传送流,则可以通过更新权限来实现。有关更多信息,请参阅授予 Firehose 访问公共 OpenSearch 服务目标的权限。

删除 s3:PutObject 操作的 "Effect": "Allow" 权限,并显式添加一条语句,该语句对用于备份失败传输的 S3 存储桶的 s3:PutObject 操作应用 Effect": "Deny" 权限。接下来,关闭直播目标(例如,关闭目标 OpenSearch 网域),或者移除 Firehose 写入目标的权限。要更新其他目的地的权限,请在 “使用 Amazon Data Firehose 控制访问权限” 中查看您的目的地部分。完成这两个操作后,Firehose 将停止传输直播,你可以使用 Fi rehose 的CloudWatch 指标进行监控。

重要

在 Firehose 中暂停直播时,您需要确保将流源(例如,在 Kinesis Data Streams 或 Kafka 托管服务中)配置为保留数据,直到恢复流传输并将数据传输到目标为止。如果来源是 DirectPut,Firehose 将保留数据 24 小时。如果您在数据留存期到期之前未恢复流并传输数据,则可能会发生数据丢失。

恢复 Firehose 的传送流

要恢复传送,首先要恢复之前对直播目标所做的更改,方法是打开直播目标并确保 Firehose 有权将直播传送到目的地。接下来,还原之前对应用于 S3 存储桶的权限所做的更改,以备份失败的传输。也就是说,应用 s3:PutObject 操作 "Effect": "Allow" 的权限,并删除 s3:PutObject 操作的 "Effect": "Deny" 权限,后者供 S3 存储桶用于备份失败的传输。最后,使用 Firehose 的CloudWatch 指标进行监控,以确认直播已传送到目的地。要查看错误并对其进行故障排除,请使用适用于 Firehose 的亚马逊 CloudWatch 日志监控