使用 Apache Hive 和 Hadoop 创建自定义插件 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Apache Hive 和 Hadoop 创建自定义插件

Amazon MWAA 将 plugins.zip 的内容提取到 /usr/local/airflow/plugins。这可以用来向容器中添加二进制文件。此外,Apache Airflow 会在启动时执行 plugins 文件夹中的 Python 文件内容,使您能够设置和修改环境变量。以下示例将引导您完成在 Amazon MWAA 环境中使用 Apache Hive 和 Hadoop 创建自定义插件的步骤,该插件可以与其他自定义插件和二进制文件组合使用。

版本

  • 本页上的示例代码可与 Python 3.7 中的 Apache Airflow v1一起使用。

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

  • 无需其他权限即可使用本页上的代码示例。

要求

要使用本页上的示例代码,请将以下依赖项添加到 requirements.txt。要了解更多信息,请参阅 安装 Python 依赖项

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt apache-airflow-providers-amazon[apache.hive]
Apache Airflow v1
apache-airflow[hive]==1.10.12

下载依赖项

Amazon MWAA 会将 plugins.zip 的内容提取到每个 Amazon MWAA 计划程序和工作线程容器上的 /usr/local/airflow/plugins。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。

  1. 在命令提示符下,导航到要创建插件的目录。例如:

    cd plugins
  2. 镜像中下载 Hadoop,例如:

    wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
  3. 镜像中下载 Hive,例如:

    wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
  4. 创建目录。例如:

    mkdir hive_plugin
  5. Hadoop 提取。

    tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
  6. Hive 提取。

    tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin

自定义插件

Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。

  1. 在命令行提示符中,导航到 hive_plugin 目录。例如:

    cd hive_plugin
  2. 复制以下代码示例的内容,并在 hive_plugin 目录中将其本地另存为 hive_plugin.py

    from airflow.plugins_manager import AirflowPlugin import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jre" os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0' os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop' os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin' os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" class EnvVarPlugin(AirflowPlugin): name = 'hive_plugin'
  3. 复制以下文本的内容,并在 hive_plugin 目录中将其本地另存为 .airflowignore

    hadoop-3.3.0 apache-hive-3.1.2-bin

Plugins.zip

以下步骤显示如何创建 plugins.zip。此示例的内容可以与其他插件和二进制文件组合成一个 plugins.zip 文件。

  1. 在命令提示符下,导航到上一步中的 hive_plugin 目录。例如:

    cd hive_plugin
  2. 将内容压缩到 plugins 文件夹中。

    zip -r ../hive_plugin.zip ./

代码示例

以下步骤介绍如何创建用于测试自定义插件的 DAG 代码。

  1. 在命令提示符下,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 hive.py

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hive_test = BashOperator( task_id="hive_test", bash_command='hive --help' )

Airflow 配置选项

如果您使用的是 Apache Airflow v2,请添加 core.lazy_load_plugins : False 为 Apache Airflow 配置选项。要了解更多信息,请参阅 2 中的使用配置选项加载插件

接下来做什么?

  • 要了解如何将本示例中的 requirements.txt 文件上传到 Amazon S3 存储桶,请参阅 安装 Python 依赖项

  • 要了解如何将本示例中的 DAGD 代码上传到 Amazon S3 存储桶的 dags 文件夹,请参阅 添加或更新 DAG

  • 要了解如何将本示例中的 plugins.zip 文件上传到 Amazon S3 存储桶,请参阅 安装自定义插件