增量传输 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

增量传输

增量传输允许您在每次运行作业时仅检索新的或更新的数据,从而避免对已经处理的记录进取行预处理。这种方法显著提高了效率,减少了数据传输量并最大限度地缩短了处理时间,对于经常变化的大型数据集尤其如此。

SAP OData 连接器支持两种类型的增量传输:

  • 基于增量令牌

  • 基于时间戳

基于增量令牌的增量传输

对于启用 ODP 且支持更改数据捕获(CDC)的实体,如果提供了 ENABLED_CDC 标志,则连接器将在 DynamicFrame 中返回增量令牌。增量令牌将在 DELTA_TOKEN 列的最后一行提供。您可以使用此令牌作为后续调用中的连接器选项来逐步检索下一组数据。

示例:

创建 DynamicFrame 时传递 ENABLE_CDC 标志。

sapodata_df = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", "ENABLE_CDC": "true" }, transformation_ctx=key) # Extract the delta token from the last row of the DELTA_TOKEN column delta_token_1 = your_logic_to_extract_delta_token(sapodata_df) # e.g., D20241029164449_000370000

您可以将提取的增量令牌作为检索新事件的选项传递。

sapodata_df_2 = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", // passing the delta token retrieved in the last run "DELTA_TOKEN": delta_token_1 } , transformation_ctx=key) # Extract the new delta token for the next run delta_token_2 = your_logic_to_extract_delta_token(sapodata_df_2)

请注意,存在 DELTA_TOKEN 的最后一条记录不是来自源的事务记录,而仅用于传递增量令牌值。

DELTA_TOKEN 外,数据帧的每一行都会返回以下字段:

  • GLUE_FETCH_SQ:此项是一个序列字段,根据记录接收顺序从 EPOC 时间戳生成,并且每个记录都是唯一的。如果您需要了解或确定源系统中的更改顺序,则可以使用此项。

  • DML_STATUS:对于从源新插入和更新的所有记录,此项将显示 UPDATED;对于从源删除的记录,此项将显示 DELETED

基于时间戳的增量传输

对于未启用 ODP 的实体(或启用 ODP 但未使用 ENABLE_CDC 标志的实体),您可以在连接器中使用 filteringExpression 选项来指示我们要检索数据的日期时间间隔。此方法依赖于数据中的时间戳字段,该字段表示每条记录的上次创建/修改时间。

示例:检索 2024-01-01T00:00:00.000 之后更改的记录

sapodata_df = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", "filteringExpression": "LastChangeDateTime >= 2024-01-01T00:00:00.000" }, transformation_ctx=key)
注意

在此示例中,LastChangeDateTime 是表示每条记录上次修改时间的字段。实际字段名称可能会因具体的 SAP OData 实体而有所不同。

要在后续运行中获得新的数据子集,您需要使用新时间戳更新 filteringExpression。通常,此项将是先前检索到的数据的最大时间戳值。例如:

max_timestamp = get_max_timestamp(sapodata_df) # Function to get the max timestamp from the previous run next_filtering_expression = f"LastChangeDateTime > {max_timestamp}" # Use this next_filtering_expression in your next run

在下一节中,我们将提供一种自动化的方法来管理这些基于时间戳的增量传输,从而无需在两次运行之间手动更新筛选表达式。

使用 SAP OData 状态管理脚本

要在 Glue 作业中使用 SAP OData 状态管理脚本,请执行以下步骤:

  1. 从公共 Amazon S3 存储桶下载状态管理脚本

  2. 将脚本上传到您的 Amazon Glue 作业有权访问的 Amazon S3 存储桶。

  3. 在 Amazon Glue 作业中引用脚本:创建或更新 Amazon Glue 作业时,传递引用 Amazon S3 存储桶中脚本路径的“--extra-py-files”选项。例如:--extra-py-files s3://your-bucket/path/to/sap_odata_state_management.py

  4. 在 Amazon Glue 作业脚本中导入和使用状态管理库。

基于增量令牌的增量传输示例

以下是有关如何使用状态管理脚本进行基于增量令牌的增量传输的示例:

from sap_odata_state_management import StateManagerFactory, StateManagerType, StateType # Initialize the state manager state_manager = StateManagerFactory.create_manager( manager_type=StateManagerType.JOB_TAG, state_type=StateType.DELTA_TOKEN, options={ "job_name": args['JOB_NAME'], "logger": logger } ) # Get connector options (including delta token if available) key = "SAPODataNode" connector_options = state_manager.get_connector_options(key) # Use the connector options in your Glue job df = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", "ENABLE_CDC": "true", **connector_options } ) # Process your data here... # Update the state after processing state_manager.update_state(key, sapodata_df.toDF())

基于时间戳的增量传输示例

以下是有关如何使用状态管理脚本进行基于时间戳的增量传输的示例:

from sap_odata_state_management import StateManagerFactory, StateManagerType, StateType # Initialize the state manager state_manager = StateManagerFactory.create_manager( manager_type=StateManagerType.JOB_TAG, state_type=StateType.TIMESTAMP, options={ "job_name": args['JOB_NAME'], "logger": logger, "timestamp_column": "LastChangeDateTime" } ) # Get connector options (including filtering expression if available) key = "SAPODataNode" connector_options = state_manager.get_connector_options(key) # Use the connector options in your Glue job sapodata_df = glueContext.create_dynamic_frame.from_options( connection_type="SAPOData", connection_options={ "connectionName": "connectionName", "ENTITY_NAME": "entityName", **connector_options } ) # Process your data here... # Update the state after processing state_manager.update_state(key, sapodata_df.toDF())

在这两个示例中,状态管理脚本处理在两次作业运行之间存储状态(增量令牌或时间戳)的复杂性。它会在获取连接器选项时自动检索上次已知状态,并在处理后更新状态,从而确保每次运行的作业仅处理新的或更改的数据。