Note: Amazon Data Firehose was previously known as Amazon Kinesis Data Firehose.
Dynamic partitioning in Amazon Data Firehose
Dynamic partitioning enables you to continuously partition streaming data in Firehose by using keys within the data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes. This makes it easier to run high-performance, cost-efficient analytics on streaming data in Amazon S3 using services such as Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and Amazon QuickSight. In addition, Amazon Glue can perform more sophisticated extract, transform, and load (ETL) jobs after the dynamically partitioned streaming data is delivered to Amazon S3, in use cases where additional processing is required.
Partitioning your data minimizes the amount of data scanned, optimizes performance, and reduces the cost of your analytics queries on Amazon S3. It also increases granular access to your data. Firehose streams are traditionally used to capture and load data into Amazon S3. To partition a streaming data set for Amazon S3-based analytics, you would otherwise need to run partitioning applications between Amazon S3 buckets before making the data available for analysis, which can become complicated or costly.
With dynamic partitioning, Firehose continuously groups in-transit data using dynamically or statically defined data keys, and delivers the data to individual Amazon S3 prefixes by key. This reduces time-to-insight by minutes or hours. It also reduces costs and simplifies architectures.
Work with partitioning keys
With dynamic partitioning, you create targeted data sets from the streaming S3 data by partitioning the data based on partitioning keys. Partitioning keys enable you to filter your streaming data based on specific values. For example, if you need to filter your data based on customer ID and country, you can specify the data field customer_id as one partitioning key and the data field country as another partitioning key. Then, you specify the expressions (using the supported formats) that define the S3 bucket prefixes to which the dynamically partitioned data records are delivered.
The following are the supported methods of creating partitioning keys:

- Inline parsing - this method uses the Firehose built-in support mechanism, a jq parser, to extract the keys for partitioning from data records that are in JSON format. Currently, only jq 1.6 is supported.
- Amazon Lambda function - this method uses a specified Amazon Lambda function to extract and return the data fields needed for partitioning.

Important

When you enable dynamic partitioning, you must configure at least one of these methods to partition your data. You can configure either one of these methods to specify your partitioning keys, or both of them at the same time.
Create partitioning keys with inline parsing
To configure inline parsing as the dynamic partitioning method for your streaming data, you must choose data record parameters to be used as partitioning keys and provide a value for each specified partitioning key.
The following sample data record shows how you can define partitioning keys for it with inline parsing. Note that the data should be encoded in Base64 format. You can also refer to the CLI example.

```json
{
   "type": {
      "device": "mobile",
      "event": "user_clicked_submit_button"
   },
   "customer_id": "1234567890",
   "event_timestamp": 1565382027, #epoch timestamp
   "region": "sample_region"
}
```
For example, you can choose to partition your data based on the customer_id parameter or the event_timestamp parameter. This means that you want the value of the customer_id parameter or the event_timestamp parameter in each record to be used in determining the S3 prefix to which the record is delivered. You can also choose a nested parameter, such as device, with the expression .type.device. Your dynamic partitioning logic can depend on multiple parameters.
After selecting data parameters for your partitioning keys, you then map each parameter to a valid jq expression. The following table shows such a mapping of parameters to jq expressions:
| Parameter | jq expression |
| --- | --- |
| customer_id | .customer_id |
| device | .type.device |
| year | .event_timestamp \| strftime("%Y") |
| month | .event_timestamp \| strftime("%m") |
| day | .event_timestamp \| strftime("%d") |
| hour | .event_timestamp \| strftime("%H") |
At runtime, Firehose uses the jq expressions in the right column to evaluate the parameters based on the data in each record.
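When you configure inline parsing through the API, the parameter-to-expression mapping above is expressed as a single jq query in a MetadataExtraction processor. The following is a minimal sketch of that processor configuration for the boto3 SDK for Python; the variable name is illustrative, while the processor type and parameter names come from the Firehose ProcessingConfiguration API.

```python
# A sketch of a MetadataExtraction processor that implements the
# parameter-to-jq-expression mapping from the table above.
metadata_extraction_processor = {
    "Type": "MetadataExtraction",
    "Parameters": [
        {
            # One jq object expression that emits every partitioning key.
            "ParameterName": "MetadataExtractionQuery",
            "ParameterValue": (
                '{customer_id: .customer_id,'
                ' device: .type.device,'
                ' year: .event_timestamp | strftime("%Y"),'
                ' month: .event_timestamp | strftime("%m"),'
                ' day: .event_timestamp | strftime("%d"),'
                ' hour: .event_timestamp | strftime("%H")}'
            ),
        },
        # Firehose currently supports only the jq 1.6 engine.
        {"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"},
    ],
}
```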
Create partitioning keys with an Amazon Lambda function
For compressed or encrypted data records, or data that is in any file format other than JSON, you can use the integrated Amazon Lambda function with your own custom code to decompress, decrypt, or transform the records in order to extract and return the data fields needed for partitioning. This is an expansion of the existing transformation Lambda function that is available with Firehose today. Using the same Lambda function, you can transform, parse, and return the data fields that you can then use for dynamic partitioning.
The following is an example Firehose stream processing Lambda function in Python that replays every read record from input to output and extracts partitioning keys from the records.
```python
from __future__ import print_function

import base64
import json
import datetime

# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: "
          + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])

    # Create the result object that holds the output records.
    firehose_records_output = {'records': []}

    # Go through the input records and process them.
    for firehose_record_input in firehose_records_input['records']:
        # Get the user payload.
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)

        print("Record that was received")
        print(json_value)
        print("\n")

        # Derive the partition keys from the record's customer ID and timestamp.
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "date": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')}

        # Create the output Firehose record with the original payload, the
        # partition keys in its metadata, and the proper record ID.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': {'partitionKeys': partition_keys}}

        # Add the record to the list of output records.
        firehose_records_output['records'].append(firehose_record_output)

    # At the end, return the processed records.
    return firehose_records_output
```
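To sanity-check the handler before deploying it, you can invoke it locally with a hand-built event. The following is a small sketch that assumes the handler above is in scope; the ARN, region, and record IDs are placeholders for local testing only.

```python
import base64
import json

# Build a fake Firehose transformation event with one Base64-encoded record.
sample_payload = {"customerId": "1234567890", "eventTimestamp": 1565382027}
test_event = {
    "invocationId": "test-invocation-id",
    "deliveryStreamArn": "arn:aws:firehose:us-east-1:111122223333:deliverystream/test",
    "region": "us-east-1",
    "records": [
        {
            "recordId": "record-1",
            "data": base64.b64encode(
                json.dumps(sample_payload).encode("utf-8")
            ).decode("utf-8"),
        }
    ],
}

result = lambda_handler(test_event, None)
# Each output record should carry the extracted partition keys in its metadata.
print(result["records"][0]["metadata"]["partitionKeys"])
```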
The following is an example Firehose stream processing Lambda function in Go that replays every read record from input to output and extracts partitioning keys from the records.
```go
package main

import (
    "encoding/json"
    "fmt"
    "strconv"
    "time"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
    CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {
    fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
    fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
    fmt.Printf("Region: %s\n", evnt.Region)

    var response events.DataFirehoseResponse

    for _, record := range evnt.Records {
        fmt.Printf("RecordID: %s\n", record.RecordID)
        fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

        // Replay the record from input to output unchanged.
        var transformedRecord events.DataFirehoseResponseRecord
        transformedRecord.RecordID = record.RecordID
        transformedRecord.Result = events.DataFirehoseTransformedStateOk
        transformedRecord.Data = record.Data

        // Extract the partition keys from the record data and the current time.
        var metaData events.DataFirehoseResponseRecordMetadata
        var recordData DataFirehoseEventRecordData
        partitionKeys := make(map[string]string)
        currentTime := time.Now()

        json.Unmarshal(record.Data, &recordData)

        partitionKeys["customerId"] = recordData.CustomerId
        partitionKeys["year"] = strconv.Itoa(currentTime.Year())
        partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
        partitionKeys["date"] = strconv.Itoa(currentTime.Day())
        partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
        partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())

        metaData.PartitionKeys = partitionKeys
        transformedRecord.Metadata = metaData

        response.Records = append(response.Records, transformedRecord)
    }

    return response, nil
}

func main() {
    lambda.Start(handleRequest)
}
```
Use Amazon S3 bucket prefix
When you create a Firehose stream that uses Amazon S3 as the destination, you must specify an Amazon S3 bucket where Firehose is to deliver your data. Amazon S3 bucket prefixes are used to organize the data that you store in your S3 buckets. An Amazon S3 bucket prefix is similar to a directory that enables you to group similar objects together.
With dynamic partitioning, your partitioned data is delivered into the specified Amazon S3 prefixes. If you don't enable dynamic partitioning, specifying an S3 bucket prefix for your Firehose stream is optional. However, if you choose to enable dynamic partitioning, you must specify the S3 bucket prefixes to which Firehose delivers partitioned data.
In every Firehose stream where you enable dynamic partitioning, the S3 bucket prefix value consists of expressions based on the specified partitioning keys for that Firehose stream. Using the earlier data record example, you can build the following S3 prefix value, which consists of expressions based on the partitioning keys defined above:
"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }
Firehose evaluates the above expression at runtime. It groups records that match the same evaluated S3 prefix expression into a single data set. Firehose then delivers each data set to the evaluated S3 prefix. The frequency of data set delivery to S3 is determined by the Firehose stream buffer setting. As a result, the record in this example is delivered to the following S3 object key:
```
s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa
```
For dynamic partitioning, you must use the following expression format in your S3 bucket prefix: !{namespace:value}, where the namespace can be partitionKeyFromQuery, partitionKeyFromLambda, or both.

If you are using inline parsing to create the partitioning keys for your source data, you must specify an S3 bucket prefix value that consists of expressions specified in the following format: "partitionKeyFromQuery:keyID". If you are using an Amazon Lambda function to create partitioning keys for your source data, you must specify an S3 bucket prefix value that consists of expressions specified in the following format: "partitionKeyFromLambda:keyID".
Note
You can also specify the S3 bucket prefix value using the hive style format, for example customer_id=!{partitionKeyFromQuery:customer_id}.
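A prefix can also mix both namespaces when inline parsing supplies some keys and a Lambda function supplies others. The following is a short illustrative sketch, assuming a hypothetical customer_id key from inline parsing and a hypothetical minute key from a Lambda function, both in the hive-style format:

```python
# A hypothetical prefix mixing both namespaces: customer_id comes from
# inline parsing (partitionKeyFromQuery) and minute from a Lambda function
# (partitionKeyFromLambda).
s3_prefix = (
    "customer_id=!{partitionKeyFromQuery:customer_id}/"
    "minute=!{partitionKeyFromLambda:minute}/"
)
```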
For more information, see "Choose Amazon S3 for Your Destination" in Creating an Amazon Firehose stream.
Apply dynamic partitioning to aggregated data
You can apply dynamic partitioning to aggregated data (for example, multiple events, logs, or records aggregated into a single PutRecord or PutRecordBatch API call), but this data must first be deaggregated. You can deaggregate your data by enabling multi record deaggregation - the process of parsing through the records in the Firehose stream and separating them.

Multi record deaggregation can be of the JSON type, meaning that the separation of records is based on consecutive JSON objects. Deaggregation can also be of the Delimited type, meaning that the separation of records is performed based on a specified custom delimiter. This custom delimiter must be a base-64 encoded string. For example, if you want to use the string #### as your custom delimiter, you must specify it in the base-64 encoded format, which translates it to IyMjIw==.
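The delimiter encoding, and a corresponding processor configuration, can be sketched as follows. This is illustrative rather than a complete stream configuration; the processor type and parameter names come from the Firehose ProcessingConfiguration API.

```python
import base64

# Encode the custom delimiter as Firehose expects it: "####" -> "IyMjIw==".
encoded_delimiter = base64.b64encode(b"####").decode("utf-8")
print(encoded_delimiter)  # IyMjIw==

# A sketch of the deaggregation processor for delimited records. For JSON
# deaggregation, use SubRecordType "JSON" and omit the Delimiter parameter.
deaggregation_processor = {
    "Type": "RecordDeAggregation",
    "Parameters": [
        {"ParameterName": "SubRecordType", "ParameterValue": "DELIMITED"},
        {"ParameterName": "Delimiter", "ParameterValue": encoded_delimiter},
    ],
}
```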
Note
When deaggregating JSON records, make sure that your input is still presented in the supported JSON format. JSON objects must be on a single line with no delimiter, or newline-delimited (JSONL). An array of JSON objects is not a valid input.

These are examples of correct input: {"a":1}{"a":2} and {"a":1}\n{"a":2}

This is an example of incorrect input: [{"a":1}, {"a":2}]
With aggregated data, when you enable dynamic partitioning, Firehose parses the records and looks for either valid JSON objects or delimited records within each API call based on the specified multi record deaggregation type.
Important
If your data is aggregated, dynamic partitioning can be applied only after your data is deaggregated.
Important
When you use the Data Transformation feature in Firehose, deaggregation is applied before the Data Transformation. Data coming into Firehose is processed in the following order: Deaggregation → Data Transformation via Lambda → Partitioning Keys.
Add a new line delimiter when delivering data to S3
You can enable New Line Delimiter to add a new line delimiter between records in objects that are delivered to Amazon S3. This can be helpful for parsing objects in Amazon S3. This is also particularly useful when dynamic partitioning is applied to aggregated data because multi-record deaggregation (which must be applied to aggregated data before it can be dynamically partitioned) removes new lines from records as part of the parsing process.
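In the API, this option corresponds to an AppendDelimiterToRecord processor in the stream's ProcessingConfiguration. The following is a minimal sketch of that processor; the processor type and parameter names come from the Firehose API.

```python
# A sketch of the processor that re-inserts a newline between records in
# delivered S3 objects (the console's "New line delimiter" option).
append_newline_processor = {
    "Type": "AppendDelimiterToRecord",
    "Parameters": [
        {"ParameterName": "Delimiter", "ParameterValue": "\\n"},
    ],
}
```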
Enable dynamic partitioning
You can configure dynamic partitioning for your Firehose streams through the Amazon Data Firehose management console, the CLI, or the APIs.
Important
You can enable dynamic partitioning only when you create a new Firehose stream. You cannot enable dynamic partitioning for an existing Firehose stream that does not have dynamic partitioning already enabled.
For detailed steps on how to enable and configure dynamic partitioning through the Firehose management console while creating a new Firehose stream, see Creating an Amazon Firehose stream.
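Programmatically, dynamic partitioning is enabled at stream creation through the DynamicPartitioningConfiguration setting. The following is a minimal boto3 sketch that combines inline parsing with a prefix expression like those shown earlier; the bucket ARN, role ARN, and stream name are hypothetical.

```python
import boto3

firehose = boto3.client("firehose")

# A minimal sketch: create a stream with dynamic partitioning enabled.
# The bucket ARN, role ARN, and stream name below are placeholders.
firehose.create_delivery_stream(
    DeliveryStreamName="my-dynamic-partitioning-stream",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "BucketARN": "arn:aws:s3:::my-logs-prod",
        "RoleARN": "arn:aws:iam::111122223333:role/firehose-delivery-role",
        "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/",
        # Required when dynamic partitioning is enabled.
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 60},
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "MetadataExtraction",
                    "Parameters": [
                        {"ParameterName": "MetadataExtractionQuery",
                         "ParameterValue": "{customer_id: .customer_id}"},
                        {"ParameterName": "JsonParsingEngine",
                         "ParameterValue": "JQ-1.6"},
                    ],
                },
            ],
        },
    },
)
```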
Once dynamic partitioning is enabled on an active Firehose stream, you can update the configuration by adding new partitioning keys, or by removing or updating existing ones, as well as the S3 prefix expressions. Once updated, Firehose starts using the new keys and the new S3 prefix expressions.
Important
Once you enable dynamic partitioning on a Firehose stream, it cannot be disabled on that stream.
Troubleshoot dynamic partitioning errors
If Amazon Data Firehose cannot parse the data records in your Firehose stream, cannot extract the specified partitioning keys, or cannot evaluate the expressions included in the S3 prefix value, those data records are delivered to the S3 error bucket prefix that you must specify when you create a Firehose stream with dynamic partitioning enabled. The S3 error bucket prefix contains all the records that Firehose is not able to deliver to the specified S3 destination. These records are organized based on the error type. Along with the record, the delivered object also includes information about the error to help you understand and resolve it.
You must specify an S3 error bucket prefix for a Firehose stream if you want to enable dynamic partitioning for this Firehose stream. If you don't want to enable dynamic partitioning for a Firehose stream, specifying an S3 error bucket prefix is optional.
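The error prefix supports its own expressions. As a sketch, the following hypothetical value groups failed records by error type and delivery date; the !{firehose:error-output-type} and !{timestamp:...} expressions are from the Firehose S3 prefix expression syntax.

```python
# A hypothetical S3 error bucket prefix that organizes undeliverable
# records by error type and by delivery date.
error_output_prefix = (
    "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/"
    "month=!{timestamp:MM}/day=!{timestamp:dd}/"
)
```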
Data buffering and dynamic partitioning
Amazon Data Firehose buffers incoming streaming data to a certain size and for a certain period of time before delivering it to the specified destinations. You can configure the buffer size and the buffer interval while creating new Firehose streams or update the buffer size and the buffer interval on your existing Firehose streams. A buffer size is measured in MBs and a buffer interval is measured in seconds.
Note
The zero buffering feature is not available for dynamic partitioning.
When dynamic partitioning is enabled, Firehose internally buffers records that belong to a given partition based on the configured buffering hint (size and time) before delivering these records to your Amazon S3 bucket. In order to deliver maximum-size objects, Firehose uses multi-stage buffering internally. Therefore, the end-to-end delay of a batch of records might be 1.5 times the configured buffering hint time. This affects the data freshness of a Firehose stream.
The active partition count is the total number of active partitions within the delivery buffer. For example, if the dynamic partitioning query constructs 3 partitions per second and you have a buffer hint configuration triggering delivery every 60 seconds, then on average you would have 180 active partitions. If Firehose cannot deliver the data in a partition to a destination, this partition is counted as active in the delivery buffer until it can be delivered.
A new partition is created when an S3 prefix is evaluated to a new value based on the record data fields and the S3 prefix expressions. A new buffer is created for each active partition. Every subsequent record with the same evaluated S3 prefix is delivered to that buffer.
Once the buffer meets the buffer size limit or the buffer time interval expires, Firehose creates an object with the buffer data and delivers it to the specified Amazon S3 prefix. After the object is delivered, the buffer for that partition and the partition itself are deleted and removed from the active partitions count.
Firehose delivers the data in each buffer as a single object once the buffer size or the buffer interval is met for each partition separately. Once the number of active partitions reaches the limit of 500 per Firehose stream, the rest of the records in the Firehose stream are delivered to the specified S3 error bucket prefix (activePartitionExceeded). You can use the Amazon Data Firehose Limits form to request an increase of this quota.
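To watch how close a stream is to the active partition limit, you can monitor its partition count in CloudWatch. The following is a sketch that assumes the PartitionCount metric in the AWS/Firehose namespace and a hypothetical stream name; verify the metric name for your account before relying on it.

```python
import datetime

import boto3

cloudwatch = boto3.client("cloudwatch")

# Sketch: read the recent active partition count for a hypothetical stream.
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/Firehose",
    MetricName="PartitionCount",  # assumed dynamic partitioning metric
    Dimensions=[{"Name": "DeliveryStreamName",
                 "Value": "my-dynamic-partitioning-stream"}],
    StartTime=datetime.datetime.utcnow() - datetime.timedelta(hours=1),
    EndTime=datetime.datetime.utcnow(),
    Period=300,
    Statistics=["Maximum"],
)

# Print the per-period maximums in time order to spot growth toward the
# 500 active partition limit.
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Maximum"])
```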