安装自定义插件 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

安装自定义插件

Amazon MWAA 支持 Apache Airflow 的内置插件管理器,允许您使用自定义 Apache Airflow 运算符、挂钩、传感器或接口。本页介绍使用 plugins.zip 文件在 Amazon MWAA 环境中安装 Apache Airflow 自定义插件的步骤。

先决条件

在完成本页上的步骤之前,您需要具备以下条件。

  • 权限 — Amazon 账户必须已获得管理员授权,访问适用于环境的 AmazonMWAAFullConsoleAccess 访问控制策略。此外,执行角色必须允许 Amazon MWAA 环境访问环境所使用的 Amazon 资源。

  • 访问权限-如果您需要访问公共存储库才能直接在 Web 服务器上安装依赖项,则必须将环境配置为具有公共网络 Web 服务器访问权限。有关更多信息,请参阅 Apache Airflow 访问模式

  • Amazon S3 配置 — 用于存储 DAG 的 Amazon S3 存储桶、在 plugins.zip 中的自定义插件和在 requirements.txt 中的 Python 依赖项必须配置为已阻止公共访问已启用版本控制

工作原理

要在环境中运行自定义插件,您必须做三件事:

  1. 在本地创建 plugins.zip 文件。

  2. plugins.zip 文件上传到 Amazon S3 中的存储桶。

  3. 在 Amazon MWAA 控制台的插件文件字段中指定此文件的版本。

注意

如果这是您首次将 plugins.zip 上传到 Amazon S3 存储桶,则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。

v2 中发生了什么变化

  • 新增:运算符、挂钩和执行程序。DAG 中的导入语句以及在 Amazon MWAA 上您在 plugins.zip 中指定的自定义插件在 Apache Airflow v1 和 Apache Airflow v2 之间发生了变化。例如,,Apache Airflow v1 中的 from airflow.contrib.hooks.aws_hook import AwsHook 已更改为 Apache Airflow v2 中的 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook。要了解更多信息,请参阅《Apache Airflow 参考指南》中的 Python API 参考

  • 新增:插件导入。不再支持使用 airflow.{operators,sensors,hooks}.<plugin_name> 导入插件中添加的运算符、传感器和挂钩。这些扩展应作为常规 Python 模块导入。在 v2 及更高版本中,推荐的方法是将它们放在 DAG 目录中,然后创建并使用 .airflowignore 文件将它们排除在解析为 DAG 之外。要了解更多信息,请参阅《Apache Airflow 参考指南》中的模块管理创建自定义运算符

自定义插件概述

Apache Airflow 的内置插件管理器只需将文件拖放到 $AIRFLOW_HOME/plugins 文件夹中即可将外部功能集成到其核心中。它允许您使用自定义 Apache Airflow 操作符、挂钩、传感器或接口。下一节提供了本地开发环境中平面和嵌套目录结构的示例,以及生成的 import 语句,这些语句决定了 plugins.zip 中的目录结构。

自定义插件目录和大小限制

