迁移到新的 Amazon MWAA 环境 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

迁移到新的 Amazon MWAA 环境

以下主题描述了将现有 Apache Airflow 工作负载迁移到新的 Amazon MWAA 环境的步骤。您可以使用以下步骤从旧版本的 Amazon MWAA 迁移到新发布版本,或者将自行管理的 Apache Airflow 部署迁移到 Amazon MWAA。本教程假设您正在从现有的 Apache Airflow v1.10.12 迁移到运行 Apache Airflow v2.5.1 的新的 Amazon MWAA,但您可以使用相同的过程从不同的 Apache Airflow 版本迁移或迁移到不同的 Apache Airflow 版本。

先决条件

为了能够完成这些步骤并迁移环境,您需要具备以下条件:

步骤 1:创建一个运行最新支持的 Apache Airflow 版本的 Amazon MWAA 环境

您可以使用《Amazon MWAA 用户指南》中的 Amazon MWAA 入门中的详细步骤或使用 Amazon CloudFormation 模板来创建环境。如果您要从现有的 Amazon MWAA 环境迁移并已使用 Amazon CloudFormation 模板创建旧环境,则可以更改 AirflowVersion 属性以指定新版本。

MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion: 2.5.1 DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true

或者,如果从现有 Amazon MWAA 环境迁移,则可以复制以下 Python 脚本,该脚本使用 Python 的 Amazon SDK(Boto3)来克隆环境。您也可以下载脚本

# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)

步骤 2:迁移工作流程资源

Apache Airflow v2 是主要版本。如果要从 Apache Airflow v1 迁移,则必须准备工作流程资源并验证对 DAG、要求和插件所做的更改。为此,我们建议使用 Docker 和 Amazon MWAA 本地运行器在本地操作系统上配置 Apache Airflow 的版本。Amazon MWAA 本地运行器提供了命令行界面(CLI)实用工具,可在本地复制 Amazon MWAA 环境。

每当您更改 Apache Airflow 版本时,请确保在 requirements.txt引用正确的 --constraint URL。

要迁移工作流程资源,请执行以下操作
  1. 创建 aws-mwaa-local-runner 存储库的分支,然后克隆 Amazon MWAA 本地运行器的副本。

  2. 查看 aws-mwaa-local-runner 存储库的 v1.10.15 分支。Apache Airflow 发布 v1.10.15 作为桥版本,以帮助迁移到 Apache Airflow v2,尽管 Amazon MWAA 不支持 v1.10.15,但您可以使用 Amazon MWAA 本地运行器来测试资源。

  3. 使用 Amazon MWAA 本地运行器 CLI 工具来构建 Docker 映像并在本地运行 Apache Airflow。有关更多信息,请参阅 GitHub 存储库中的 README

  4. 使用本地运行的 Apache Airflow,按照 Apache Airflow 文档网站从 1.10 升级到 2 中描述的步骤进行操作。

    1. 要更新 requirements.txt,请按照《Amazon MWAA 用户指南》管理 Python 依赖项中推荐的最佳实践进行操作。

    2. 如果您已将自定义运算符和传感器与现有 Apache Airflow v1.10.12 环境的插件捆绑在一起,请将其移动至 DAG 文件夹。有关 Apache Airflow v2+ 模块管理最佳实践的更多信息,请参阅 Apache Airflow 文档网站中的模块管理

  5. 对工作流程资源进行必要的更改后,请查看 aws-mwaa-local-runner 存储库的 v2.5.1 分支,然后在本地测试更新的工作流程 DAG、要求和自定义插件。如果您要迁移到其他 Apache Airflow 版本,则可以改为使用适合您版本的本地运行器分支。

  6. 成功测试工作流程资源后,将 DAG、requirements.txt 和插件复制到您使用新 Amazon MWAA 环境配置的 Amazon S3 存储桶中。

步骤 3:从现有环境中导出元数据

当您将更新的 DAG 文件复制到环境的 Amazon S3 存储桶并且计划程序对其进行解析时,Apache Airflow 元数据表格(例如dagdag_tagdag_code)会自动填充。与权限相关的表格也会根据 IAM 执行角色权限自动填充。您不需要迁移它们。

如果需要,您可以迁移与 DAG 历史记录、variableslot_poolsla_miss 以及(如果需要)xcomjoblog 表格相关的数据。任务实例日志存储在 airflow-{environment_name} 日志组下的 CloudWatch Logs 中。如果要查看较早运行的任务实例日志,则必须将这些日志复制到新的环境日志组中。我们建议您只移动几天的日志,以降低相关成本。

如果您要从现有的 Amazon MWAA 环境迁移,则无法直接访问元数据数据库。您必须运行 DAG 才能将元数据从现有 Amazon MWAA 环境导出到您选择的 Amazon S3 存储桶。如果您要从自行管理的环境迁移,也可以使用以下步骤导出 Apache Airflow 元数据。

