Amazon Kinesis Data Firehose Data Transformation
Kinesis Data Firehose can invoke your Lambda function to transform incoming source data and deliver the transformed data to destinations. You can enable Kinesis Data Firehose data transformation when you create your delivery stream.
Data Transformation Flow
When you enable Kinesis Data Firehose data transformation, Kinesis Data Firehose buffers incoming data. The buffering hint ranges from 0.2 MB to 3 MB. The default buffering hint is 1 MB for all destinations except Splunk; for Splunk, the default buffering hint is 256 KB. (To adjust the buffering size, use the ProcessingConfiguration API with the ProcessorParameter called BufferSizeInMBs.)
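For illustration, the following is a minimal sketch of setting this buffering hint when creating a delivery stream with the Amazon SDK for Python (Boto3). It assumes an S3 destination; the stream name, role, bucket, and function ARNs, and the buffer values are placeholders, not recommendations.

import boto3

firehose = boto3.client("firehose")

# Sketch: create a delivery stream whose Lambda processor buffers up to
# 1 MB (BufferSizeInMBs) or 60 seconds (BufferIntervalInSeconds),
# whichever comes first. All ARNs below are placeholders.
firehose.create_delivery_stream(
    DeliveryStreamName="example-stream",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws-cn:iam::111122223333:role/example-firehose-role",
        "BucketARN": "arn:aws-cn:s3:::example-bucket",
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {"ParameterName": "LambdaArn",
                         "ParameterValue": "arn:aws-cn:lambda:cn-north-1:111122223333:function:example-transform"},
                        {"ParameterName": "BufferSizeInMBs", "ParameterValue": "1"},
                        {"ParameterName": "BufferIntervalInSeconds", "ParameterValue": "60"},
                    ],
                },
            ],
        },
    },
)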
Kinesis Data Firehose then invokes the specified Lambda function asynchronously with each buffered batch
using the Amazon Lambda synchronous invocation mode. The transformed data is sent from Lambda
to Kinesis Data Firehose. Kinesis Data Firehose then sends it to the destination when the specified destination
buffering size or buffering interval is reached, whichever happens first.
Important
The Lambda synchronous invocation mode has a payload size limit of 6 MB for both the request and the response. Make sure that your buffering size for sending the request to the function is less than or equal to 6 MB. Also ensure that the response that your function returns doesn't exceed 6 MB.
Data Transformation and Status Model
All transformed records from Lambda must contain the following parameters, or Kinesis Data Firehose rejects them and treats that as a data transformation failure.
For Kinesis Data Streams and Direct PUT:
- recordId
  The record ID is passed from Kinesis Data Firehose to Lambda during the invocation. The transformed record must contain the same record ID. Any mismatch between the ID of the original record and the ID of the transformed record is treated as a data transformation failure.
- result
  The status of the data transformation of the record. The possible values are: Ok (the record was transformed successfully), Dropped (the record was dropped intentionally by your processing logic), and ProcessingFailed (the record could not be transformed). If a record has a status of Ok or Dropped, Kinesis Data Firehose considers it successfully processed. Otherwise, Kinesis Data Firehose considers it unsuccessfully processed.
- data
  The transformed data payload, after base64-encoding.
Following is a sample Lambda result output:
{ "recordId":
"<recordId from the Lambda input>"
, "result": "Ok", "data":"<Base64 encoded Transformed data>"
}
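As an illustration only, the following minimal Python handler sketch returns records in this shape. It simply passes each payload through unchanged and marks it Ok; real transformation logic would go where indicated.

import base64

# Sketch of a pass-through transformer for a Kinesis Data Streams or
# Direct PUT source. Each returned record keeps the input recordId and
# carries the (re-encoded) payload in the data field.
def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"])   # original source data
        transformed = payload                        # place transformation logic here
        output.append({
            "recordId": record["recordId"],          # must match the input recordId
            "result": "Ok",                          # Ok | Dropped | ProcessingFailed
            "data": base64.b64encode(transformed).decode("utf-8"),
        })
    return {"records": output}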
For Amazon MSK:
- recordId
  The record ID is passed from Kinesis Data Firehose to Lambda during the invocation. The transformed record must contain the same record ID. Any mismatch between the ID of the original record and the ID of the transformed record is treated as a data transformation failure.
- result
  The status of the data transformation of the record. The possible values are: Ok (the record was transformed successfully), Dropped (the record was dropped intentionally by your processing logic), and ProcessingFailed (the record could not be transformed). If a record has a status of Ok or Dropped, Kinesis Data Firehose considers it successfully processed. Otherwise, Kinesis Data Firehose considers it unsuccessfully processed.
- kafkaRecordValue
  The transformed data payload, after base64-encoding.
Following is a sample Lambda result output:
{ "recordId":
"<recordId from the Lambda input>"
, "result": "Ok", "kafkaRecordValue":"<Base64 encoded Transformed data>"
}
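For an Amazon MSK source, the handler sketch shown earlier would stay the same except that each record's payload is returned under kafkaRecordValue rather than data. A hypothetical per-record fragment, reusing the record and transformed variables from that sketch:

# Sketch of one transformed record for an Amazon MSK source; the payload
# field name changes, everything else keeps the same shape.
output.append({
    "recordId": record["recordId"],
    "result": "Ok",
    "kafkaRecordValue": base64.b64encode(transformed).decode("utf-8"),
})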
Lambda Blueprints
These blueprints demonstrate how you can create and use Amazon Lambda functions to transform data in your Kinesis Data Firehose delivery streams.
To see the blueprints that are available in the Amazon Lambda console
- Sign in to the Amazon Web Services Management Console and open the Amazon Lambda console at https://console.amazonaws.cn/lambda/.
- Choose Create function, and then choose Use a blueprint.
- In the Blueprints field, search for the keyword firehose to find the Kinesis Data Firehose Lambda blueprints.
List of blueprints:
- Process records sent to Kinesis Firehose stream (Node.js, Python)
  This blueprint shows a basic example of how to process data in your Kinesis Data Firehose delivery stream using Amazon Lambda.
  Latest release date: November 2016.
  Release notes: none.
- Process CloudWatch logs sent to Kinesis Firehose (Node.js, Python)
  This blueprint shows how you can use Amazon Lambda to decompress Amazon CloudWatch Logs sent to Kinesis Data Firehose.
  Latest release date: September 15, 2022.
  Release notes: The latest release version of this blueprint shows how you can use improved error handling to reduce usage charges for Kinesis Data Firehose and Lambda. When using this blueprint, change the Kinesis Firehose Lambda setting for buffer size to 256 KB.
- Convert Kinesis Firehose stream records in syslog format to JSON (Node.js)
  This blueprint shows how you can convert input records in RFC3164 Syslog format to JSON.
  Latest release date: November 2016.
  Release notes: none.
To see the blueprints that are available in the Amazon Serverless Application Repository
- Choose Browse all applications.
- In the Applications field, search for the keyword firehose.
You can also create a Lambda function without using a blueprint. See Getting Started with Amazon Lambda.
Data Transformation Failure Handling
If your Lambda function invocation fails because of a network timeout or because you've
reached the Lambda invocation limit, Kinesis Data Firehose retries the invocation three times by default.
If the invocation does not succeed, Kinesis Data Firehose then skips that batch of records. The skipped
records are treated as unsuccessfully processed records. You can specify or override the
retry options using the CreateDeliveryStream or UpdateDestination
API. For this type of failure, you can log invocation errors to Amazon CloudWatch Logs. For more
information, see Monitoring Kinesis Data Firehose
Using CloudWatch Logs.
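One way to specify the retry options is through the Lambda processor's NumberOfRetries parameter in the ProcessingConfiguration. The following is a sketch of the relevant fragment only, with a placeholder ARN and example values, passed to CreateDeliveryStream or UpdateDestination as shown earlier:

# Sketch of the processor parameters that control Lambda invocation retries.
# NumberOfRetries overrides the default of three retries; the ARN is a placeholder.
processing_configuration = {
    "Enabled": True,
    "Processors": [
        {
            "Type": "Lambda",
            "Parameters": [
                {"ParameterName": "LambdaArn",
                 "ParameterValue": "arn:aws-cn:lambda:cn-north-1:111122223333:function:example-transform"},
                {"ParameterName": "NumberOfRetries", "ParameterValue": "1"},
            ],
        },
    ],
}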
If the status of the data transformation of a record is ProcessingFailed, Kinesis Data Firehose treats the record as unsuccessfully processed. For this type of failure, you can emit error logs to Amazon CloudWatch Logs from your Lambda function. For more information, see Accessing Amazon CloudWatch Logs for Amazon Lambda in the Amazon Lambda Developer Guide.
If data transformation fails, the unsuccessfully processed records are delivered to
your S3 bucket in the processing-failed
folder. The records have
the following format:
{ "attemptsMade": "
count
", "arrivalTimestamp": "timestamp
", "errorCode": "code
", "errorMessage": "message
", "attemptEndingTimestamp": "timestamp
", "rawData": "data
", "lambdaArn": "arn
" }
- attemptsMade
  The number of invocation requests attempted.
- arrivalTimestamp
  The time that the record was received by Kinesis Data Firehose.
- errorCode
  The HTTP error code returned by Lambda.
- errorMessage
  The error message returned by Lambda.
- attemptEndingTimestamp
  The time that Kinesis Data Firehose stopped attempting Lambda invocations.
- rawData
  The base64-encoded record data.
- lambdaArn
  The Amazon Resource Name (ARN) of the Lambda function.
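When inspecting these records, the original payload can be recovered by base64-decoding rawData. A minimal sketch, assuming failed_record_json is a single JSON object you have already read from an object under the processing-failed prefix in S3:

import base64
import json

# Sketch: decode the original payload and error details from one
# processing-failed record. failed_record_json is an assumed variable
# holding the JSON text of a single record.
failed_record = json.loads(failed_record_json)
original_payload = base64.b64decode(failed_record["rawData"])
print(failed_record["errorCode"], failed_record["errorMessage"])
print(original_payload.decode("utf-8", errors="replace"))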
Duration of a Lambda Invocation
Kinesis Data Firehose supports a Lambda invocation time of up to 5 minutes. If your Lambda function
takes more than 5 minutes to complete, you get the following error: Firehose
encountered timeout errors when calling Amazon Lambda. The maximum supported function
timeout is 5 minutes.
For information about what Kinesis Data Firehose does if such an error occurs, see Data Transformation Failure Handling.
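If your transformation function risks running long, one practical safeguard is to keep its own timeout at or below 300 seconds. A minimal sketch with the Amazon SDK for Python (Boto3), using a hypothetical function name:

import boto3

lambda_client = boto3.client("lambda")

# Sketch: cap the transformation function's timeout at 5 minutes (300 seconds)
# so it stays within the invocation time that Kinesis Data Firehose supports.
lambda_client.update_function_configuration(
    FunctionName="example-transform",   # placeholder name
    Timeout=300,
)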
Source Record Backup
Kinesis Data Firehose can back up all untransformed records to your S3 bucket concurrently while delivering transformed records to the destination. You can enable source record backup when you create or update your delivery stream. You cannot disable source record backup after you enable it.
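For illustration, the backup-related fields of an ExtendedS3DestinationConfiguration might look like the following sketch; the role and bucket ARNs are placeholders, and the fragment would be supplied to CreateDeliveryStream as shown earlier.

# Sketch of the fields that enable source record backup on an S3 destination.
# S3BackupMode turns backup on; S3BackupConfiguration names the backup bucket.
extended_s3_destination_configuration = {
    "RoleARN": "arn:aws-cn:iam::111122223333:role/example-firehose-role",
    "BucketARN": "arn:aws-cn:s3:::example-destination-bucket",
    "S3BackupMode": "Enabled",
    "S3BackupConfiguration": {
        "RoleARN": "arn:aws-cn:iam::111122223333:role/example-firehose-role",
        "BucketARN": "arn:aws-cn:s3:::example-backup-bucket",
    },
}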