将 Amazon MWAA 与 Amazon EKS 一起使用 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Amazon MWAA 与 Amazon EKS 一起使用

以下示例演示了如何将 Amazon MWAA 与 Amazon EKS 一起使用。

版本

  • 本页上的示例代码可与 Python 3.7 中的 Apache Airflow v1一起使用。

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本主题中的示例,您需要以下内容:

注意

使用 eksctl 命令时,可以包含 --profile,以指定默认配置文件以外的配置文件。

创建 Amazon EC2 公有密钥

使用以下命令,以从私有密钥对中创建公有密钥。

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

要了解更新信息,请参阅检索密钥对的公有密钥

创建集群

使用以下命令来创建集群。如果您想要为集群自定义名称或在其他区域创建集群,请替换名称和区域值。您必须在与您在创建 Amazon MWAA 环境的同一区域中创建集群。替换子网的值,使其与您用于 Amazon MWAA 的 Amazon VPC 网络中的子网相匹配。替换 ssh-public-key 的值以匹配您使用的密钥。您可以使用位于同一区域的 Amazon EC2 中的现有密钥,也可以在创建 Amazon MWAA 环境的同一区域创建新密钥。

eksctl create cluster \ --name mwaa-eks \ --region us-west-2 \ --version 1.18 \ --nodegroup-name linux-nodes \ --nodes 3 \ --nodes-min 1 \ --nodes-max 4 \ --with-oidc \ --ssh-access \ --ssh-public-key MyPublicKey \ --managed \ --vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \ --vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"

完成集群的创建需要一段时间。完成后,您可以使用以下命令验证集群是否已成功创建并配置了 IAM OIDC 提供商:

eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster mwaa-eks \ --approve

创建 mwaa 命名空间

确认集群已成功创建后,使用以下命令为 pod 创建命名空间。

kubectl create namespace mwaa

mwaa 命名空间创建角色

创建命名空间后,在 EKS 上为可在 MWAA 命名空间中运行 pod 的 Amazon MWAA 用户创建角色和角色绑定。如果您为命名空间使用了不同的名称,请将 -n mwaa 中的 mwaa 名称替换为您使用的名称。

cat << EOF | kubectl apply -f - -n mwaa kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role rules: - apiGroups: - "" - "apps" - "batch" - "extensions" resources: - "jobs" - "pods" - "pods/attach" - "pods/exec" - "pods/log" - "pods/portforward" - "secrets" - "services" verbs: - "create" - "delete" - "describe" - "get" - "list" - "patch" - "update" --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role-binding subjects: - kind: User name: mwaa-service roleRef: kind: Role name: mwaa-role apiGroup: rbac.authorization.k8s.io EOF

运行以下命令来确认新角色可以访问 Amazon EKS 集群。如果您没有使用 mwaa,请务必使用正确的名称:

kubectl get pods -n mwaa --as mwaa-service

您还会看到写有如下内容的一条消息:

No resources found in mwaa namespace.

创建并附加 Amazon EKS 集群的 IAM 角色

您必须创建一个 IAM 角色,然后将其绑定到 Amazon EKS(k8s)集群,这样该角色才能通过 IAM 进行身份验证。该角色仅用于登录集群,没有任何控制台或 API 调用的权限。

使用 Amazon MWAA 执行角色 中的步骤为 Amazon MWAA 环境创建新角色。但是,与其创建和附加该主题中描述的策略,不如附加以下策略:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}" }, { "Effect": "Deny", "Action": "s3:ListAllMyBuckets", "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*" ] }, { "Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*" }, { "Effect": "Allow", "Action": [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*" }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ "sqs.${MWAA_REGION}.amazonaws.com" ] } } }, { "Effect": "Allow", "Action": [ "eks:DescribeCluster" ], "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}" } ] }

创建角色后,编辑 Amazon MWAA 环境,使用您创建的角色作为环境的执行角色。要更改角色,请编辑要使用的环境。您可以在“权限”下选择执行角色。

已知问题:

  • 角色 ARN 存在一个已知问题,子路径无法通过 Amazon EKS 进行身份验证。解决方法是手动创建服务角色,而不是使用 Amazon MWAA 自己创建的服务角色。要了解更多信息,请参阅在 aws-auth ConfigMap 中当 ARN 包含路径时,带有路径的角色不起作用

  • 如果 IAM 中没有 Amazon MWAA 服务列表,则需要选择备用服务策略,例如 Amazon EC2,然后更新该角色的信任策略以匹配以下内容:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-env.amazonaws.com", "airflow.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

    要了解更多信息,请参阅如何在 IAM 角色中使用信任策略

创建 requirements.txt 文件

要使用本节中的示例代码,请确保已向 requirements.txt 中添加了以下数据库选项之一。要了解更多信息,请参阅 安装 Python 依赖项

Apache Airflow v2
kubernetes apache-airflow[cncf.kubernetes]==3.0.0
Apache Airflow v1
awscli kubernetes==12.0.1

为 Amazon EKS 创建身份映射

使用您在以下命令中创建的角色的 ARN 为 Amazon EKS 创建身份映射。将 your-region 中的区域更改为您在其中创建环境的区域。替换角色的 ARN,最后,将 mwaa-execution-role 替换为环境的执行角色。

eksctl create iamidentitymapping \ --region your-region \ --cluster mwaa-eks \ --arn arn:aws:iam::111222333444:role/mwaa-execution-role \ --username mwaa-service

创建 kubeconfig

使用以下命令创建 kubeconfig

aws eks update-kubeconfig \ --region us-west-2 \ --kubeconfig ./kube_config.yaml \ --name mwaa-eks \ --alias aws

如果您在运行 update-kubeconfig 时使用了特定的配置文件,则需要删除添加到 kube_config.yaml 文件中的 env: 部分,这样它才能在 Amazon MWAA 中正常运行。为此,请从文件中删除以下内容,然后将其保存:

env: - name: AWS_PROFILE value: profile_name

创建 DAG

使用以下代码示例创建 Python 文件,例如 DAG 的 mwaa_pod_example.py 文件。

Apache Airflow v2
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )
Apache Airflow v1
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )

将 DAG 和 kube_config.yaml 添加到 Amazon S3 存储桶中

将您创建的 DAG 和 kube_config.yaml 文件放入 Amazon MWAA 环境的 Amazon S3 存储桶中。您可以使用 Amazon S3 控制台或 Amazon Command Line Interface 将所有文件放入存储桶中。

启用并触发示例

在 Apache Airflow 中,启用该示例,然后将其触发。

成功运行并完成后,使用以下命令验证 Pod:

kubectl get pods -n mwaa

您应该可以看到类似于如下所示的输出内容:

NAME READY STATUS RESTARTS AGE mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s

然后,您可以使用以下命令验证 Pod 的输出。请将名称值替换为上一个命令返回的值:

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888