使用 Oracle 创建自定义插件 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Oracle 创建自定义插件

以下示例将引导您完成使用 Oracle for Amazon MWAA 创建自定义插件的步骤,该插件可以与 plugins.zip 文件中的其他自定义插件和二进制文件组合使用。

版本

  • 本页上的示例代码可与 Python 3.7 中的 Apache Airflow v1 一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

  • 无需其他权限即可使用本页上的代码示例。

要求

要使用本页上的示例代码,请将以下依赖项添加到 requirements.txt。要了解更多信息,请参阅 安装 Python 依赖项

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt cx_Oracle apache-airflow-providers-oracle
Apache Airflow v1
cx_Oracle==8.1.0 apache-airflow[oracle]==1.10.12

代码示例

以下步骤描述了如何创建用于测试自定义插件的DAG代码。

  1. 在命令提示符下,导航到存储DAG代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 oracle.py

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )

创建自定义插件

本节介绍如何下载依赖项、创建自定义插件和 plugins.zip。

下载依赖项

亚马逊MWAA会将 plugins.zip 的内容提取到/usr/local/airflow/plugins每个亚马逊MWAA调度程序和工作程序容器中。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。

拉取 Amazon Linux 容器镜像
  1. 在命令提示符下,提取 Amazon Linux 容器镜像,然后在本地运行该容器。例如:

    docker pull amazonlinux docker run -it amazonlinux:latest /bin/bash

    命令提示符应该调用 bash 命令行。例如:

    bash-4.2#
  2. 安装 Linux 原生异步 I/O 工具(libaio)。

    yum -y install libaio
  3. 请将此窗口保持打开状态以供后续步骤使用。我们将在本地复制以下文件:lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1

下载客户端文件夹
  1. 在本地安装解压缩包。例如:

    sudo yum install unzip
  2. 创建 oracle_plugin 目录。例如:

    mkdir oracle_plugin cd oracle_plugin
  3. 使用以下 curl 命令从适用于 Linux x86-64(64 位)的 Oracle 即时客户端下载中下载 instantclient-basic-linux.x64-18.5.0.0.0dbru.zip。

    curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
  4. 解压缩 client.zip 文件。例如:

    unzip *.zip
从 Docker 中提取文件
  1. 在新的命令提示符下,显示并写下 Docker 容器 ID。例如:

    docker container ls

    您的命令提示符应返回所有容器及其容器IDs。例如:

    debc16fd6970
  2. oracle_plugin 目录中,将 lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1 文件解压缩到本地 instantclient_18_5 文件夹。例如:

    docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/

自定义插件

Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。

  • 复制以下代码示例的内容,并在本地另存为 env_var_plugin_oracle.py

    from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'

Plugins.zip

以下步骤显示如何创建 plugins.zip。此示例的内容可以与其他插件和二进制文件合并到单个 plugins.zip 文件中。

压缩插件目录中的内容。
  1. 在命令行提示符中,导航到 oracle_plugin 目录。例如:

    cd oracle_plugin
  2. instantclient_18_5 目录压缩到 plugins.zip 中。例如:

    zip -r ../plugins.zip ./
  3. 您应该在命令提示符中看到如下内容:

    oracle_plugin$ ls client.zip instantclient_18_5
  4. 移除该 client.zip 文件。例如:

    rm client.zip
压缩 env_var_plugin_oracle.py 文件
  1. env_var_plugin_oracle.py 文件添加到 plugins.zip 文件的根目录。例如:

    zip plugins.zip env_var_plugin_oracle.py
  2. plugins.zip 现在应包含以下内容:

    env_var_plugin_oracle.py instantclient_18_5/

Airflow 配置选项

如果您使用的是 Apache Airflow v2,请添加 core.lazy_load_plugins : False 为 Apache Airflow 配置选项。要了解更多信息,请参阅 2 中的使用配置选项加载插件

接下来做什么?

  • 要了解如何将本示例中的 requirements.txt 文件上传到 Amazon S3 存储桶,请参阅 安装 Python 依赖项

  • 了解如何将本示例中的DAG代码上传到您的 Amazon S3 存储桶中的dags文件夹添加或更新 DAG

  • 要了解如何将本示例中的 plugins.zip 文件上传到 Amazon S3 存储桶,请参阅 安装自定义插件