Incremental transfers
Incremental transfers allow you to retrieve only new or updated data each time a job is run, avoiding reprocessing already processed records. This approach significantly improves efficiency, reduces data transfer volumes, and minimizes processing time, especially for large datasets that change frequently.
The SAP OData connector supports two types of incremental transfers:
Delta Token-based
Timestamp-based
Delta Token-based incremental transfers
For ODP-enabled entities that support Change Data Capture (CDC), the connector returns a delta token in the DynamicFrame when the ENABLE_CDC flag is provided. The delta token appears in the last row of the DELTA_TOKEN column. You can pass this token as a connector option in subsequent calls to incrementally retrieve the next set of data.
Example:
Pass the ENABLE_CDC flag when creating the DynamicFrame.
sapodata_df = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        "ENABLE_CDC": "true"
    },
    transformation_ctx=key)

# Extract the delta token from the last row of the DELTA_TOKEN column
delta_token_1 = your_logic_to_extract_delta_token(sapodata_df)  # e.g., D20241029164449_000370000
You can pass the extracted delta token as an option to retrieve new events.
sapodata_df_2 = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        # Pass the delta token retrieved in the last run
        "DELTA_TOKEN": delta_token_1
    },
    transformation_ctx=key)

# Extract the new delta token for the next run
delta_token_2 = your_logic_to_extract_delta_token(sapodata_df_2)
Note that the last record, in which the DELTA_TOKEN is present, is not a transactional record from the source; it exists only to pass the delta token value.
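The examples above leave your_logic_to_extract_delta_token as a placeholder. One possible implementation is sketched below over plain dicts standing in for the collected records (in a real job you would first call sapodata_df.toDF().select("DELTA_TOKEN").collect()); the helper name and row shape are assumptions for illustration, not part of the connector API.

```python
def extract_delta_token(rows):
    """Return the delta token carried by the last row, or None.

    `rows` stands in for the records collected from the DynamicFrame.
    The connector places the token in the DELTA_TOKEN column of the
    final row; every other row leaves that column empty.
    """
    if not rows:
        return None
    return rows[-1].get("DELTA_TOKEN") or None
```

You would then store the returned token (for example, in job parameters or an external store) and pass it as the DELTA_TOKEN connector option in the next run.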
Apart from the DELTA_TOKEN, the following fields are returned in each row of the dataframe:

GLUE_FETCH_SQ: A sequence field, generated from the epoch timestamp in the order the record was received, and unique for each record. This can be used if you need to know or establish the order of changes in the source system.

DML_STATUS: This will show UPDATED for all newly inserted and updated records from the source, and DELETED for records that have been deleted from the source.
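Downstream processing typically needs to drop the synthetic token row and handle deletions separately from upserts. A minimal sketch of that split, again over plain dicts rather than Spark rows (the helper name and row shape are illustrative assumptions):

```python
def split_changes(rows):
    """Separate upserted and deleted records, dropping the synthetic
    delta-token row. `rows` are plain dicts standing in for the
    collected records of the DynamicFrame.
    """
    # The token row is not a transactional record; exclude it first.
    data_rows = [r for r in rows if not r.get("DELTA_TOKEN")]
    # DML_STATUS is UPDATED for inserts/updates, DELETED for deletions.
    upserts = [r for r in data_rows if r.get("DML_STATUS") == "UPDATED"]
    deletes = [r for r in data_rows if r.get("DML_STATUS") == "DELETED"]
    return upserts, deletes
```

If ordering matters when applying the changes to a target, sort by GLUE_FETCH_SQ before writing.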
Timestamp-based incremental transfers
For non-ODP-enabled entities (or for ODP-enabled entities that don't use the ENABLE_CDC flag), you can use the filteringExpression option in the connector to indicate the datetime interval for which you want to retrieve data. This method relies on a timestamp field in your data that represents when each record was last created or modified.
Example: retrieving records that changed after 2024-01-01T00:00:00.000
sapodata_df = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        "filteringExpression": "LastChangeDateTime >= 2024-01-01T00:00:00.000"
    },
    transformation_ctx=key)
Note
In this example, LastChangeDateTime
is the field that represents when each record was last modified. The actual field name may vary depending on your specific SAP OData entity.
To get a new subset of data in subsequent runs, you would update the filteringExpression
with a new timestamp. Typically, this would be the maximum timestamp value from the previously retrieved data. For example:
# Function to get the max timestamp from the previous run
max_timestamp = get_max_timestamp(sapodata_df)
next_filtering_expression = f"LastChangeDateTime > {max_timestamp}"
# Use next_filtering_expression in your next run
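get_max_timestamp above is a placeholder. One way to implement it, sketched over plain dicts standing in for the collected records (in a job you would first call sapodata_df.toDF().collect()); the helper names and the LastChangeDateTime column are assumptions that depend on your entity:

```python
def get_max_timestamp(rows, ts_column="LastChangeDateTime"):
    """Return the largest timestamp string among the retrieved rows.

    ISO-8601 timestamps of equal precision compare correctly as plain
    strings, so max() over the raw values is sufficient. `rows` are
    plain dicts standing in for the collected DynamicFrame records.
    """
    values = [r[ts_column] for r in rows if r.get(ts_column)]
    return max(values) if values else None

def build_filtering_expression(max_ts, ts_column="LastChangeDateTime"):
    """Build the filteringExpression for the next run. A strict '>'
    avoids re-reading the boundary record, at the cost of skipping any
    other records that share exactly the same timestamp."""
    return f"{ts_column} > {max_ts}"
```

Note the trade-off in the comparison operator: `>` can skip records sharing the boundary timestamp, while `>=` re-reads the boundary record; choose based on how your pipeline deduplicates.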
In the next section, we will provide an automated approach to manage these timestamp-based incremental transfers, eliminating the need to manually update the filtering expression between runs.
Using the SAP OData state management script
To use the SAP OData state management script in your Glue job, follow these steps:
Upload the script: Download the script and upload it to an Amazon S3 bucket that your AWS Glue job has permission to access.
Reference the script in your AWS Glue job: When creating or updating your AWS Glue job, pass the '--extra-py-files' option referencing the script path in your Amazon S3 bucket. For example:
--extra-py-files s3://your-bucket/path/to/sap_odata_state_management.py
Import and use the state management library in your AWS Glue job scripts.
Delta token-based incremental transfer example
Here's an example of how to use the state management script for delta-token based incremental transfers:
from sap_odata_state_management import StateManagerFactory, StateManagerType, StateType

# Initialize the state manager
state_manager = StateManagerFactory.create_manager(
    manager_type=StateManagerType.JOB_TAG,
    state_type=StateType.DELTA_TOKEN,
    options={
        "job_name": args['JOB_NAME'],
        "logger": logger
    }
)

# Get connector options (including delta token if available)
key = "SAPODataNode"
connector_options = state_manager.get_connector_options(key)

# Use the connector options in your Glue job
sapodata_df = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        "ENABLE_CDC": "true",
        **connector_options
    }
)

# Process your data here...

# Update the state after processing
state_manager.update_state(key, sapodata_df.toDF())
Timestamp-based incremental transfer example
Here's an example of how to use the state management script for timestamp-based incremental transfers:
from sap_odata_state_management import StateManagerFactory, StateManagerType, StateType

# Initialize the state manager
state_manager = StateManagerFactory.create_manager(
    manager_type=StateManagerType.JOB_TAG,
    state_type=StateType.TIMESTAMP,
    options={
        "job_name": args['JOB_NAME'],
        "logger": logger,
        "timestamp_column": "LastChangeDateTime"
    }
)

# Get connector options (including filtering expression if available)
key = "SAPODataNode"
connector_options = state_manager.get_connector_options(key)

# Use the connector options in your Glue job
sapodata_df = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        **connector_options
    }
)

# Process your data here...

# Update the state after processing
state_manager.update_state(key, sapodata_df.toDF())
In both examples, the state management script handles the complexity of storing the state (either delta token or timestamp) between job runs. It automatically retrieves the last known state when getting connector options and updates the state after processing, ensuring that each job run only processes new or changed data.