Route incoming records to different Iceberg tables
Firehose can route incoming records in a stream to different Iceberg tables based on the content of the record. For example, consider the following sample input records.
{
  "game_id": "111c-a4b8",
  "play_mode": 2,
  "event": {
    "event_type": "PlayerSpawn",
    "mode": "rejoin-mode"
  },
  "position": {
    "x": 143.595901,
    "y": 476,
    "z": "0.24234876"
  },
  "duration_secs": 2,
  "level_id": "the_good_place",
  "player_count": 3
}
{
  "build_type": "RtDbg",
  "device_type": "PC",
  "event": {
    "event_type": "BootStart"
  },
  "host_name": "BSK-DRFORNIES",
  "is_trial": false,
  "issuer_id": "",
  "mac_address": "",
  "platform": "PC",
  "play_go_source": -1
}
In this example, the event_type field has two possible values – PlayerSpawn and BootStart. When an incoming record has the event_type field set to PlayerSpawn, we want to write the record to an Iceberg table named PlayerSpawn, and when an incoming record has the event_type field set to BootStart, we want to write the record to a table named BootStart. Note that the PlayerSpawn and BootStart records might have a different set of fields that map to different columns in the corresponding Iceberg table. The incoming records might also have a nested JSON structure where the event_type field is nested within the JSON record. In the upcoming sections, we discuss how you can route records to different tables by providing the appropriate routing information to Firehose in such scenarios.
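To make the routing idea concrete, the following Python sketch (not part of Firehose itself) extracts the nested event.event_type field from the two sample records above and prints the Iceberg table each record would be routed to. The route_table helper is hypothetical and exists only for illustration.

import json

# Sample records from above, abbreviated to the fields relevant for routing.
records = [
    {"game_id": "111c-a4b8", "event": {"event_type": "PlayerSpawn", "mode": "rejoin-mode"}, "level_id": "the_good_place"},
    {"build_type": "RtDbg", "event": {"event_type": "BootStart"}, "platform": "PC"},
]

def route_table(record: dict) -> str:
    # Equivalent of the JSONQuery expression .event.event_type:
    # descend into the nested "event" object and read "event_type".
    return record["event"]["event_type"]

for record in records:
    print(f"record routes to Iceberg table '{route_table(record)}'")
# record routes to Iceberg table 'PlayerSpawn'
# record routes to Iceberg table 'BootStart'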
Provide routing information to Firehose with JSONQuery expression
The simplest and most cost-effective way to provide record routing information to Firehose is by providing a JSONQuery expression. With this approach, you provide JSONQuery expressions for three parameters – Database Name, Table Name, and (optionally) Operation. Firehose uses the expressions that you provide to extract routing information from your incoming stream records.
The Database Name parameter specifies the name of the destination database, the Table Name parameter specifies the name of the destination table, and Operation is an optional parameter that indicates whether to insert the incoming stream record as a new record into the destination table, or to modify or delete an existing record in the destination table. The Operation field must have one of the following values – insert, update, or delete.
For each of these three parameters, you can either provide a static value or a dynamic expression where the value is retrieved from the incoming record. For example, if you want to deliver all incoming stream records to a single database named gameevents, the Database Name would have a static value of "gameevents". If the destination table name needs to be obtained from a field in the incoming record, the Table Name is a dynamic expression that specifies the field in the incoming record from which the destination table name needs to be retrieved.
In the following example, we use a static value for Database Name, a dynamic value for Table Name, and a static value for Operation. Note that specifying the Operation is optional. If no operation is specified, Firehose inserts the incoming records into the destination table as new records by default.
Database Name : "gameevents"
Table Name : .event.event_type
Operation : "insert"
Note that the event_type field is nested within the JSON record, so we specify Table Name with the nested field information as .event.event_type.
For example, if you want to route the record to a destination table based on the value of a top-level field, such as using the level_id field to route the record to a destination table named the_good_place, you would specify the JSONQuery expression as follows.
Database Name : "gameevents"
Table Name : .level_id
Operation : "insert"
Note
- When you specify the operation as update or delete, you must either specify unique keys for the destination table when you set up your Firehose stream, or set identifier-field-ids in Iceberg when you execute create table or alter table operations in Iceberg. If you fail to specify this, then Firehose throws an error and delivers data to an S3 error bucket.
- The Database Name and Table Name values must exactly match your destination database and table names. If they do not match, then Firehose throws an error and delivers data to an S3 error bucket.
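As a rough illustration of where these JSONQuery expressions can be supplied programmatically, the following boto3 sketch creates a Firehose stream with an Apache Iceberg destination. The use of a MetadataExtraction processor whose MetadataExtractionQuery emits destinationDatabaseName, destinationTableName, and operation keys is an assumption modeled on the routing parameters described above; the stream name, role, catalog, and bucket ARNs are placeholders. Treat this as a sketch and confirm the exact parameter shapes against the CreateDeliveryStream API reference.

import boto3

firehose = boto3.client("firehose")

# Placeholder ARNs and names -- replace with your own resources.
response = firehose.create_delivery_stream(
    DeliveryStreamName="iceberg-routing-stream",
    DeliveryStreamType="DirectPut",
    IcebergDestinationConfiguration={
        "RoleARN": "arn:aws:iam::111122223333:role/firehose-iceberg-role",
        "CatalogConfiguration": {
            "CatalogARN": "arn:aws:glue:us-east-1:111122223333:catalog"
        },
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::111122223333:role/firehose-iceberg-role",
            "BucketARN": "arn:aws:s3:::amzn-s3-demo-error-bucket",
        },
        # Assumed shape: the Database Name / Table Name / Operation JSONQuery
        # expressions are passed through a MetadataExtraction processor.
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "MetadataExtraction",
                    "Parameters": [
                        {
                            "ParameterName": "MetadataExtractionQuery",
                            "ParameterValue": '{destinationDatabaseName: "gameevents", destinationTableName: .event.event_type, operation: "insert"}',
                        },
                        {"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"},
                    ],
                }
            ],
        },
    },
)
print(response["DeliveryStreamARN"])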
Provide routing information using an Amazon Lambda function
There might be scenarios where you have complex rules that determine how to route incoming records to a destination table. For example, you might have a rule that says if a field contains the value A, B, or F, the record should be routed to a destination table named TableX. Or you might want to augment the incoming stream record by adding additional attributes. For example, if a record contains a device_id field with the value 1, you might want to add another field device_type with the value "modem", and write the additional field to the corresponding destination table column. In such cases, you can transform the source stream by using an Amazon Lambda function in Firehose and provide routing information as part of the output of the Lambda transformation function, as sketched below. To understand how you can transform the source stream by using an Amazon Lambda function in Firehose, see Transform source data in Amazon Data Firehose.
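The following Python sketch shows how such custom rules might look inside the record-processing loop of a transformation function. The "category" field name is hypothetical (the example above does not name the field that carries the A, B, or F values); the full Lambda output shape, including the metadata section, is shown later in this section.

# Hypothetical rule data: routed values and device_id -> device_type mapping.
ROUTED_TO_TABLE_X = {"A", "B", "F"}
DEVICE_TYPES = {1: "modem"}

def apply_routing_rules(payload: dict) -> tuple[dict, str]:
    """Return the (possibly augmented) payload and the destination table name."""
    # Rule 1: records whose (hypothetical) "category" field is A, B, or F go to TableX;
    # otherwise route by the nested event_type, falling back to a default table.
    if payload.get("category") in ROUTED_TO_TABLE_X:
        table = "TableX"
    else:
        table = payload.get("event", {}).get("event_type", "unknown_events")

    # Rule 2: enrich the record with a device_type column derived from device_id.
    if payload.get("device_id") in DEVICE_TYPES:
        payload["device_type"] = DEVICE_TYPES[payload["device_id"]]

    return payload, table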
When you use Lambda to transform the source stream in Firehose, the output must contain the recordId, result, and data or kafkaRecordValue parameters. The recordId parameter contains the ID of the input stream record, result indicates whether the transformation was successful, and data contains the Base64-encoded transformed output of your Lambda function. For more information, see Required parameters for data transformation.
{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "data": "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ=="
}
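If you want to inspect what the Base64-encoded data field carries, you can decode it locally. For the sample output above, it is a small JSON document:

import base64
import json

encoded = "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ=="

# Decode the Base64 payload back into the transformed record.
payload = json.loads(base64.b64decode(encoded))
print(payload)
# {'courseid': '5', 'coursename': 'Computer Science', 'semester': 'fall', 'grade': 'A'}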
To specify routing information to Firehose on how to route the stream record to a destination table as part of your Lambda function, the output of your Lambda function must contain an additional metadata section. The following example shows how the metadata section is added to the Lambda output for a Firehose stream that uses Kinesis Data Streams as a data source, instructing Firehose to insert the record as a new record into the table named PlayerSpawn of the database gameevents.
{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "data": "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ==",
  "metadata": {
    "otfMetadata": {
      "destinationDatabaseName": "gameevents",
      "destinationTableName": "PlayerSpawn",
      "operation": "insert"
    }
  }
}
Similarly, the following example shows how you can add the metadata section to the Lambda output for a Firehose stream that uses Amazon Managed Streaming for Apache Kafka as a data source, instructing Firehose to insert the record as a new record into the table named PlayerSpawn in the database gameevents.
{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "kafkaRecordValue": "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ==",
  "metadata": {
    "otfMetadata": {
      "destinationDatabaseName": "gameevents",
      "destinationTableName": "PlayerSpawn",
      "operation": "insert"
    }
  }
}
For this example:
- destinationDatabaseName refers to the name of the target database and is a required field.
- destinationTableName refers to the name of the target table and is a required field.
- operation is an optional field with possible values of insert, update, and delete. If you do not specify a value, the default operation is insert.
Note
- When you specify the operation as update or delete, you must either specify unique keys for the destination table when you set up your Firehose stream, or set identifier-field-ids in Iceberg when you execute create table or alter table operations in Iceberg. If you fail to specify this, then Firehose throws an error and delivers data to an S3 error bucket.
- The Database Name and Table Name values must exactly match your destination database and table names. If they do not match, then Firehose throws an error and delivers data to an S3 error bucket.
- When your Firehose stream has both a Lambda transformation function and a JSONQuery expression, Firehose first checks the metadata field in the Lambda output to determine how to route the record to the appropriate destination table, and then looks at the output of your JSONQuery expression for any missing fields. If neither Lambda nor the JSONQuery expression provides the required routing information, Firehose assumes this is a single-table scenario and looks for single-table information in the unique keys configuration. For more information, see Route incoming records to a single Iceberg table. If Firehose fails to correctly determine the routing information and match the record to a specified destination table, it delivers the data to your specified S3 error bucket.
Sample Lambda function
This Lambda function is sample Python code that parses the incoming stream records and adds the required fields that specify how the data should be written to specific tables. You can use this sample code to add the metadata section for routing information.
import json
import base64

def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'])

    firehose_records_output = {}
    firehose_records_output['records'] = []

    for firehose_record_input in firehose_records_input['records']:
        # Get payload from Lambda input; the field name differs by source
        if 'kafkaRecordValue' in firehose_record_input:
            payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8')
        else:
            payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8')

        # perform data processing on customer payload bytes here

        # Create output with proper record ID, output data (field name differs by source), result, and metadata.
        # The Base64-encoded payload is decoded to str so the response is JSON serializable.
        firehose_record_output = {}
        if 'kafkaRecordValue' in firehose_record_input:
            firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')).decode('utf-8')
        else:
            firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')).decode('utf-8')

        firehose_record_output['recordId'] = firehose_record_input['recordId']
        firehose_record_output['result'] = 'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': 'your_destination_database',
                'destinationTableName': 'your_destination_table',
                'operation': 'insert'
            }
        }

        firehose_records_output['records'].append(firehose_record_output)

    return firehose_records_output
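To sanity-check the handler locally before deploying it, you can invoke the function above with a hand-built event that mimics the shape Firehose passes to a transformation Lambda. The deliveryStreamArn and recordId values below are placeholders, and a record from an MSK-sourced stream would carry kafkaRecordValue instead of data.

# Build a minimal Kinesis-sourced test event and invoke the handler defined above.
sample_payload = {"event": {"event_type": "PlayerSpawn"}, "level_id": "the_good_place"}
test_event = {
    "deliveryStreamArn": "arn:aws:firehose:us-east-1:111122223333:deliverystream/test-stream",
    "records": [
        {
            "recordId": "49655962066601463032522589543535113056108699331451682818000000",
            "data": base64.b64encode(json.dumps(sample_payload).encode("utf-8")).decode("utf-8"),
        }
    ],
}

result = lambda_handler(test_event, context=None)
print(json.dumps(result, indent=2))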