Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Amazon MWAA 与 Amazon EKS 一起使用
以下示例演示了如何将 Amazon MWAA 与 Amazon EKS 一起使用。
版本
先决条件
要使用本主题中的示例,您需要以下内容:
使用 eksctl
命令时,可以包含 --profile
,以指定默认配置文件以外的配置文件。
为 Amazon 创建公钥 EC2
使用以下命令,以从私有密钥对中创建公有密钥。
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
要了解更新信息,请参阅检索密钥对的公有密钥。
创建集群
使用以下命令来创建集群。如果您想要为集群自定义名称或在其他区域创建集群,请替换名称和区域值。您必须在与您在创建 Amazon MWAA 环境的同一区域中创建集群。替换子网的值,使其与您用于 Amazon MWAA 的 Amazon VPC 网络中的子网相匹配。替换 ssh-public-key
的值以匹配您使用的密钥。您可以使用来自亚马逊且位于同一地区的现有密钥 EC2 ,也可以在您创建 Amazon MWAA 环境的同一地区创建新密钥。
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey
\
--managed \
--vpc-public-subnets "subnet-11111111111111111
, subnet-2222222222222222222
" \
--vpc-private-subnets "subnet-33333333333333333
, subnet-44444444444444444
"
完成集群的创建需要一段时间。完成后,您可以使用以下命令验证集群是否已成功创建并配置了 IAM OIDC 提供商:
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
创建 mwaa
命名空间
确认集群已成功创建后,使用以下命令为 pod 创建命名空间。
kubectl create namespace mwaa
为 mwaa
命名空间创建角色
创建命名空间后,在 EKS 上为可在 MWAA 命名空间中运行 pod 的 Amazon MWAA 用户创建角色和角色绑定。如果您为命名空间使用了不同的名称,请将 -n mwaa
中的 mwaa 名称替换为您使用的名称。
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
- apiGroups:
- ""
- "apps"
- "batch"
- "extensions"
resources:
- "jobs"
- "pods"
- "pods/attach"
- "pods/exec"
- "pods/log"
- "pods/portforward"
- "secrets"
- "services"
verbs:
- "create"
- "delete"
- "describe"
- "get"
- "list"
- "patch"
- "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role-binding
subjects:
- kind: User
name: mwaa-service
roleRef:
kind: Role
name: mwaa-role
apiGroup: rbac.authorization.k8s.io
EOF
运行以下命令来确认新角色可以访问 Amazon EKS 集群。如果您没有使用以下名称,请务必使用正确的名称mwaa
:
kubectl get pods -n mwaa
--as mwaa-service
您还会看到写有如下内容的一条消息:
No resources found in mwaa namespace.
创建并附加 Amazon EKS 集群的 IAM 角色
您必须创建一个 IAM 角色,然后将其绑定到 Amazon EKS(k8s)集群,这样该角色才能通过 IAM 进行身份验证。该角色仅用于登录集群,没有任何控制台或 API 调用的权限。
使用 Amazon MWAA 执行角色 中的步骤为 Amazon MWAA 环境创建新角色。但是,与其创建和附加该主题中描述的策略,不如附加以下策略:
- JSON
-
-
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "airflow:PublishMetrics",
"Resource": "arn:aws:airflow:us-east-1
:111122223333
:environment/${MWAA_ENV_NAME}"
},
{
"Effect": "Deny",
"Action": "s3:ListAllMyBuckets",
"Resource": [
"arn:aws:s3:::{MWAA_S3_BUCKET}",
"arn:aws:s3:::{MWAA_S3_BUCKET}/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*"
],
"Resource": [
"arn:aws:s3:::{MWAA_S3_BUCKET}",
"arn:aws:s3:::{MWAA_S3_BUCKET}/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:GetLogRecord",
"logs:GetLogGroupFields",
"logs:GetQueryResults",
"logs:DescribeLogGroups"
],
"Resource": [
"arn:aws:logs:us-east-1
:111122223333
:log-group:airflow-${MWAA_ENV_NAME}-*"
]
},
{
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource": "arn:aws:sqs:us-east-1
:*:airflow-celery-*"
},
{
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:DescribeKey",
"kms:GenerateDataKey*",
"kms:Encrypt"
],
"NotResource": "arn:aws:kms:*:111122223333
:key/*",
"Condition": {
"StringLike": {
"kms:ViaService": [
"sqs.us-east-1
.amazonaws.com"
]
}
}
},
{
"Effect": "Allow",
"Action": [
"eks:DescribeCluster"
],
"Resource": "arn:aws:eks:us-east-1
:111122223333
:cluster/${EKS_CLUSTER_NAME}"
}
]
}
创建角色后,编辑 Amazon MWAA 环境,使用您创建的角色作为环境的执行角色。要更改角色,请编辑要使用的环境。您可以在“权限”下选择执行角色。
已知问题:
-
角色 ARNs 存在一个已知问题,子路径无法通过 Amazon EKS 进行身份验证。解决方法是手动创建服务角色,而不是使用 Amazon MWAA 自己创建的服务角色。要了解更多信息,请参阅在 aws-auth ConfigMap 中当 ARN 包含路径时,带有路径的角色不起作用
-
如果 Amazon MWAA 服务列表在 IAM 中不可用,则需要选择其他服务策略,例如亚马逊 EC2,然后更新该角色的信任策略以匹配以下内容:
- JSON
-
-
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"airflow-env.amazonaws.com",
"airflow.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
要了解更多信息,请参阅如何在 IAM 角色中使用信任策略。
创建 requirements.txt 文件
要使用本节中的示例代码,请确保已向 requirements.txt
中添加了以下数据库选项之一。要了解更多信息,请参阅安装 Python 依赖项。
- Apache Airflow v2
-
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
- Apache Airflow v1
-
awscli
kubernetes==12.0.1
为 Amazon EKS 创建身份映射
使用您在以下命令中创建的角色的 ARN 为 Amazon EKS 创建身份映射。将区域us-east-1
更改为创建环境的区域。替换角色的 ARN,最后替换mwaa-execution-role
为环境的执行角色。
eksctl create iamidentitymapping \
--region us-east-1
\
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012
:role/mwaa-execution-role
\
--username mwaa-service
创建 kubeconfig
使用以下命令创建 kubeconfig
:
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
如果您在运行 update-kubeconfig
时使用了特定的配置文件,则需要删除添加到 kube_config.yaml 文件中的 env:
部分,这样它才能在 Amazon MWAA 中正常运行。为此,请从文件中删除以下内容,然后将其保存:
env:
- name: AWS_PROFILE
value: profile_name
创建 DAG
使用以下代码示例创建 Python 文件,例如 DAG 的 mwaa_pod_example.py
文件。
- Apache Airflow v2
-
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
default_args = {
'owner': 'aws',
'depends_on_past': False,
'start_date': datetime(2019, 2, 20),
'provide_context': True
}
dag = DAG(
'kubernetes_pod_example', default_args=default_args, schedule_interval=None)
#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'
podRun = KubernetesPodOperator(
namespace="mwaa",
image="ubuntu:18.04",
cmds=["bash"],
arguments=["-c", "ls"],
labels={"foo": "bar"},
name="mwaa-pod-test",
task_id="pod-task",
get_logs=True,
dag=dag,
is_delete_operator_pod=False,
config_file=kube_config_path,
in_cluster=False,
cluster_context='aws'
)
- Apache Airflow v1
-
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
default_args = {
'owner': 'aws',
'depends_on_past': False,
'start_date': datetime(2019, 2, 20),
'provide_context': True
}
dag = DAG(
'kubernetes_pod_example', default_args=default_args, schedule_interval=None)
#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'
podRun = KubernetesPodOperator(
namespace="mwaa",
image="ubuntu:18.04",
cmds=["bash"],
arguments=["-c", "ls"],
labels={"foo": "bar"},
name="mwaa-pod-test",
task_id="pod-task",
get_logs=True,
dag=dag,
is_delete_operator_pod=False,
config_file=kube_config_path,
in_cluster=False,
cluster_context='aws'
)
将 DAG 和 kube_config.yaml
添加到 Amazon S3 存储桶中
将您创建的 DAG 和 kube_config.yaml
文件放入 Amazon MWAA 环境的 Amazon S3 存储桶中。您可以使用 Amazon S3 控制台或 Amazon Command Line Interface将所有文件放入存储桶中。
启用并触发示例
在 Apache Airflow 中,启用该示例,然后将其触发。
成功运行并完成后,使用以下命令验证 Pod:
kubectl get pods -n mwaa
您应该可以看到类似于如下所示的输出内容:
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
然后,您可以使用以下命令验证 Pod 的输出。请将名称值替换为上一个命令返回的值:
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888