在 Amazon Data Firehose 中进行动态分区 - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Data Firehose 中进行动态分区

动态分区使您能够使用数据中的密钥(例如transaction_id或)对 Firehose 中的流数据进行持续分区customer_id,然后将按这些密钥分组的数据传送到相应的亚马逊简单存储服务 (Amazon S3) Simple S3 Service 前缀中。这使得使用各种服务(例如亚马逊雅典娜、亚马逊EMR、Amazon Redshift Spectrum 和亚马逊)可以更轻松地对亚马逊S3中的流数据进行高性能、具有成本效益的分析。 QuickSight此外,在需要额外处理的 Amazon 用例中,在动态分区的流数据传输到 Amazon S3 之后,Glue 可以执行更复杂的提取、转换和加载 (ETL) 任务。

对数据进行分区可以最大限度地减少扫描的数据量,优化性能,并降低在 Amazon S3 上进行分析查询的成本, 还可以提高对数据的精细访问。Firehose 流传统上用于捕获数据并将其加载到 Amazon S3 中。要对流数据集分区以进行基于 Amazon S3 的分析,您需要先在 Amazon S3 存储桶之间运行分区应用程序,然后才能将数据用于分析,这可能会变得复杂或昂贵。

通过动态分区,Firehose 使用动态或静态定义的数据密钥持续对传输中的数据进行分组,并按密钥将数据传输到各个 Amazon S3 前缀。这样可以 time-to-insight 减少几分钟或几小时。还可以降低成本并简化架构。

分区键

通过动态分区,您可以根据分区键对数据进行分区,从 S3 流数据创建目标数据集。分区键使您能够根据特定值筛选流数据。例如,如果您需要根据客户 ID 和国家/地区筛选数据,则可以将 customer_id 的数据字段指定为一个分区键,将 country 的数据字段指定为另一个分区键。然后,指定表达式(使用支持的格式)来定义动态分区数据记录要传输到的 S3 存储桶前缀。

以下是支持创建分区键的方法:

  • 内联解析-此方法使用 Firehose 的内置支持机制,即 jq 解析器,用于从 JSON 格式的数据记录中提取用于分区的密钥。目前,我们仅支持jq 1.6版本。

  • Amazon Lambda 函数-此方法使用指定的 Lam Amazon bda 函数提取并返回分区所需的数据字段。

重要

启用动态分区时,必须至少配置其中一种方法来对数据进行分区。您可以配置其中一种方法来指定分区键,也可以同时配置这两种方法。

使用内联解析创建分区键

要将内联解析配置为流数据的动态分区方法,必须选择要用作分区键的数据记录参数,并为每个指定的分区键提供一个值。

让我们看一下以下示例数据记录,看看如何通过内联解析为其定义分区键:

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

例如,您可以选择根据 customer_id 参数或 event_timestamp 参数对数据进行分区。这意味着您希望使用每条记录中的 customer_id 参数或 event_timestamp 参数的值来确定要向其传输传递记录的 S3 前缀。您也可以选择嵌套参数,例如表达式为 .type.devicedevice。您的动态分区逻辑可能取决于多个参数。

为分区键选择数据参数后,将每个参数映射到有效的 jq 表达式。下表显示了参数到 jq 表达式的映射:

参数 jq 表达式
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

在运行时,Firehose 使用上面的右列根据每条记录中的数据来评估参数。

使用 Amazon Lambda 函数创建分区键

对于压缩或加密的数据记录,或除 JSON 之外的任何文件格式的数据,您可以使用集成的 Amazon Lambda 函数和您自己的自定义代码来解压缩、解密或转换记录,以便提取和返回分区所需的数据字段。这是对现有转换 Lambda 函数的扩展,该函数现已在 Firehose 中可用。您可以转换、解析和返回数据字段,然后使用相同的 Lambda 函数进行动态分区。

以下是 Python 中的 Firehose 流处理示例 Lambda 函数,该函数重播从输入到输出的每条读取记录,并从记录中提取分区密钥。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下是 Go 中的 Firehose 流处理示例 Lambda 函数,该函数重播从输入到输出的每条读取记录,并从记录中提取分区密钥。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

