了解分区键 - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

了解分区键

通过动态分区,您可以根据分区键对数据进行分区,从 S3 流数据创建目标数据集。分区键使您能够根据特定值筛选流数据。例如,如果您需要根据客户 ID 和国家/地区筛选数据,则可以将 customer_id 的数据字段指定为一个分区键,将 country 的数据字段指定为另一个分区键。然后,指定表达式(使用支持的格式)来定义动态分区数据记录要传输到的 S3 存储桶前缀。

您可以使用以下方法来创建分区键。

  • 内联解析:此方法使用 Firehose 内置支持机制(jq 解析器),从 JSON 格式的数据记录中提取用于分区的键。目前,我们仅支持 jq 1.6 版本。

  • Amazon Lambda 函数:此方法使用指定的 Amazon Lambda 函数提取和返回分区所需的数据字段。

重要

启用动态分区时,必须至少配置其中一种方法来对数据进行分区。您可以配置其中一种方法来指定分区键,也可以同时配置这两种方法。

使用内联解析创建分区键

要将内联解析配置为流数据的动态分区方法,必须选择要用作分区键的数据记录参数,并为每个指定的分区键提供一个值。

以下示例数据记录显示如何通过内联解析为其定义分区键。请注意,应以 Base64 格式对数据进行编码。您也可以参考 CLI 示例

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

例如,您可以选择根据 customer_id 参数或 event_timestamp 参数对数据进行分区。这意味着您希望使用每条记录中的 customer_id 参数或 event_timestamp 参数的值来确定要向其传输传递记录的 S3 前缀。您也可以选择嵌套参数,例如表达式为 .type.devicedevice。您的动态分区逻辑可能取决于多个参数。

为分区键选择数据参数后,将每个参数映射到有效的 jq 表达式。下表显示了参数到 jq 表达式的映射:

参数 jq 表达式
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

在运行时系统,Firehose 使用上面的右列根据每条记录中的数据来评估参数。

使用 Amazon Lambda 函数创建分区键

对于压缩或加密的数据记录,或除 JSON 之外的任何文件格式的数据,您可以使用集成的 Amazon Lambda 函数和您自己的自定义代码来解压缩、解密或转换记录,以便提取和返回分区所需的数据字段。这是对现有转换 Lambda 函数的扩展,该函数现已随 Firehose 一起提供。您可以转换、解析和返回数据字段,然后使用相同的 Lambda 函数进行动态分区。

以下是一个用 Python 编写的 Firehose 流处理 Lambda 函数示例,该函数从输入到输出重放每一条读取记录,并从记录中提取分区键。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "day": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下是一个用 Go 编写的 Firehose 流处理 Lambda 函数示例,该函数从输入到输出重放每一条读取记录,并从记录中提取分区键。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }