Kinesis Data Firehose 中的动态分区 - Amazon Kinesis Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Data Firehose 中的动态分区

动态分区使您能够通过在数据中使用密钥在 Kinesis Data Firehose 中持续对流数据进行分区(例如,customer_id要么transaction_id),然后将按这些键分组的数据传输到相应的 Amazon Simple Storage Service (Amazon S3) 前缀中。这使得使用 Amazon Athena、亚马逊 EMR、Amazon Redshift Spectrum 和 Amazon QuickSight 等各种服务对 Amazon S3 中的流数据进行高性能、经济高效的分析变得更加轻松。此外,Amazon在需要额外处理的用例中,在将动态分区的流数据传输到 Amazon S3 之后,Glue 可以执行更复杂的提取、转换和加载 (ETL) 作业。

对数据进行分区可以最大限度地减少扫描的数据量、优化性能并降低 Amazon S3 上的分析查询的成本。它还增加了对数据的粒度访问。传统上,Kinesis Data Firehose 交付流用于捕获数据并将其加载到 Amazon S3 中。要对基于 Amazon S3 的分析的流数据集进行分区,您需要在 Amazon S3 存储桶之间运行分区应用程序,然后才能将数据用于分析,这可能会变得复杂或成本高昂。

通过动态分区,Kinesis Data Firehose 使用动态或静态定义的数据密钥持续对传输中数据进行分组,并按键将数据传输到单个 Amazon S3 前缀。这可以减少 time-to-insight 按几分钟或几小时。它还降低了成本并简化了体系结构。

分区键

使用动态分区,您可以根据分区密钥对数据进行分区,从 S3 流数据创建目标数据集。通过分区密钥,您可以根据特定值过滤流媒体数据。例如,如果您需要根据客户 ID 和国家/地区筛选数据,则可以指定customer_id作为一个分区密钥和数据字段country作为另一个分区密钥。然后,您可以指定表达式(使用支持的格式)来定义将动态分区数据记录传送到的 S3 存储桶前缀。

以下是创建分区键的支持方法:

  • 内联解析-此方法使用 Amazon Kinesis Data Firehose 内置的支持机制,jq 解析器,用于从 JSON 格式的数据记录中提取用于分区的密钥。

  • AmazonLambda 函数-此方法使用指定AmazonLambda 函数用于提取和返回分区所需的数据字段。

重要

启用动态分区时,必须至少配置其中一种方法才能对数据进行分区。您可以配置这些方法中的任一方法来指定分区密钥,也可以同时指定两种方法。

使用内联解析创建分区密钥

要将内联解析配置为流数据的动态分区方法,您必须选择要用作分区密钥的数据记录参数,并为每个指定的分区密钥提供一个值。

让我们看下面的示例数据记录,看看如何使用内联解析为其定义分区密钥:

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

例如,您可以选择基于customer_id参数或event_timestamp参数。这意味着你想要的价值customer_id参数或event_timestamp每条记录中的参数,用于确定将记录传送到的 S3 前缀。你也可以选择嵌套参数,例如device用表达式.type.device. 你的动态分区逻辑可以取决于多个参数。

为分区密钥选择数据参数后,您可以将每个参数映射到有效的 jq 表达式。下表显示了参数与 jq 表达式的这种映射:

参数 jq 表达式
customer_id .ustomer_id
device

.type.device

year

.event_timeamp | strftime (“%Y”)

month

.event_timeamp | strftime (“%m”)

day

.event_timeamp | strftime (“%d”)

hour

.event_timeamp | strftime (“%H”)

在运行时,Kinesis Data Firehose 使用上面的右列来根据每条记录中的数据来评估参数。

使用AmazonLambda 函数

对于压缩或加密的数据记录,或者除 JSON 以外的任何文件格式的数据,您可以使用集成AmazonLambda 函数使用您自己的自定义代码来解压、解密或转换记录,以提取和返回分区所需的数据字段。这是对现有转换 Lambda 函数的扩展,该函数现在可用于 Kinesis Data Firehose。您可以转换、解析和返回数据字段,然后使用相同的 Lambda 函数进行动态分区。

