使用 SSHOperator 创建 SSH 连接 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 SSHOperator 创建 SSH 连接

以下示例介绍如何使用有向无环图(DAG)中的 SSHOperator 从 Amazon MWAA 环境连接到远程 Amazon EC2 实例。您可以使用类似的方法连接到任何具有 SSH 访问权限的远程实例。

在以下示例中,您将 SSH 密钥 (.pem) 上传到在 Amazon S3 上的环境的 dags 目录中。然后,您可以使用 requirements.txt 安装必要的依赖项,并在 UI 中创建新的 Apache Airflow 连接。最后,您编写一个 DAG 来创建与远程实例的 SSH 连接。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

  • Amazon MWAA 环境

  • SSH 密钥。该代码示例假设您有 Amazon EC2 实例,并且与 Amazon MWAA 环境位于同一区域的 .pem。如果您没有密钥,请参阅 Amazon EC2 用户指南中的创建或导入密钥对

权限

  • 无需其他权限即可使用本页上的代码示例。

要求

将以下参数添加到 requirements.txt,以将 apache-airflow-providers-ssh 程序包安装到 Web 服务器上。当环境更新并且 Amazon MWAA 成功安装依赖项后,您将在 UI 中看到新的 SSH 连接类型。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注意

-c 定义了 requirements.txt 中的约束 URL。这样可以确保 Amazon MWAA 为环境安装了正确的程序包版本。

将密钥复制到 Amazon S3

使用以下 Amazon Command Line Interface 命令将您的.pem密钥复制到 Amazon S3 中的环境dags目录中。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA 将包括 .pem 密钥在内的 dags 中的内容复制到本地 /usr/local/airflow/dags/ 目录,这样,Apache Airflow 就可以访问密钥了。

创建新的 Apache Airflow 连接

使用 Apache Airflow UI 创建新的 SSH 连接
  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 从环境列表中,为环境选择打开 Airflow UI

  3. 在 Apache Airflow UI 页面上,从顶部导航栏中选择管理员以展开下拉列表,然后选择连接

  4. 列出连接页面上,选择 +添加新记录按钮以添加新连接。

  5. 连接到 AD 页面上,提供以下信息:

    1. 连接名称中,输入 ssh_new

    2. 连接类型中,从下拉列表中选择 SSH

      注意

      如果列表中没有 SSH 连接类型,则表示 Amazon MWAA 尚未安装所需的 apache-airflow-providers-ssh 程序包。请更新 requirements.txt 文件以包含此程序包,然后重试。

    3. 主机中,请输入要连接的 Amazon EC2 实例的 IP 地址。例如,12.345.67.89

    4. 用户名中,请输入 ec2-user,以检查您是否正在连接到 Amazon EC2 实例。用户名可能会有所不同,具体取决于您希望 Apache Airflow 连接到的远程实例的类型。

    5. 附加中,请以 JSON 格式输入以下键/值对:

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      此键值对指示 Apache Airflow 在本地 /dags 目录中查找密钥。

代码示例

以下 DAG 使用 SSHOperator 连接到目标 Amazon EC2 实例,然后运行 hostname Linux 命令来打印该实例的名称。您可以修改 DAG 以在远程实例上运行任何命令或脚本。

  1. 打开终端,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 ssh.py

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 运行以下 Amazon CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow 用户界面触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您将在 ssh_operator_example DAG 的任务日志中看到输出,类似于 ssh_task 的任务日志中的以下内容:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916