在启动期间,Apache Airflow 计划程序工作线程在 Amazon 托管的 Fargate 容器启动时,在 /usr/local/airflow/plugins/* 中为环境查找自定义插件。

  • 目录结构。目录结构(在 /* 中)基于您 plugins.zip 文件的内容。例如,如果 plugins.zip 包含 operators 目录作为顶级目录,则该目录将被解压缩到环境的 /usr/local/airflow/plugins/operators 中。

  • 大小限制。我们建议使用小于 1 GB 的 plugins.zip 文件。plugins.zip 文件大小越大,环境的启动时间就越长。尽管 Amazon MWAA 没有明确限制 plugins.zip 文件的大小,但如果无法在十分钟内安装依赖项,Fargate 服务将超时并尝试将环境回滚到稳定状态。

注意

对于使用 Apache Airflow v1.10.12 或 Apache Airflow v2.0.2 的环境,Amazon MWAA 会限制 Apache Airflow Web 服务器上的出站流量,并且不允许您直接在 Web 服务器上安装插件或 Python 依赖项。从 Apache Airflow v2.2.2 开始,Amazon MWAA 可以直接在 Web 服务器上安装插件和依赖项。

自定义插件示例

下一节使用《Apache Airflow 参考指南》中的示例代码来展示如何构建本地开发环境。

在 plugins.zip 中使用平面目录结构的示例

Apache Airflow v2

以下示例显示了 Apache Airflow v2 中一个采用扁平目录结构的 plugins.zip 文件。

例 带 PythonVirtualenvOperator plugins.zip 的平面目录

以下示例显示在 为 Apache Airflow PythonVirtualenvOperator 创建自定义插件 中 PythonVirtualenvOperator 自定义插件的 plugins.zip 文件的顶级树。

├── virtual_python_plugin.py
例 plugins/virtual_python_plugin.py

以下示例显示了 PythonVirtualenvOperator 自定义插件。

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

以下示例显示了 Apache Airflow v1 中一个采用扁平目录结构的 plugins.zip 文件。

例 带 PythonVirtualenvOperator plugins.zip 的平面目录

以下示例显示在 为 Apache Airflow PythonVirtualenvOperator 创建自定义插件 中 PythonVirtualenvOperator 自定义插件的 plugins.zip 文件的顶级树。

├── virtual_python_plugin.py
例 plugins/virtual_python_plugin.py

以下示例显示了 PythonVirtualenvOperator 自定义插件。

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

在 plugins.zip 中使用平面目录结构的示例

Apache Airflow v2

以下示例显示了一个 plugins.zip 文件,其中包含 hooksoperators 的单独目录和 Apache Airflow v2 的 sensors 目录。

例 Plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

以下示例显示了使用自定义插件的 DAG(DAG 文件夹)中的导入语句。

例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

以下示例显示了自定义插件文件中所需的每条导入语句。

例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

按照使用 Amazon MWAA CLI 实用工具测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件来将内容压缩到 plugins 目录之内。例如,cd plugins

Apache Airflow v1

以下示例显示了一个 plugins.zip 文件,其中包含 hooksoperators 的单独目录和 Apache Airflow v1.10.12 的 sensors 目录。

例 Plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

以下示例显示了使用自定义插件的 DAG(DAG 文件夹)中的导入语句。

例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

以下示例显示了自定义插件文件中所需的每条导入语句。

例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

按照使用 Amazon MWAA CLI 实用工具测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件来将内容压缩到 plugins 目录之内。例如,cd plugins

创建 plugins.zip 文件

以下步骤描述了我们建议在本地创建 plugins.zip 文件的步骤。

步骤 1:使用 Amazon MWAA CLI 实用工具测试自定义插件

  • 命令行界面(CLI)实用工具可在本地复制 Amazon MWAA 环境。

  • CLI 在本地构建 Docker 容器镜像,类似于 Amazon MWAA 生产镜像。这允许您在部署到 Amazon MWAA 之前运行本地 Apache Airflow 环境来开发和测试 DAG、自定义插件和依赖项。

  • 要运行 CLI,请参阅 GitHub 上的 aws-mwaa-local-runner

步骤 2:创建 plugins.zip 文件

您可以使用内置的 ZIP 存档实用工具或任何其他 ZIP 实用工具(例如 7zip)来创建.zip 文件。

注意

当您创建.zip 文件时,Windows 操作系统的内置 zip 实用工具可能会添加子文件夹。我们建议您验证 plugins.zip 文件的内容,然后再上传到 Amazon S3 存储桶,以确保没有添加其他目录。

  1. 将目录更改为本地 Airflow 插件目录。例如:

    myproject$ cd plugins
  2. 运行以下命令以确保内容具有可执行权限(仅限 macOS 和 Linux)。

    plugins$ chmod -R 755 .
  3. 将内容压缩到 plugins 文件夹之中

    plugins$ zip -r plugins.zip .

上传 plugins.zip 到 Amazon S3

您可以使用 Amazon S3 控制台或 Amazon Command Line Interface(Amazon CLI)将 plugins.zip 文件上传到 Amazon S3 存储桶中。

使用 Amazon CLI

Amazon Command Line Interface(Amazon CLI)是一种开源工具,让您能够在命令行 Shell 中使用命令与 Amazon 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:

要使用 Amazon CLI 上传,请执行以下操作
  1. 在命令提示符下,导航到存储 plugins.zip 文件的目录。例如:

    cd plugins
  2. 以下示例列出所有 Amazon S3 存储桶。

    aws s3 ls
  3. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. 使用以下命令将 plugins.zip 文件上传到环境的 Amazon S3 存储桶。

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

使用 Amazon S3 控制台

Amazon S3 控制台是一个基于 Web 的UI ,允许您创建和管理 Amazon S3 桶中的资源。

要使用 Amazon S3 控制台上传,请执行以下操作
  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. S3 中的 DAG 代码窗格中选择 S3 存储桶链接,在 Amazon S3 控制台上打开存储桶。

  4. 请选择 Upload(上传)。

  5. 选择 添加文件

  6. 选择 plugins.zip 的本地副本,选择上传

在环境中安装自定义插件

本节介绍如何安装您上传到 Amazon S3 存储桶的自定义插件,方法是指定 plugins.zip 文件的路径,并在每次更新 zip 文件时指定 plugins.zip 文件的版本。

在 Amazon MWAA 控制台上指定 plugins.zip 的路径(第一次)

如果这是您首次将 plugins.zip 上传到 Amazon S3 存储桶,则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。

  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. 选择编辑

  4. 在 Amazon S3 中的 DAG 代码窗格上,选择插件文件-可选字段旁边的浏览 S3

  5. 选择 Amazon S3 存储桶中的 plugins.zip 文件。

  6. 选择选择

  7. 选择下一步更新环境

在 Amazon MWAA 控制台上指定 plugins.zip 的版本

每次在 Amazon S3 存储桶中上传 plugins.zip 的新版本时,都需要在 Amazon MWAA 控制台上指定 plugins.zip 文件的版本。

  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. 选择编辑

  4. Amazon S3 中的 DAG 代码窗格中,从下拉列表中选择 plugins.zip 的版本。

  5. 选择下一步

plugins.zip 的用例示例

接下来做什么?

  • 使用 GitHub 上的 aws-mwaa-local-runner 在本地测试 DAG、自定义插件和 Python 依赖项。