以下是一个示例 Amazon Kinesis Firehose 传输流处理 Python 中的 Lambda 函数,该函数将输入到输出的每个读取记录重放,并从记录中提取分区密钥。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下是一个示例 Amazon Kinesis Firehose 传输流处理 Go 中的 Lambda 函数,该函数将每条读取记录从输入到输出,并从记录中提取分区密钥。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type KinesisFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.KinesisFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.KinesisFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.KinesisFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.KinesisFirehoseResponseRecordMetadata var recordData KinesisFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

动态分区的 Amazon S3 存储桶前缀

当您创建使用 Amazon S3 作为目标的交付流时,您必须指定一个 Amazon S3 存储桶,以便在其中 Kinesis Data Firehose 传输数据。Amazon S3 存储桶前缀用于组织存储在 S3 存储桶中的数据。Amazon S3 存储桶前缀类似于允许您将类似对象组合在一起的目录。

使用动态分区,您的分区数据将传输到指定的 Amazon S3 前缀中。如果不启用动态分区,则为交付流指定 S3 存储桶前缀是可选的。但是,如果您选择启用动态分区,则必须指定将 Kinesis Data Firehose 送到的 S3 存储桶前缀。

在启用动态分区的每个传输流中,S3 存储桶前缀值由基于该传输流的指定分区密钥的表达式组成。再次使用上述数据记录示例,您可以构建以下 S3 前缀值,该值由基于上面定义的分区密钥组成的表达式:

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Kinesis Data Firehose 会在运行时评估上述表达式。它将匹配相同评估的 S3 前缀表达式的记录分组到单个数据集中。然后,Kinesis Data Firehose 将每个数据集传送到评估的 S3 前缀。向 S3 传输数据集的频率由交付流缓冲区设置决定。因此,本示例中的记录将传送到以下 S3 对象密钥:

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

对于动态分区,您必须在 S3 存储桶前缀中使用以下表达式格式:!{namespace:value},其中命名空间可以是partitionKeyFromQuery要么partitionKeyFromLambda,或同时选择两者。如果您使用内联解析为源数据创建分区密钥,则必须指定 S3 存储桶前缀值,该值由以下格式指定的表达式组成:"partitionKeyFromQuery:keyID". 如果您正在使用AmazonLambda 函数要为源数据创建分区密钥,您必须指定 S3 存储桶前缀值,该值由以下格式指定的表达式组成:"partitionKeyFromLambda:keyID".

注意

您还可以使用 Hive 样式格式指定 S3 存储桶前缀值,例如 customer_id=! {来自查询的分区键:Customer_ID}。

有关更多信息,请参阅中的 “为目标选择 Amazon S3”创建 Amazon Kinesis Data Firehose 传输流Amazon S3 对象的自定义前缀.

聚合数据的动态分区

您可以将动态分区应用于聚合数据(例如,将多个事件、日志或记录聚合到一个中)PutRecordPutRecordBatchAPI 调用)但是这些数据必须首先进行分类。您可以通过启用多记录分类来解析数据,即解析传输流中的记录并将其分离的过程。多记录分解可以是JSON类型,这意味着记录的分离是基于有效的 JSON 来执行的。或者它可以是Delimitedtype,这意味着记录的分离是基于指定的自定义分隔符执行的。此自定义分隔符必须是基于 64 编码的字符串。例如,如果要将以下字符串用作自定义分隔符####,您必须以 base-64 编码格式指定它,这将其转换为IyMjIw==.

对于聚合数据,启用动态分区时,Kinesis Data Firehose 会根据指定的多记录分解类型来解析记录,并在每个 API 调用中查找有效的 JSON 对象或分隔记录。

重要

如果您的数据是聚合的,则只有在数据首次进行去分类时才能应用动态分区。

将数据传送到 S3 时添加新的行分隔符

启用动态分区时,您可以将传输流配置为在传送到 Amazon S3 的对象中的记录之间添加新的行分隔符。这可能有助于解析 Amazon S3 中的对象。在将动态分区应用于聚合数据时,这也特别有用,因为多记录分解(必须先应用于聚合数据才能动态分区)在分析过程中从记录中删除新行。

