Route incoming records to different Iceberg tables - Amazon Data Firehose
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Firehose supports database as a source in all Amazon Web Services Regions except China Regions, Amazon GovCloud (US) Regions, and Asia Pacific (Malaysia). This feature is in preview and is subject to change. Do not use it for your production workloads.

Route incoming records to different Iceberg tables

Firehose can route incoming records in a stream to different Iceberg tables based on the content of the record. For example, consider the following sample input records.

{ "game_id": "111c-a4b8", "play_mode": 2, "event": { "event_type": "PlayerSpawn", "mode": "rejoin-mode" }, "position": { "x": 143.595901, "y": 476, "z": "0.24234876" }, "duration_secs": 2, "level_id": "the_good_place", "player_count": 3 }
{ "build_type": "RtDbg", "device_type": "PC", "event": { "event_type": "BootStart", }, "host_name": "BSK-DRFORNIES", "is_trial": false, "issuer_id": "", "mac_address": "", "platform": "PC", "play_go_source": -1, }

In this example, the event_type field has two possible values – PlayerSpawn and BootStart. When an incoming record has the event_type field set to PlayerSpawn, we want to write the record to an Iceberg table named PlayerSpawn, and when an incoming record has the event_type field set to BootStart, we want to write the record to a table named BootStart.

Note that the records with PlayerSpawn and BootStart might have different sets of fields that map to different columns in the corresponding Iceberg tables. The incoming records might also have a nested JSON structure, where event_type is nested inside another object in the record. In the upcoming sections, we discuss how you can route records to different tables in such scenarios by providing the appropriate routing information to Firehose.

Provide routing information to Firehose with JSONQuery expression

The simplest and most cost-effective way to provide record routing information to Firehose is with a JSONQuery expression. With this approach, you provide JSONQuery expressions for three parameters – Database Name, Table Name, and (optionally) Operation. Firehose uses the expressions that you provide to extract information from your incoming stream records and route the records accordingly.

The Database Name parameter specifies the name of the destination database, the Table Name parameter specifies the name of the destination table, and Operation is an optional parameter that indicates whether to insert the incoming stream record as a new record into the destination table, or to modify or delete an existing record in the destination table. The Operation field must have one of the following values – insert, update, or delete.

For each of these three parameters, you can either provide a static value or a dynamic expression where the value is retrieved from the incoming record. For example, if you want to deliver all incoming stream records to a single database named gameevents, the Database Name would have a static value of “gameevents”. If the destination table name needs to be obtained from a field in the incoming record, the Table Name is a dynamic expression that specifies the field in the incoming record from which the destination table name needs to be retrieved.

In the following example, we use a static value for Database Name, a dynamic value for Table Name, and a static value for operation. Note that specifying the Operation is optional. If no operation is specified, Firehose inserts the incoming records into the destination table as new records by default.

Database Name : "gameevents" Table Name : .event.event_type Operation : "insert"

Note that the event_type field is nested within the JSON record, so we specify Table Name with the nested field information as .event.event_type.
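
To check which value a JSONQuery path resolves to for your records, you can walk the same dotted path over a parsed record yourself. The following Python sketch is for illustration only; Firehose evaluates the expression for you, and the resolve_path helper shown here is a hypothetical stand-in, not part of any Firehose API.

import json

def resolve_path(record, path):
    """Walk a dotted, JSONQuery-style path (for example '.event.event_type') over a parsed record."""
    value = record
    for key in path.strip(".").split("."):
        value = value[key]
    return value

record = json.loads('{"event": {"event_type": "PlayerSpawn", "mode": "rejoin-mode"}, "level_id": "the_good_place"}')

print(resolve_path(record, ".event.event_type"))  # PlayerSpawn -> routed to the PlayerSpawn table
print(resolve_path(record, ".level_id"))          # the_good_place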

For example, if you want to route the record to a destination table based on the value of a top-level field, such as level_id, so that the sample record is routed to a destination table named the_good_place, you would specify the JSONQuery expressions as follows.

Database Name : "gameevents" Table Name : .level_id operation : "insert"
Note
  • When you specify the operation as update or delete, you must either specify unique keys for the destination table when you set up your Firehose stream, or set identifier-field-ids in Iceberg when you run create table or alter table operations in Iceberg (see the sketch after this note). If you fail to specify this, then Firehose throws an error and delivers data to an S3 error bucket.

  • The Database Name and Table Name values must exactly match your destination database and table names. If they do not match, then Firehose throws an error and delivers data to an S3 error bucket.
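
One way to define identifier fields on an existing Iceberg table is through Spark SQL, as in the following minimal sketch. It assumes a Spark session that already has an Iceberg catalog configured; the catalog name glue_catalog, the database gameevents, the table PlayerSpawn, and the key column game_id are placeholder assumptions.

from pyspark.sql import SparkSession

# Assumes a Spark session configured with an Iceberg catalog named "glue_catalog";
# the database, table, and column names below are placeholders.
spark = SparkSession.builder.appName("set-identifier-fields").getOrCreate()

# Declare which column uniquely identifies a row, so that update and delete
# operations can match existing records in the table.
spark.sql("ALTER TABLE glue_catalog.gameevents.PlayerSpawn SET IDENTIFIER FIELDS game_id")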

Provide routing information using an Amazon Lambda function

There might be scenarios where you have complex rules that determine how to route incoming records to a destination table. For example, you might have a rule that if a field contains the value A, B, or F, the record should be routed to a destination table named TableX. Or you might want to augment the incoming stream record by adding additional attributes. For example, if a record contains a device_id field with the value 1, you might want to add a device_type field with the value “modem” and write the additional field to the corresponding destination table column. In such cases, you can transform the source stream by using an Amazon Lambda function in Firehose and provide routing information as part of the output of the Lambda transformation function. To understand how you can transform the source stream by using an Amazon Lambda function in Firehose, see Transform source data in Amazon Data Firehose.
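
As an illustration of this kind of rule, the following Python sketch shows record-level logic you might place inside a Lambda transformation function before building the output record. The field names category and device_id, the value-to-table mapping, and the device lookup are hypothetical examples, not part of the Firehose API.

# Hypothetical routing and enrichment rules applied to one decoded record (a dict).
ROUTE_BY_VALUE = {"A": "TableX", "B": "TableX", "F": "TableX"}  # values A, B, or F route to TableX
DEVICE_TYPES = {1: "modem", 2: "router"}                        # device_id -> device_type

def apply_rules(record):
    """Return the (possibly augmented) record and the name of its destination table."""
    # Route based on the value of a field, falling back to a default table.
    destination_table = ROUTE_BY_VALUE.get(record.get("category"), "DefaultTable")

    # Augment the record with a derived attribute.
    if "device_id" in record:
        record["device_type"] = DEVICE_TYPES.get(record["device_id"], "unknown")

    return record, destination_table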

When you use Lambda to transform the source stream in Firehose, the output must contain the recordId, result, and data or kafkaRecordValue parameters. The recordId parameter carries the ID of the input stream record, result indicates whether the transformation was successful, and data (or kafkaRecordValue) contains the Base64-encoded transformed output of your Lambda function. For more information, see Required parameters for data transformation.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ==" }

To tell Firehose how to route a stream record to a destination table as part of your Lambda function, the output of your Lambda function must contain an additional metadata section. The following example shows how the metadata section is added to the Lambda output for a Firehose stream that uses Kinesis Data Streams as a data source, to instruct Firehose to insert the record as a new record into a table named PlayerSpawn in the database gameevents.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ==", "metadata":{ "otfMetadata":{ "destinationTableName":"gameevents", "destinationDatabaseName":"PlayerSpawn", "operation":"insert" } } }

Similarly, the following example shows how you can add the metadata section to the Lambda output for a Firehose stream that uses Amazon Managed Streaming for Apache Kafka as a data source, to instruct Firehose to insert the record as a new record into a table named PlayerSpawn in the database gameevents.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "kafkaRecordValue": "eyJjb3Vyc2VpZCI6ICI1IiwgImNvdXJzZW5hbWUiOiAiQ29tcHV0ZXIgU2NpZW5jZSIsICJzZW1lc3RlciI6ICJmYWxsIiwgImdyYWRlIjogIkEifQ==", "metadata":{ "otfMetadata":{ "destinationTableName":"gameevents", "destinationDatabaseName":"PlayerSpawn", "operation":"insert" } } }

In this example:

  • destinationDatabaseName refers to the name of the target database and is a required field.

  • destinationTableName refers to the name of the target table and is a required field.

  • operation is an optional field with possible values of insert, update, and delete. If you do not specify a value, the default operation is insert.

Note
  • When you specify the operation as update or delete, you must either specify unique keys for the destination table when you set up your Firehose stream, or set identifier-field-ids in Iceberg when you run create table or alter table operations in Iceberg. If you fail to specify this, then Firehose throws an error and delivers data to an S3 error bucket.

  • The Database Name and Table Name values must exactly match your destination database and table names. If they do not match, then Firehose throws an error and delivers data to an S3 error bucket.

  • When your Firehose stream has both a Lambda transformation function and a JSONQuery expression, Firehose first checks the metadata field in the Lambda output to determine how to route the record to the appropriate destination table, and then looks at the output of your JSONQuery expression for any missing fields.

    If neither the Lambda output nor the JSONQuery expression provides the required routing information, Firehose treats this as a single-table scenario and looks for single-table information in the unique keys configuration.

    For more information, see Route incoming records to a single Iceberg table. If Firehose fails to correctly determine the routing information and match the record to a specified destination table, it delivers the data to your specified S3 error bucket.

Sample Lambda function

This Lambda function is sample Python code that parses the incoming stream records and adds the required fields to specify how the data should be written to specific tables. You can use this sample code as a starting point for adding the metadata section with routing information.

import json
import base64

def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: "
          + firehose_records_input['deliveryStreamArn'])
    firehose_records_output = {}
    firehose_records_output['records'] = []

    for firehose_record_input in firehose_records_input['records']:
        # Get the payload from the Lambda input; the field name differs by source
        # (kafkaRecordValue for MSK sources, data for other sources).
        if 'kafkaRecordValue' in firehose_record_input:
            payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8')
        else:
            payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8')

        # Perform data processing on the customer payload here.

        # Create the output record with the proper record ID, output data
        # (the field name again differs by source), result, and routing metadata.
        firehose_record_output = {}
        if 'kafkaRecordValue' in firehose_record_input:
            firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')).decode('utf-8')
        else:
            firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')).decode('utf-8')
        firehose_record_output['recordId'] = firehose_record_input['recordId']
        firehose_record_output['result'] = 'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': 'your_destination_database',
                'destinationTableName': 'your_destination_table',
                'operation': 'insert'
            }
        }
        firehose_records_output['records'].append(firehose_record_output)

    return firehose_records_output
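
To try the handler outside of Lambda, you can invoke it with a minimal, hand-built event, as in the following sketch. This is for local experimentation only; the stream ARN, record ID, and payload are made-up values, and the event mimics a Kinesis Data Streams source (which uses the data field).

import base64
import json

# A minimal, hand-built event that mimics the shape Firehose passes to the
# transformation Lambda for a Kinesis Data Streams source.
sample_event = {
    "deliveryStreamArn": "arn:aws:firehose:us-east-1:111122223333:deliverystream/example-stream",
    "records": [
        {
            "recordId": "49655962066601463032522589543535113056108699331451682818000000",
            "data": base64.b64encode(json.dumps({"event": {"event_type": "PlayerSpawn"}}).encode("utf-8")).decode("utf-8"),
        }
    ],
}

print(json.dumps(lambda_handler(sample_event, None), indent=2))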