Amazon MWAA 的快速入门教程 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon MWAA 的快速入门教程

本快速入门教程使用一个 Amazon CloudFormation 模板来同时创建 Amazon VPC 基础设施、带dags文件夹的 Amazon S3 存储桶和适用于 Apache Airflow 的亚马逊托管工作流环境。

在本教程中:

本教程将引导你完成三个 Amazon Command Line Interface (Amazon CLI) 命令,用于将 DAG 上传到 Amazon S3、在 Apache Airflow 中运行 DAG 以及查看登录信息。 CloudWatch最后,它将引导您完成为 Apache Airflow 开发团队创建 IAM 策略 的步骤。

注意

此页面上的 Amazon CloudFormation 模板为最新版本的 Apache Airflow 创建了适用于 Apache Airflow 的亚马逊托管工作流程环境。 Amazon CloudFormation可用的最新版本是 Apache Airflow v2.8.1。

此页面上的 Amazon CloudFormation 模板创建了以下内容:

  • VPC 基础设施。模板使用 通过互联网进行公共路由。它为 WebserverAccessMode: PUBLIC_ONLY 中的 Apache Airflow Web 服务器使用 公有网络访问模式

  • Amazon S3 桶。模板将创建带有 dags 文件夹的 Amazon S3 存储桶。将其配置为在启用存储桶版本控制的情况下阻止所有公共访问,如 为 Amazon MWAA 创建 Amazon S3 存储桶。 中所定义。

  • Amazon MWAA 环境。该模板创建了一个与 Amazon S3 存储桶上的dags文件夹关联的 Amazon MWAA 环境、一个有权访问 Amazon MWAA 使用的 Amazon 服务的执行角色以及使用Amazon 自有密钥进行加密的默认角色,如中所定义。创建 Amazon MWAA 环境

  • CloudWatch 日志。该模板启用 Apache Airflow 在 CloudWatch “信息” 及以上级别登录 Apache Airflow 调度程序日志组、A irflow Web 服务器日志组、Airflow 工作日志组、Air flo w DAG 处理日志组和 Airflo w 任务日志组,如中所定义。在 Amazon 中查看气流日志 CloudWatch

在本教程中,您将完成以下任务:

  • 上传并运行 DAG。将 Amazon MWAA 支持的最新 Apache Airflow 版本的 Apache Airflow 教程 DAG 上传到 Amazon S3,然后在 Apache Airflow UI 中运行,如 添加或更新 DAG 中所定义。

  • 查看日志。在 “日志” 中查看 Airflow Web 服务器 CloudWatch 日志组,如中所定义。在 Amazon 中查看气流日志 CloudWatch

  • 创建访问控制策略。在 IAM 中为 Apache Airflow 开发团队创建访问控制策略,如 访问 Amazon MWAA 环境 中所定义。

先决条件

