安装自定义插件 - Amazon Managed Workflows for Apache Airflow
Amazon MWAA 支持 Apache Airflow 的内置插件管理器,允许您使用自定义 Apache Airflow 运算符、挂钩、传感器或接口。本页介绍使用文件在您的亚马逊MWAA环境中安装 Apache Airflow 自定义插件plugins.zip步骤。





  1. 在本地创建 plugins.zip 文件。

  2. plugins.zip 文件上传到 Amazon S3 中的存储桶。

  3. 在 Amazon MWAA 控制台的 “插件文件” 字段中指定此文件的版本。


如果这是您首次将上传plugins.zip到 Amazon S3 存储桶,则还需要在亚马逊MWAA控制台上指定文件路径。您只需要完成此步骤一次。


正如 Apache Airflow 文档中所述,只有扩展 Apache Airflow 用户界面时才需要插件。自定义运算符可以直接放在DAG代码旁边的/dags文件夹中。

如果您需要创建自己的与外部系统的集成,请将其放在/ dags 文件夹或其中的子文件夹中,而不是放在该plugins.zip文件夹中。在 Apache Airflow 2.x 中,插件主要用于扩展用户界面。

同样,不应将其他依赖项置于其中plugins.zip。相反,它们可以存储在 Amazon S3 /dags 文件夹下的某个位置,在 Apache Airflow 启动之前,它们将在那里同步到每个亚马逊MWAA容器。


/dags文件夹中plugins.zip或其中未明确定义 Apache Airflow DAG 对象的任何文件都必须列在文件中。.airflowignore


Apache Airflow 的内置插件管理器只需将文件拖放到 $AIRFLOW_HOME/plugins 文件夹中即可将外部功能集成到其核心中。它允许您使用自定义 Apache Airflow 操作符、挂钩、传感器或接口。下一节提供了本地开发环境中平面和嵌套目录结构的示例,以及生成的 import 语句,这些语句决定了 plugins.zip 中的目录结构。