如何启用动态分区

您可以通过 Kinesis Data Firehose 管理控制台、CLI 或 API 为交付流配置动态分区。

重要

只有在创建新传输流时,您才能启用动态分区。不能为尚未启用动态分区的现有交付流启用动态分区。

有关如何在创建新交付流时通过 Amazon Kinesis Data Firehose 管理控制台启用和配置动态分区的详细步骤,请参阅创建 Amazon Kinesis Data Firehose 传输流. 当您开始为交付流指定目标的任务时,请务必按照选择 Amazon S3 作为您的目标部分,因为目前,只有使用 Amazon S3 作为目标的交付流才支持动态分区。

启用活动交付流上的动态分区后,您可以通过添加新分区密钥或删除或更新现有分区密钥和 S3 前缀表达式来更新配置。更新后,Amazon Kinesis Data Firehose 开始使用新密钥和新的 S3 前缀表达式。

重要

在交付流上启用动态分区后,无法在此传输流上禁用该分区。

动态分区错误处理

如果 Kinesis Data Firehose 无法解析传输流中的数据记录,或者无法提取指定的分区密钥,或者无法评估 S3 前缀值中包含的表达式,则这些数据记录将传送到您在创建时必须指定的 S3 错误存储桶前缀。您可以在其中启用动态分区的交付流。S3 错误存储桶前缀包含 Kinesis Data Firehose 无法传送到指定 S3 目标的所有记录。这些记录根据错误类型进行组织。除了记录外,交付的对象还包括有关错误的信息,以帮助理解和解决错误。

如果要为此传输流启用动态分区,则必须为传输流指定 S3 错误存储桶前缀。如果您不想为传输流启用动态分区,则指定 S3 错误存储桶前缀是可选的。

数据缓冲和动态分区

在传输到指定目标之前,Amazon Kinesis Data Firehose 会在特定时间内缓冲特定大小的传入流数据。您可以在创建新的传输流时配置缓冲区大小和缓冲区间隔,或者更新现有传送流的缓冲区大小和缓冲区间隔。缓冲区大小以 MB 为单位测量,缓冲区间隔以秒为单位测量。

对于启用了数据分区的交付流,Kinesis Data Firehose 在运行时根据记录的有效负载为每个分区创建一个缓冲区。对于启用了数据分区的传输流,缓冲区大小范围为 64MB 到 128MB,默认设置为 128MB,缓冲区间隔范围为 60 秒到 900 秒。每个活动分区支持每秒 40 MB 的最大吞吐量。

活动分区计数是交付缓冲区内活动分区的总数。例如,如果动态分区查询每秒构造 3 个分区,并且您有一个缓冲区提示配置,每 60 秒触发交付一次,那么平均而言,您将拥有 180 个活动分区。如果 Kinesis Data Firehose 无法将分区中的数据传送到目标,则在交付缓冲区之前,此分区将被视为活动分区。

启用交付流上的动态分区后,最多可以为该传输流创建 500 个活动分区。您可以使用Amazon Kinesis Data Firehose 限制表请求提高此配额的值。当基于记录数据字段和 S3 前缀表达式将 S3 前缀评估为新值时,将创建一个新分区。为每个活动分区创建一个新的缓冲区。每条具有相同评估的 S3 前缀的后续记录都会传送到该缓冲区。缓冲区满足缓冲区大小限制或缓冲时间间隔后,Amazon Kinesis Data Firehose 会创建一个包含缓冲区数据的对象,并将其传送到指定的 Amazon S3 前缀。传送对象后,该分区的缓冲区和分区本身将被删除并从活动分区计数中删除。一旦分别满足每个分区的缓冲区大小或间隔,Amazon Kinesis Data Firehose 将每个缓冲区数据作为单个对象传送。一旦活动分区的数量达到每个传输流 500 个的限制,传输流中的其余记录将传输到指定的 S3 错误存储桶前缀。