本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon MWAA 的快速入门教程
本快速入门教程使用一个 Amazon CloudFormation 模板来同时创建亚马逊VPC基础设施、一个带有dags
文件夹的 Amazon S3 存储桶和适用于 Apache Airflow 的亚马逊托管工作流程。
主题
在本教程中:
本教程将引导你完成三个 Amazon Command Line Interface (Amazon CLI) 命令,用于将上传DAG到 Amazon S3、DAG在 Apache Airflow 中运行,以及查看登录信息。 CloudWatch最后,它将引导您完成为 Apache Airflow 开发团队创建IAM策略的步骤。
注意
此页面上的 Amazon CloudFormation 模板为最新版本的 Apache Airflow 创建了适用于 Apache Airflow 的亚马逊托管工作流程环境。 Amazon CloudFormation可用的最新版本是 Apache Airflow v2.9.2。
此页面上的 Amazon CloudFormation 模板创建了以下内容:
-
VPC基础设施。模板使用 通过互联网进行公共路由。它为
WebserverAccessMode: PUBLIC_ONLY
中的 Apache Airflow Web 服务器使用 公有网络访问模式。 -
Amazon S3 桶。模板将创建带有
dags
文件夹的 Amazon S3 存储桶。将其配置为在启用存储桶版本控制的情况下阻止所有公共访问,如 为 Amazon MWAA 创建 Amazon S3 存储桶。 中所定义。 -
亚马逊MWAA环境。该模板创建了一个与 Amazon S3 存储桶上的
dags
文件夹关联的 Amazon MWAA 环境、一个有权访问亚马逊MWAA所用 Amazon 服务的执行角色以及使用Amazon 自有密钥进行加密的默认角色,如中所定义创建亚马逊MWAA环境。 -
CloudWatch 日志。该模板启用 Apache Airflow 在 CloudWatch “INFO” 及以上级别登录 Apache Airflow 调度程序日志组、Airflow Web 服务器日志组、Airflow 工作日志组、Airf lo w DAG 处理日志组和 Airflo w 任务日志组,如中所定义。在 Amazon 中查看气流日志 CloudWatch
在本教程中,您将完成以下任务:
-
上传并运行 DAG. 将 Apache Airflow 关于亚马逊MWAA支持DAG的最新 Apache Airflow 版本的教程上传到亚马逊 S3,然后在 Apache Airflow 用户界面中运行,如中所定义。添加或更新 DAG
-
查看日志。在 “日志” 中查看 Airflow Web 服务器 CloudWatch 日志组,如中所定义。在 Amazon 中查看气流日志 CloudWatch
-
创建访问控制策略。在中IAM为您的 Apache Airflow 开发团队创建访问控制策略,如中所定义。访问亚马逊MWAA环境
先决条件
Amazon Command Line Interface (Amazon CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 Amazon 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:
第一步:将 Amazon CloudFormation 模板保存到本地
-
复制以下模板的内容并将其作为
mwaa-public-network.yml
保存在本地中。您也可以使用下载模板。AWSTemplateFormatVersion: "2010-09-09" Parameters: EnvironmentName: Description: An environment name that is prefixed to resource names Type: String Default: MWAAEnvironment VpcCIDR: Description: The IP range (CIDR notation) for this VPC Type: String Default: 10.192.0.0/16 PublicSubnet1CIDR: Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone Type: String Default: 10.192.10.0/24 PublicSubnet2CIDR: Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone Type: String Default: 10.192.11.0/24 PrivateSubnet1CIDR: Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone Type: String Default: 10.192.20.0/24 PrivateSubnet2CIDR: Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone Type: String Default: 10.192.21.0/24 MaxWorkerNodes: Description: The maximum number of workers that can run in the environment Type: Number Default: 2 DagProcessingLogs: Description: Log level for DagProcessing Type: String Default: INFO SchedulerLogsLevel: Description: Log level for SchedulerLogs Type: String Default: INFO TaskLogsLevel: Description: Log level for TaskLogs Type: String Default: INFO WorkerLogsLevel: Description: Log level for WorkerLogs Type: String Default: INFO WebserverLogsLevel: Description: Log level for WebserverLogs Type: String Default: INFO Resources: ##################################################################################################################### # CREATE VPC ##################################################################################################################### VPC: Type: AWS::EC2::VPC Properties: CidrBlock: !Ref VpcCIDR EnableDnsSupport: true EnableDnsHostnames: true Tags: - Key: Name Value: MWAAEnvironment InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: MWAAEnvironment InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PublicSubnet1CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PublicSubnet2CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ2) PrivateSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet1CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ1) PrivateSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet2CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ2) NatGateway1EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway2EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway1: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway1EIP.AllocationId SubnetId: !Ref PublicSubnet1 NatGateway2: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway2EIP.AllocationId SubnetId: !Ref PublicSubnet2 PublicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway PublicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet1 PublicSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet2 PrivateRouteTable1: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ1) DefaultPrivateRoute1: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable1 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway1 PrivateSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable1 SubnetId: !Ref PrivateSubnet1 PrivateRouteTable2: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ2) DefaultPrivateRoute2: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable2 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway2 PrivateSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable2 SubnetId: !Ref PrivateSubnet2 SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupName: "mwaa-security-group" GroupDescription: "Security group with a self-referencing inbound rule." VpcId: !Ref VPC SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup EnvironmentBucket: Type: AWS::S3::Bucket Properties: VersioningConfiguration: Status: Enabled PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true ##################################################################################################################### # CREATE MWAA ##################################################################################################################### MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: VpcId: !Ref VPC GroupDescription: !Sub "Security Group for Amazon MWAA Environment ${AWS::StackName}-MwaaEnvironment" GroupName: !Sub "airflow-security-group-${AWS::StackName}-MwaaEnvironment" SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup SecurityGroupEgress: Type: AWS::EC2::SecurityGroupEgress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" CidrIp: "0.0.0.0/0" MwaaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - airflow-env.amazonaws.com - airflow.amazonaws.com Action: - "sts:AssumeRole" Path: "/service-role/" MwaaExecutionPolicy: DependsOn: EnvironmentBucket Type: AWS::IAM::ManagedPolicy Properties: Roles: - !Ref MwaaExecutionRole PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: airflow:PublishMetrics Resource: - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${EnvironmentName}" - Effect: Deny Action: s3:ListAllMyBuckets Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - "s3:GetObject*" - "s3:GetBucket*" - "s3:List*" Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - logs:DescribeLogGroups Resource: "*" - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents - logs:GetLogEvents - logs:GetLogRecord - logs:GetLogGroupFields - logs:GetQueryResults - logs:DescribeLogGroups Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*" - Effect: Allow Action: cloudwatch:PutMetricData Resource: "*" - Effect: Allow Action: - sqs:ChangeMessageVisibility - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:GetQueueUrl - sqs:ReceiveMessage - sqs:SendMessage Resource: - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*" - Effect: Allow Action: - kms:Decrypt - kms:DescribeKey - "kms:GenerateDataKey*" - kms:Encrypt NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*" Condition: StringLike: "kms:ViaService": - !Sub "sqs.${AWS::Region}.amazonaws.com" Outputs: VPC: Description: A reference to the created VPC Value: !Ref VPC PublicSubnets: Description: A list of the public subnets Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]] PrivateSubnets: Description: A list of the private subnets Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]] PublicSubnet1: Description: A reference to the public subnet in the 1st Availability Zone Value: !Ref PublicSubnet1 PublicSubnet2: Description: A reference to the public subnet in the 2nd Availability Zone Value: !Ref PublicSubnet2 PrivateSubnet1: Description: A reference to the private subnet in the 1st Availability Zone Value: !Ref PrivateSubnet1 PrivateSubnet2: Description: A reference to the private subnet in the 2nd Availability Zone Value: !Ref PrivateSubnet2 SecurityGroupIngress: Description: Security group with self-referencing inbound rule Value: !Ref SecurityGroupIngress MwaaApacheAirflowUI: Description: MWAA Environment Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"
第二步:使用创建堆栈 Amazon CLI
-
在命令提示符下,导航到存储
mwaa-public-network.yml
的目录。例如:cd mwaaproject
-
输入
aws cloudformation create-stack
命令来使用 Amazon CLI创建堆栈。aws cloudformation create-stack --stack-name mwaa-environment-public-network --template-body file://mwaa-public-network.yml --capabilities CAPABILITY_IAM
注意
创建亚马逊VPC基础设施、Amazon S3 存储桶和亚马逊MWAA环境需要 30 多分钟。
第三步:上传DAG到 Amazon S3 并在 Apache Airflow 用户界面中运行
-
复制支持的最新 Apache Airflow 版本
的 tutorial.py
文件内容,然后在本地另存为tutorial.py
。 -
在命令提示符下,导航到存储
tutorial.py
的目录。例如:cd mwaaproject
-
以下示例列出所有 Amazon S3 存储桶。
aws s3 ls
-
使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。
aws s3 ls s3://
YOUR_S3_BUCKET_NAME
-
使用以下脚本将
tutorial.py
文件上传到dags
文件夹。将样本值替换为YOUR_S3_BUCKET_NAME
.aws s3 cp tutorial.py s3://
YOUR_S3_BUCKET_NAME
/dags/ -
在 Amazon MWAA 控制台上打开 “环境” 页面
。 -
选择环境。
-
选择打开 Airflow UI。
-
在 Apache Airflow 用户界面上DAGs,从可用列表中选择教程。 DAG
-
在DAG详细信息页面上,选择DAG姓名旁边的 “暂停/取消暂停 DAG” 开关以取消暂停。DAG
-
选择 “触发器” DAG。
第四步:在日志中查看 CloudWatch 日志
你可以在 CloudWatch 控制台中查看 Apache Airflow 日志,了解堆栈启用的所有 Apache Airflow 日志。 Amazon CloudFormation 下一节介绍如何查看 Airflow Web 服务器日志组的日志。
-
在 Amazon MWAA 控制台上打开 “环境” 页面
。 -
选择环境。
-
在监控窗格上选择 Airflow Web 服务器日志组。
-
在日志流中选择
webserver_console_ip
日志。
接下来做什么?
-
详细了解如何上传DAGs,在中指定 Python 依赖关系,
requirements.txt
在中指定自定义插件在 Amazon MWAA 上使用 DAG。plugins.zip
-
要详细了解我们推荐的调整环境性能的最佳实践,请参阅 亚马逊上的 Apache Airflow 的性能调整 MWAA。
-
为环境创建监控面板,请参阅 在 Amazon 上监控控制面板和警报 MWAA。
-
运行中的一些DAG代码示例Amazon MWAA 的代码示例。