Amazon Command Line Interface (Amazon CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 Amazon 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:

第一步:将 Amazon CloudFormation 模板保存到本地

  • 复制以下模板的内容并将其作为 mwaa_public_network.yml 保存在本地中。您也可以使用下载模板

    AWSTemplateFormatVersion: "2010-09-09" Parameters: EnvironmentName: Description: An environment name that is prefixed to resource names Type: String Default: MWAAEnvironment VpcCIDR: Description: The IP range (CIDR notation) for this VPC Type: String Default: 10.192.0.0/16 PublicSubnet1CIDR: Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone Type: String Default: 10.192.10.0/24 PublicSubnet2CIDR: Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone Type: String Default: 10.192.11.0/24 PrivateSubnet1CIDR: Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone Type: String Default: 10.192.20.0/24 PrivateSubnet2CIDR: Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone Type: String Default: 10.192.21.0/24 MaxWorkerNodes: Description: The maximum number of workers that can run in the environment Type: Number Default: 2 DagProcessingLogs: Description: Log level for DagProcessing Type: String Default: INFO SchedulerLogsLevel: Description: Log level for SchedulerLogs Type: String Default: INFO TaskLogsLevel: Description: Log level for TaskLogs Type: String Default: INFO WorkerLogsLevel: Description: Log level for WorkerLogs Type: String Default: INFO WebserverLogsLevel: Description: Log level for WebserverLogs Type: String Default: INFO Resources: ##################################################################################################################### # CREATE VPC ##################################################################################################################### VPC: Type: AWS::EC2::VPC Properties: CidrBlock: !Ref VpcCIDR EnableDnsSupport: true EnableDnsHostnames: true Tags: - Key: Name Value: MWAAEnvironment InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: MWAAEnvironment InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PublicSubnet1CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PublicSubnet2CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ2) PrivateSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet1CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ1) PrivateSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet2CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ2) NatGateway1EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway2EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway1: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway1EIP.AllocationId SubnetId: !Ref PublicSubnet1 NatGateway2: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway2EIP.AllocationId SubnetId: !Ref PublicSubnet2 PublicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway PublicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet1 PublicSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet2 PrivateRouteTable1: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ1) DefaultPrivateRoute1: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable1 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway1 PrivateSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable1 SubnetId: !Ref PrivateSubnet1 PrivateRouteTable2: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ2) DefaultPrivateRoute2: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable2 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway2 PrivateSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable2 SubnetId: !Ref PrivateSubnet2 SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupName: "mwaa-security-group" GroupDescription: "Security group with a self-referencing inbound rule." VpcId: !Ref VPC SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup EnvironmentBucket: Type: AWS::S3::Bucket Properties: VersioningConfiguration: Status: Enabled PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true ##################################################################################################################### # CREATE MWAA ##################################################################################################################### MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: VpcId: !Ref VPC GroupDescription: !Sub "Security Group for Amazon MWAA Environment ${AWS::StackName}-MwaaEnvironment" GroupName: !Sub "airflow-security-group-${AWS::StackName}-MwaaEnvironment" SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup SecurityGroupEgress: Type: AWS::EC2::SecurityGroupEgress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" CidrIp: "0.0.0.0/0" MwaaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - airflow-env.amazonaws.com - airflow.amazonaws.com Action: - "sts:AssumeRole" Path: "/service-role/" MwaaExecutionPolicy: DependsOn: EnvironmentBucket Type: AWS::IAM::ManagedPolicy Properties: Roles: - !Ref MwaaExecutionRole PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: airflow:PublishMetrics Resource: - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${EnvironmentName}" - Effect: Deny Action: s3:ListAllMyBuckets Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - "s3:GetObject*" - "s3:GetBucket*" - "s3:List*" Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - logs:DescribeLogGroups Resource: "*" - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents - logs:GetLogEvents - logs:GetLogRecord - logs:GetLogGroupFields - logs:GetQueryResults - logs:DescribeLogGroups Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*" - Effect: Allow Action: cloudwatch:PutMetricData Resource: "*" - Effect: Allow Action: - sqs:ChangeMessageVisibility - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:GetQueueUrl - sqs:ReceiveMessage - sqs:SendMessage Resource: - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*" - Effect: Allow Action: - kms:Decrypt - kms:DescribeKey - "kms:GenerateDataKey*" - kms:Encrypt NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*" Condition: StringLike: "kms:ViaService": - !Sub "sqs.${AWS::Region}.amazonaws.com" Outputs: VPC: Description: A reference to the created VPC Value: !Ref VPC PublicSubnets: Description: A list of the public subnets Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]] PrivateSubnets: Description: A list of the private subnets Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]] PublicSubnet1: Description: A reference to the public subnet in the 1st Availability Zone Value: !Ref PublicSubnet1 PublicSubnet2: Description: A reference to the public subnet in the 2nd Availability Zone Value: !Ref PublicSubnet2 PrivateSubnet1: Description: A reference to the private subnet in the 1st Availability Zone Value: !Ref PrivateSubnet1 PrivateSubnet2: Description: A reference to the private subnet in the 2nd Availability Zone Value: !Ref PrivateSubnet2 SecurityGroupIngress: Description: Security group with self-referencing inbound rule Value: !Ref SecurityGroupIngress MwaaApacheAirflowUI: Description: MWAA Environment Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"

第二步:使用创建堆栈 Amazon CLI

  1. 在命令提示符下,导航到存储 mwaa_public_network.yml 的目录。例如:

    cd mwaaproject
  2. 输入 aws cloudformation create-stack 命令来使用 Amazon CLI 创建堆栈。

    aws cloudformation create-stack --stack-name mwaa-environment-public-network --template-body file://mwaa_public_network.yml --capabilities CAPABILITY_IAM
    注意

    创建 Amazon VPC 基础设施、Amazon S3 存储桶和 Amazon MWAA 环境需要 30 多分钟。

步骤 3:将 DAG 上传到 Amazon S3 并在 Apache Airflow UI 中运行

  1. 复制支持的最新 Apache Airflow 版本tutorial.py 文件内容,然后在本地另存为 tutorial.py

  2. 在命令提示符下,导航到存储 tutorial.py 的目录。例如:

    cd mwaaproject
  3. 以下示例列出所有 Amazon S3 存储桶。

    aws s3 ls
  4. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  5. 使用以下脚本将 tutorial.py 文件上传到 dags 文件夹。替换 YOUR_S3_BUCKET_NAME 中的示例值。

    aws s3 cp tutorial.py s3://YOUR_S3_BUCKET_NAME/dags/
  6. 在 Amazon MWAA 控制台上打开环境页面

  7. 选择环境。

  8. 选择打开 Airflow UI

  9. 在 Apache Airflow UI 上,从可用 DAG 列表中选择教程 DAG。

  10. 在 DAG 详细信息页面上,选择 DAG 名称旁边的暂停/取消暂停 DAG 开关以取消暂停 DAG。

  11. 选择触发 DAG

第四步:在日志中查看 CloudWatch 日志

你可以在 CloudWatch 控制台中查看 Apache Airflow 日志,了解堆栈启用的所有 Apache Airflow 日志。 Amazon CloudFormation 下一节介绍如何查看 Airflow Web 服务器日志组的日志。

  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. 监控窗格上选择 Airflow Web 服务器日志组

  4. 日志流中选择 webserver_console_ip 日志。

接下来做什么?