导出数据后,您可以在新的环境中运行 DAG 来导入数据。在导出和导入过程中,所有其他 DAG 都将暂停。

要从现有环境中导出元数据,请执行以下操作
  1. 使用 Amazon CLI 创建 Amazon S3 存储桶,用于存储导出的数据。用您的信息替换 UUIDregion

    $ aws s3api create-bucket \ --bucket mwaa-migration-{UUID}\ --region {region}
    注意

    如果您要迁移敏感数据,例如存储在变量中的连接,我们建议您为 Amazon S3 存储桶启用默认加密

  2. 注意

    不适用于从自行管理的环境中迁移。

    修改现有环境的执行角色并添加以下策略以授予对您在步骤 1 中创建的存储桶的写入权限。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  3. 克隆 amazon-mwaa-examples 存储库,然后导航到适合迁移场景的 metadata-migration 子目录。

    $ git clone https://github.com/aws-samples/amazon-mwaa-examples.git $ cd amazon-mwaa-examples/usecases/metadata-migration/existing-version-new-version/
  4. export_data.py 中,将字符串值替换为 S3_BUCKET,即您创建的用于存储导出的元数据的 Amazon S3 存储桶。

    S3_BUCKET = 'mwaa-migration-{UUID}'
  5. requirements.txt 文件放入 metadata-migration 目录中。如果您已有现有环境的要求文件,请将 requirements.txt 中指定的额外要求添加到您的文件中。如果您没有现有的要求文件,则只需使用 metadata-migration 目录中提供的要求文件即可。

  6. export_data.py 复制到与现有环境相关联的 Amazon S3 存储桶的 DAG 目录中。如果从自行管理的环境中迁移,请将 export_data.py 复制到 /dags 文件夹。

  7. 将更新内容 requirements.txt 复制到与现有环境相关联的 Amazon S3 存储桶,然后编辑环境以指定新的 requirements.txt 版本。

  8. 环境更新后,访问 Apache Airflow UI,取消暂停 db_export DAG,然后触发工作流程运行。

  9. 确认元数据已导出到 mwaa-migration-{UUID} Amazon S3 存储桶中的 data/migration/existing-version_to_new-version/export/ 中,且每个表格都位于自己的专用文件中。

步骤 4:将元数据导入新环境

要将元数据导入新环境,请执行以下操作
  1. import_data.py 中,将以下内容的字符串值替换为您的信息。

    • 要从现有 Amazon MWAA 环境迁移,请执行以下操作:

      S3_BUCKET = 'mwaa-migration-{UUID}' OLD_ENV_NAME='{old_environment_name}' NEW_ENV_NAME='{new_environment_name}' TI_LOG_MAX_DAYS = {number_of_days}

      MAX_DAYS 控制工作流程将指定天数的日志文件复制到新环境。

    • 要从自行管理环境迁移,请执行以下操作:

      S3_BUCKET = 'mwaa-migration-{UUID}' NEW_ENV_NAME='{new_environment_name}'
  2. (可选)import_data.py 仅复制失败的任务日志。如果要复制所有任务日志,请修改 getDagTasks 函数并删除 ti.state = 'failed',如以下代码片段所示。

    def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
  3. 修改新环境的执行角色并添加以下策略。权限策略允许 Amazon MWAA 从您导出 Apache Airflow 元数据的 Amazon S3 存储桶中读取数据,并允许从现有日志组复制任务实例日志。用您的信息替换所有占位符。

    注意

    如果您要从自行管理的环境迁移,则必须从策略中移除 CloudWatch Logs 相关权限。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:{region}:{account_number}:log-group:airflow-{old_environment_name}*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}", "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  4. import_data.py 复制到与新环境关联的 Amazon S3 存储桶的 DAG 目录中,然后访问 Apache Airflow UI 取消暂停 db_import DAG 并触发工作流程。几分钟后,新的 DAG 将出现在 Apache Airflow UI 中。

  5. DAG 运行完成后,访问每个 DAG 来验证 DAG 运行历史记录是否已复制。

后续步骤

  • 有关可用 Amazon MWAA 环境类和功能的更多信息,请参阅《Amazon MWAA 用户指南》中的 Amazon MWAA 环境类

  • 有关 Amazon MWAA 如何处理自动扩缩工作线程的更多信息,请参阅《Amazon MWAA 用户指南》中的Amazon MWAA 自动扩缩

  • 有关 Amazon MWAA REST API 的更多信息,请参阅 Amazon MWAA REST API

  • Apache Airflow 模型(Apache Airflow 文档)— 了解有关 Apache Airflow 元数据数据库模型的更多信息。