用于动态分区的 Amazon S3 存储桶前缀

在创建使用亚马逊 S3 作为目标的 Firehose 流时,必须指定 Firehose 用于传输数据的亚马逊 S3 存储桶。您可以使用 Amazon S3 存储桶前缀来组织存储在 S3 存储桶中的数据。Amazon S3 存储桶前缀类似于目录,可让您将类似的对象分组在一起。

通过动态分区,您的分区数据将传输到指定的 Amazon S3 前缀。如果您未启用动态分区,则可选择为 Firehose 流指定 S3 存储桶前缀。但是,如果您选择启用动态分区,则必须指定 Firehose 向其传送分区数据的 S3 存储桶前缀。

在您启用动态分区的每个 Firehose 流中,S3 存储桶前缀值由基于该交付流的指定分区密钥的表达式组成。再次使用上面的数据记录示例,您可以构建以下 S3 前缀值,该值由基于上面定义的分区键的表达式组成:

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Firehose 在运行时计算上述表达式。将匹配相同计算的 S3 前缀表达式的记录分组到一个数据集中。然后,Firehose 会将每个数据集传送到评估后的 S3 前缀。向 S3 传送数据集的频率由 Firehose 流缓冲区设置决定。因此,本示例中的记录将传输到以下 S3 对象键:

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

对于动态分区,您必须在 S3 存储桶前缀中使用以下表达式格式:!{namespace:value},其中命名空间可以是 partitionKeyFromQuerypartitionKeyFromLambda,也可以是两者。如果使用内联解析为源数据创建分区键,则必须指定一个 S3 存储桶前缀值,该值由以下格式指定的表达式组成:"partitionKeyFromQuery:keyID"。如果使用 Amazon Lambda 函数为源数据创建分区键,则必须指定一个 S3 存储桶前缀值,该值由以下格式指定的表达式组成:"partitionKeyFromLambda:keyID"

注意

您也可以使用 Hive 样式格式指定 S3 存储桶前缀值,例如 customer_id=! {partitionKeyFrom查询:客户_ID}。

有关更多信息,请参阅创建 Amazon Firehose 流和亚马逊 S3 对象的自定义前缀中的 “为目的地选择 Amazon S3”。

聚合数据的动态分区

您可以将动态分区应用于聚合数据(例如,聚合到单个 PutRecordPutRecordBatch API 调用中的多个事件、日志或记录),但必须先对这些数据进行解聚。您可以通过启用多记录解聚来取消聚合数据,即解析Firehose流中的记录并将其分开的过程。

多记录解聚可以是JSON类型,这意味着记录的分离基于连续的 JSON 对象。解聚也可以是这种类型Delimited,这意味着记录的分离是根据指定的自定义分隔符执行的。该自定义分隔符必须是 base-64 编码的字符串。例如,如果要使用以下字符串作为自定义分隔符####,则必须以 base-64 编码格式指定该字符串,将其转换为。IyMjIw==

注意

解聚合 JSON 记录时,请确保您的输入仍以支持的 JSON 格式显示。JSON 对象必须在一行上,没有分隔符或只能使用换行符分隔 (JSONL)。JSON 对象数组不是有效的输入。

以下是正确输入的示例:{"a":1}{"a":2} and {"a":1}\n{"a":2}

以下是错误输入的示例:[{"a":1}, {"a":2}]

对于聚合数据,当您启用动态分区时,Firehose 会解析记录,并根据指定的多记录解聚类型在每个 API 调用中查找有效的 JSON 对象或分隔记录。

重要

如果数据是聚合的,则只有在首次对数据进行解聚时才能应用动态分区。

重要

当您在 Firehose 中使用数据转换功能时,将在数据转换之前应用解聚合。传入 Firehose 的数据将按以下顺序进行处理:解聚合 → 通过 Lambda 进行数据转换 → 分区密钥。

向 S3 传输数据时添加新的行分隔符