在启动期间,Apache Airflow SchedulerW orkers 会在您的环境的托管的 Amazon Fargate 容器上查找自定义插件,网址为。/usr/local/airflow/plugins/*

  • 目录结构。目录结构(在 /* 中)基于您 plugins.zip 文件的内容。例如,如果 plugins.zip 包含 operators 目录作为顶级目录,则该目录将被解压缩到环境的 /usr/local/airflow/plugins/operators 中。

  • 大小限制。我们建议使用小于 1 GB 的 plugins.zip 文件。plugins.zip 文件大小越大,环境的启动时间就越长。尽管 Amazon MWAA 没有明确限制plugins.zip文件的大小,但如果无法在十分钟内安装依赖项,Fargate 服务将超时并尝试将环境回滚到稳定状态。


对于使用 Apache Airflow v1.10.12 或 Apache Airflow v2.0.2 的环境,MWAA亚马逊会限制 Apache Airflow 网络服务器上的出站流量,并且不允许您直接在 Web 服务器上安装插件或 Python 依赖项。从 Apache Airflow v2.2.2 开始,MWAA亚马逊可以直接在网络服务器上安装插件和依赖项。


下一节使用《Apache Airflow 参考指南》中的示例代码来展示如何构建本地开发环境。

在 plugins.zip 中使用平面目录结构的示例

Apache Airflow v2

以下示例显示了 Apache Airflow v2 中一个采用扁平目录结构的 plugins.zip 文件。

例 带有 PythonVirtualenvOperator plugins.zip 的平面目录

以下示例显示了中 PythonVirtualenvOperator 自定义插件的 plugins.zip 文件的顶级树为 Apache Airflow 创建自定义插件 PythonVirtualenvOperator

├── virtual_python_plugin.py
例 plugins/virtual_python_plugin.py

以下示例显示了 PythonVirtualenvOperator 自定义插件。

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

以下示例显示了 Apache Airflow v1 中一个采用扁平目录结构的 plugins.zip 文件。

例 带有 PythonVirtualenvOperator plugins.zip 的平面目录

以下示例显示了中 PythonVirtualenvOperator 自定义插件的 plugins.zip 文件的顶级树为 Apache Airflow 创建自定义插件 PythonVirtualenvOperator

├── virtual_python_plugin.py
例 plugins/virtual_python_plugin.py

以下示例显示了 PythonVirtualenvOperator 自定义插件。

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

在 plugins.zip 中使用平面目录结构的示例

Apache Airflow v2

以下示例显示了一个 plugins.zip 文件,其中包含 hooksoperators 的单独目录和 Apache Airflow v2 的 sensors 目录。

例 Plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py


例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]


例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

按照使用 Amazon MWAA CLI 实用程序测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件以压缩plugins目录中的内容。例如,cd plugins

Apache Airflow v1

以下示例显示了一个 plugins.zip 文件,其中包含 hooksoperators 的单独目录和 Apache Airflow v1.10.12 的 sensors 目录。

例 Plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py


例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]


例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

按照使用 Amazon MWAA CLI 实用程序测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件以压缩plugins目录中的内容。例如,cd plugins

创建 plugins.zip 文件

以下步骤描述了我们建议在本地创建 plugins.zip 文件的步骤。

第一步:使用 Amazon MWAA CLI 实用工具测试自定义插件

  • 命令行界面 (CLI) 实用程序在本地复制适用于 Apache Airflow 的亚马逊托管工作流程。

  • 在本地CLI构建 Docker 容器镜像,该镜像类似于亚马逊MWAA生产镜像。这允许您在部署到亚马逊之前运行本地 Apache Airflow 环境来开发和测试DAGs自定义插件和依赖项。MWAA

  • 要运行CLI,请参阅 aws-mwaa-local-runneron GitHub。

步骤 2:创建 plugins.zip 文件

您可以使用内置的ZIP存档实用程序或任何其他ZIP实用程序(例如 7zip)来创建.zip 文件。


当您创建.zip 文件时,Windows 操作系统的内置 zip 实用工具可能会添加子文件夹。我们建议您验证 plugins.zip 文件的内容,然后再上传到 Amazon S3 存储桶,以确保没有添加其他目录。

  1. 将目录更改为本地 Airflow 插件目录。例如:

    myproject$ cd plugins
  2. 运行以下命令以确保内容具有可执行权限(仅限 macOS 和 Linux)。

    plugins$ chmod -R 755 .
  3. 将内容压缩到 plugins 文件夹中

    plugins$ zip -r plugins.zip .

上传 plugins.zip 到 Amazon S3

您可以使用 Amazon S3 控制台或 Amazon Command Line Interface (Amazon CLI) 将plugins.zip文件上传到您的 Amazon S3 存储桶。

使用 Amazon CLI

Amazon Command Line Interface (Amazon CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 Amazon 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:

要使用上传 Amazon CLI
  1. 在命令提示符下,导航到存储 plugins.zip 文件的目录。例如:

    cd plugins
  2. 以下示例列出所有 Amazon S3 存储桶。

    aws s3 ls
  3. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. 使用以下命令将 plugins.zip 文件上传到环境的 Amazon S3 存储桶。

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

使用 Amazon S3 控制台

Amazon S3 控制台是一个基于 Web 的UI ,允许您创建和管理 Amazon S3 桶中的资源。

要使用 Amazon S3 控制台上传,请执行以下操作
  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择环境。

  3. S3 窗格的DAG代码中选择 S3 存储桶链接,在 Amazon S3 控制台上打开您的存储桶。

  4. 选择上传

  5. 选择 添加文件

  6. 选择 plugins.zip 的本地副本,选择上传


本节介绍如何安装您上传到 Amazon S3 存储桶的自定义插件,方法是指定 plugins.zip 文件的路径,并在每次更新 zip 文件时指定 plugins.zip 文件的版本。

在 Amazon MWAA 控制台plugins.zip上指定路径(第一次)

如果这是您首次将上传plugins.zip到 Amazon S3 存储桶,则还需要在亚马逊MWAA控制台上指定文件路径。您只需要完成此步骤一次。

  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择环境。

  3. 选择编辑

  4. 在 Amazon S3 窗格的DAG代码中,选择插件文件-可选字段旁边的浏览 S3

  5. 选择 Amazon S3 存储桶中的 plugins.zip 文件。

  6. 选择选择

  7. 选择下一步更新环境

在 Amazon MWAA 控制台上指定plugins.zip版本

每次在 Amazon S3 存储桶中上传新版本时,都需要plugins.zip在亚马逊MWAA控制台上指定plugins.zip文件的版本。

  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择环境。

  3. 选择编辑

  4. 在 Amazon S3 窗格的DAG代码中,从下拉列表中选择一个plugins.zip版本。

  5. 选择下一步

plugins.zip 的用例示例


