

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 `ECSOperator` 连接 Amazon ECS
<a name="samples-ecs-operator"></a>

此主题介绍如何从 Amazon MWAA 使用 `ECSOperator` 连接到 Amazon Elastic Container Service（Amazon ECS）容器。在以下步骤中，您将向环境的执行角色添加所需的权限，使用 Amazon CloudFormation 模板创建 Amazon ECS Fargate 集群，最后创建并上传连接到新集群的 DAG。

**Topics**
+ [版本](#samples-ecs-operator-version)
+ [先决条件](#samples-ecs-operator-prereqs)
+ [Permissions](#samples-ecs-operator-permissions)
+ [创建 Amazon ECS 集群](#create-cfn-template)
+ [代码示例](#samples-ecs-operator-code)

## 版本
<a name="samples-ecs-operator-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-ecs-operator-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)

## Permissions
<a name="samples-ecs-operator-permissions"></a>
+ 环境的执行角色需要权限才能在 Amazon ECS 中运行任务。您可以将 [Amazonecs\_ FullAccess](https://console.amazonaws.cn/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) Amazon托管策略附加到您的执行角色，也可以创建以下策略并将其附加到您的执行角色。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ 除了添加在 Amazon ECS 中运行任务所需的权限外，您还必须修改 Amazon MWAA 执行角色中的 CloudWatch 日志策略声明，以允许访问 Amazon ECS 任务日志组，如下所示。Amazon ECS 日志组由中的 Amazon CloudFormation 模板创建[创建 Amazon ECS 集群](#create-cfn-template)。

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:airflow-{{environment-name}}-*",
      "arn:aws:logs:*:*:log-group:{{ecs-mwaa-group}}:*"
    ]
  }
  ```

有关 Amazon MWAA 执行角色，以及如何附加策略的更多信息，请参阅 [执行角色](mwaa-create-role.md)。

## 创建 Amazon ECS 集群
<a name="create-cfn-template"></a>

使用以下 Amazon CloudFormation 模板，您将构建一个 Amazon ECS Fargate 集群，用于您的 Amazon MWAA 工作流程。有关更多信息，请参阅*《Amazon Elastic Container Service 开发人员指南》*中的[创建一个任务定义](https://docs.amazonaws.cn/AmazonECS/latest/developerguide/create-task-definition)。

1. 创建一个包含以下代码的 JSON 文件，并将该文件另存为 `ecs-mwaa-cfn.json`。

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. 在命令提示符下，使用以下 Amazon CLI 命令创建新堆栈。您必须将 `SecurityGroups` 和 `SubnetIds` 的值替换为 Amazon MWAA 环境的安全组和子网的值。

   ```
   aws cloudformation create-stack \
   --stack-name {{my-ecs-stack}} --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue={{your-mwaa-security-group}} \
   ParameterKey=SubnetIds,ParameterValue={{your-mwaa-subnet-1}}\\,{{your-mwaa-subnet-1}} \
   --capabilities CAPABILITY_IAM
   ```

   或者，您可以使用以下 Shell 脚本：该脚本使用`[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` Amazon CLI 命令检索环境的安全组和子网所需的值，然后相应地创建堆栈。要运行该脚本，请运行以下命令。

   1. 复制脚本并将其保存到与 Amazon CloudFormation 模板相同的目录中。`ecs-stack-helper.sh`

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. 使用以下命令运行该脚本。将 `environment-name` 和 `stack-name` 替换为您的信息。

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash {{environment-name}} {{stack-name}}
      ```

   如果成功，您将参考以下显示您的新 Amazon CloudFormation 堆栈 ID 的输出。

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

在 Amazon CloudFormation 堆栈完成并 Amazon 预配置 Amazon ECS 资源后，您就可以创建和上传您的 DAG 了。

## 代码示例
<a name="samples-ecs-operator-code"></a>

1. 打开命令提示符，然后导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容并将其本地另存为 `mwaa-ecs-operator.py`，然后将新 DAG 上传到 Amazon S3。

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**注意**  
在 DAG 的示例中，对于 `awslogs_group`，您可能需要使用 Amazon ECS 任务日志组的名称修改日志组。示例假设名为 `mwaa-ecs-zero` 的日志组。对于 `awslogs_stream_prefix`，使用 Amazon ECS 任务日志流前缀。该示例假设日志流前缀为 `ecs`。

1.  运行以下 Amazon CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow 用户界面触发 DAG。

   ```
   aws s3 cp {{your-dag}}.py s3://{{your-environment-bucket}}/dags/
   ```

1. 如果成功，您将在 `ecs_fargate_dag` DAG 的任务日志中看到输出，类似于 `ecs_operator_task` 的任务日志中的以下内容：

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```