您可以启用 “新行分隔符”,以便在传输到 Amazon S3 的对象中的记录之间添加新的行分隔符。这对解析 Amazon S3 中的对象很有帮助。当对聚合数据应用动态分区时,这也特别有用,因为作为解析过程的一部分,多记录解聚合(必须先应用于聚合数据,然后才能对其进行动态分区)会从记录中删除新行。

如何启用动态分区

您可以通过 Amazon Data Firehose 管理控制台、CLI 或 API 为 Firehose 直播配置动态分区。

重要

只有在创建新的 Firehose 直播时,才能启用动态分区。您无法为尚未启用动态分区的现有 Firehose 流启用动态分区。

有关在创建新 Fire hose 直播时如何通过 Firehose 管理控制台启用和配置动态分区的详细步骤,请参阅创建 Amazon Firehose 流。当你开始为你的 Firehose 直播指定目标时,请务必按照 “为你的目的地选择 Amazon S3” 部分中的步骤进行操作,因为当前,只有使用 Amazon S3 作为目标的 Firehose 直播才支持动态分区。

在活动的 Firehose 流上启用动态分区后,您可以通过添加新的分区密钥或删除或更新现有分区密钥和 S3 前缀表达式来更新配置。更新后,Firehose 开始使用新的密钥和新的 S3 前缀表达式。

重要

在 Firehose 直播上启用动态分区后,就无法在此 Firehose 直播上禁用动态分区。

动态分区错误处理

如果 Amazon Data Firehose 无法解析您的 Firehose 流中的数据记录,或者无法提取指定的分区密钥,也无法评估 S3 前缀值中包含的表达式,则这些数据记录将传送到您在创建启用动态分区的 Firehose 流时必须指定的 S3 错误存储桶前缀。S3 错误存储桶前缀包含 Firehose 无法传送到指定 S3 目标的所有记录。这些记录是根据错误类型组织的。除了记录外,传输的对象还包括有关错误的信息,以帮助理解和解决错误。

如果要为此 Firehose 流启用动态分区,则必须为 Firehose 流指定 S3 错误存储桶前缀。如果您不想为 Firehose 流启用动态分区,则指定 S3 错误存储桶前缀是可选的。

数据缓冲和动态分区

Amazon Data Firehose 会将传入的流数据缓冲到一定大小和一段时间,然后再将其传输到指定目的地。您可以在创建新的 Firehose 直播时配置缓冲区大小和缓冲间隔,或者更新现有 Firehose 流的缓冲大小和缓冲间隔。缓冲区大小的单位是 MB,缓冲区间隔的单位是秒。

启用动态分区后,Firehose 会根据配置的缓冲提示(大小和时间)在内部缓冲属于给定分区的记录,然后再将这些记录传送到您的 Amazon S3 存储桶。为了交付最大大小的对象,Firehose 在内部使用了多级缓冲。因此,一批记录的 end-to-end 延迟可能是配置的缓冲提示时间的 1.5 倍。这会影响 Firehose 直播的数据新鲜度。

活动分区计数是传输缓冲区内活动分区的总数。例如,如果动态分区查询每秒构造 3 个分区,并且您有一个缓冲区提示配置,每 60 秒触发一次传输,那么平均您将拥有 180 个活动分区。如果 Firehose 无法将分区中的数据传送到目标,则该分区在交付缓冲区中被视为活动分区,直到可以传送为止。

当根据记录数据字段和 S3 前缀表达式将 S3 前缀计算为新值时,将创建一个新分区。为每个活动分区创建一个新缓冲区。具有相同计算 S3 前缀的每个后续记录都将传输到该缓冲区。一旦缓冲区达到缓冲区大小限制或缓冲时间间隔,Firehose 就会创建一个包含缓冲区数据的对象,并将其传送到指定的 Amazon S3 前缀。传输对象后,该分区的缓冲区和分区本身将被删除,并从活动分区计数中移除。一旦满足每个分区的缓冲区大小或间隔,Firehose 就会将每个缓冲区数据作为单个对象传送。一旦活动分区的数量达到每个交付流 500 的上限,Firehose 流中的其余记录将传送到指定的 S3 错误存储桶前缀。