本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Apache Hive 和 Hadoop 创建自定义插件
Amazon 将 a 的内容MWAA提取plugins.zip
到/usr/local/airflow/plugins
。这可以用来向容器中添加二进制文件。此外,Apache Airflow 会在启动时执行 plugins
文件夹中的 Python 文件内容,使您能够设置和修改环境变量。以下示例将引导您完成在 Amazon MWAA 环境中使用 Apache Hive 和 Hadoop 创建自定义插件的步骤,该插件可以与其他自定义插件和二进制文件组合使用。
版本
-
本页上的示例代码可与 Python 3.7
中的 Apache Airflow v1 一起使用。
先决条件
要使用本页上的示例代码,您需要以下内容:
权限
-
无需其他权限即可使用本页上的代码示例。
要求
要使用本页上的示例代码,请将以下依赖项添加到 requirements.txt
。要了解更多信息,请参阅 安装 Python 依赖项。
下载依赖项
亚马逊MWAA会将 plugins.zip 的内容提取到/usr/local/airflow/plugins
每个亚马逊MWAA调度程序和工作程序容器中。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。
-
在命令提示符下,导航到要创建插件的目录。例如:
cd plugins
-
wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
-
wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
-
创建目录。例如:
mkdir hive_plugin
-
Hadoop 提取。
tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
-
Hive 提取。
tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
自定义插件
Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。
-
在命令行提示符中,导航到
hive_plugin
目录。例如:cd hive_plugin
-
复制以下代码示例的内容,并在
hive_plugin
目录中将其本地另存为hive_plugin.py
。from airflow.plugins_manager import AirflowPlugin import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jre" os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0' os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop' os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin' os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" class EnvVarPlugin(AirflowPlugin): name = 'hive_plugin'
-
复制以下文本的内容,并在
hive_plugin
目录中将其本地另存为.airflowignore
。hadoop-3.3.0 apache-hive-3.1.2-bin
Plugins.zip
以下步骤显示如何创建 plugins.zip
。此示例的内容可以与其他插件和二进制文件组合成一个 plugins.zip
文件。
-
在命令提示符下,导航到上一步中的
hive_plugin
目录。例如:cd hive_plugin
-
将内容压缩到
plugins
文件夹中。zip -r ../hive_plugin.zip ./
代码示例
以下步骤描述了如何创建用于测试自定义插件的DAG代码。
-
在命令提示符下,导航到存储DAG代码的目录。例如:
cd dags
-
复制以下代码示例的内容,并在本地另存为
hive.py
。from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hive_test = BashOperator( task_id="hive_test", bash_command='hive --help' )
Airflow 配置选项
如果您使用的是 Apache Airflow v2,请添加 core.lazy_load_plugins : False
为 Apache Airflow 配置选项。要了解更多信息,请参阅 2 中的使用配置选项加载插件。
接下来做什么?
-
要了解如何将本示例中的
requirements.txt
文件上传到 Amazon S3 存储桶,请参阅 安装 Python 依赖项。 -
了解如何将本示例中的DAG代码上传到您的 Amazon S3 存储桶中的
dags
文件夹添加或更新 DAG。 -
要了解如何将本示例中的
plugins.zip
文件上传到 Amazon S3 存储桶,请参阅 安装自定义插件。