

# 创建 SAP OData 作业


请参阅[使用 Amazon Glue Studio 构建可视化 ETL 作业](https://docs.amazonaws.cn/glue/latest/dg/author-job-glue.html)

# 操作数据供应（ODP）来源


操作数据供应（ODP）提供了一个技术基础架构，可用于支持各种目标应用程序的数据提取和复制，并支持这些场景中的增量机制。对于增量过程，来自源（ODP 提供程序）的数据使用更新过程自动写入到增量队列（操作增量队列 – ODQ），或者使用提取器接口传递到增量队列。ODP 提供程序可以是数据源（提取器）、ABAP 核心数据服务视图（ABAP CDS 视图）、SAP BW 或 SAP BW/4HANA、SAP Landscape Transformation Replication Server（SLT）和 SAP HANA 信息视图（计算视图）。目标应用程序（称为 ODQ“订阅用户”或更普遍的“ODP 使用者”）从增量队列中检索数据并继续处理数据。

## 完全加载


在 SAP OData 和 ODP 实体环境中，**完全加载**是指通过一次操作从 ODP 实体提取所有可用数据的过程。此操作从源系统检索完整的数据集，确保目标系统拥有实体数据全面且最新的副本。完全加载通常用于不支持增量加载的源或需要刷新目标系统的情况。

**示例**

在创建 DynamicFrame 时，您可以将 `ENABLE_CDC` 标志显式设置为 false。注意：默认情况下 `ENABLE_CDC` 为 false；如果您不想初始化增量队列，则无需发送此标志或将其设置为 true。不将此标志设置为 true 将导致完全加载提取。

```
sapodata_df = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        "ENABLE_CDC": "false"
    }, transformation_ctx=key)
```

## 增量加载


ODP（操作数据供应）实体环境中的**增量加载**涉及仅提取源系统中自上次数据提取以来新的或更改的数据（增量），从而避免对已经处理的记录进行预处理。这种方法显著提高了效率，减少了数据传输量，增强了性能，确保了系统之间的有效同步，并最大限度地缩短了处理时间，对于经常变化的大型数据集尤其如此。

# 基于增量令牌的增量传输


要使用更改数据捕获（CDC）为支持 ODP 的实体启用增量传输，请执行以下步骤：

1. 在脚本模式下创建增量传输作业。

1. 创建 DataFrame 或 Glue DynamicFrame 时，您需要传递选项 `"ENABLE_CDC": "True"`。此选项可确保您将收到来自 SAP 的增量令牌，该令牌可用于随后检索更改的数据。

增量令牌将在数据帧最后一行的 DELTA\$1TOKEN 列中显示。此令牌可用作后续调用中的连接器选项来逐步检索下一组数据。

**示例**
+ 创建 DynamicFrame 时，我们将 `ENABLE_CDC` 标志设置为 `true`。注意：默认情况下 `ENABLE_CDC` 为 `false`；如果您不想初始化增量队列，则无需发送此标志或将其设置为 true。不将此标志设置为 true 将导致完全加载提取。

  ```
  sapodata_df = glueContext.create_dynamic_frame.from_options(
      connection_type="SAPOData",
      connection_options={
          "connectionName": "connectionName",
          "ENTITY_NAME": "entityName",
          "ENABLE_CDC": "true"
      }, transformation_ctx=key)
  
  # Extract the delta token from the last row of the DELTA_TOKEN column
  delta_token_1 = your_logic_to_extract_delta_token(sapodata_df) # e.g., D20241029164449_000370000
  ```
+ 提取的增量令牌可作为检索新事件的选项传递。

  ```
  sapodata_df_2 = glueContext.create_dynamic_frame.from_options(
      connection_type="SAPOData",
      connection_options={
          "connectionName": "connectionName",
          "ENTITY_NAME": "entityName",
          // passing the delta token retrieved in the last run
          "DELTA_TOKEN": delta_token_1
      } , transformation_ctx=key)
  
  # Extract the new delta token for the next run
  delta_token_2 = your_logic_to_extract_delta_token(sapodata_df_2)
  ```

请注意，存在 `DELTA_TOKEN` 的最后一条记录不是来自源的事务记录，而仅用于传递增量令牌值。

除 `DELTA_TOKEN` 外，数据帧的每一行都会返回以下字段。
+ **GLUE\$1FETCH\$1SQ**：此项是一个序列字段，根据记录接收顺序从 EPOC 时间戳生成，并且每个记录都是唯一的。如果您需要了解或确定源系统中的更改顺序，则可以使用此项。此字段仅对启用 ODP 的实体显示。
+ **DML\$1STATUS**：对于从源新插入和更新的所有记录，此项将显示 `UPDATED`；对于从源删除的记录，此项将显示 `DELETED`。

有关如何通过示例管理状态和重用增量令牌来检索已更改记录的更多详细信息，请参阅[使用 SAP OData 状态管理脚本](sap-odata-state-management-script.md)一节。

## 增量令牌失效


增量令牌与服务集合和用户相关联。如果为同一个服务集合和用户启动新的初始拉取且 `“ENABLE_CDC” : “true”`，则 SAP OData 服务将使因之前初始化发出的所有之前的增量令牌失效。使用过期的增量令牌调用连接器将导致异常：

`Could not open data access via extraction API RODPS_REPL_ODP_OPEN` 

# OData 服务（非 ODP 来源）


## 完全加载


对于非 ODP（操作数据供应）系统，**完全加载**包括从源系统提取整个数据集并将其加载到目标系统中。由于非 ODP 系统本质上不支持增量等高级数据提取机制，因此该过程很简单，但可能会占用大量资源，具体取决于数据的大小。

## 增量加载


对于不支持 **ODP（操作数据供应）**的系统或实体，可以通过实施基于时间戳的机制来跟踪和提取更改，从而手动管理增量数据传输。

**基于时间戳的增量传输**

对于未启用 ODP 的实体（或启用 ODP 但未使用 ENABLE\$1CDC 标志的实体），我们可以在连接器中使用 `filteringExpression` 选项来指示我们要检索数据的 `datetime` 间隔。此方法依赖于数据中的时间戳字段，该字段表示每条记录的上次创建/修改时间。

**示例**

检索 2024-01-01T00:00:00.000 之后更改的记录

```
sapodata_df = glueContext.create_dynamic_frame.from_options(
    connection_type="SAPOData",
    connection_options={
        "connectionName": "connectionName",
        "ENTITY_NAME": "entityName",
        "filteringExpression": "LastChangeDateTime >= 2024-01-01T00:00:00.000"
    }, transformation_ctx=key)
```

注意：在此示例中，`LastChangeDateTime` 是表示每条记录上次修改时间的字段。实际字段名称可能会因具体的 SAP OData 实体而有所不同。

要在后续运行中获得新的数据子集，您需要使用新时间戳更新 `filteringExpression`。通常，此项将是先前检索到的数据的最大时间戳值。

**示例**

```
max_timestamp = get_max_timestamp(sapodata_df)  # Function to get the max timestamp from the previous run
next_filtering_expression = f"LastChangeDateTime > {max_timestamp}"

# Use this next_filtering_expression in your next run
```

在下一节中，我们将提供一种自动化的方法来管理这些基于时间戳的增量传输，从而无需在两次运行之间手动更新筛选表达式。