使用 DAG 在 CLI 中导入变量 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 DAG 在 CLI 中导入变量

以下示例代码会使用 Amazon MWAA 上的 CLI 导入变量。

版本

您可以在 Python 3.10 中将本页上的代码示例与 Apache Airflow v2 一起使用,在 Python 3.11 中与 Apache Airflow v3 一起使用。

先决条件

无需其他权限即可使用本页上的代码示例。

权限

Amazon Web Services 账户 需要访问 AmazonMWAAAirflowCliAccess 策略。要了解更多信息,请参阅 Apache Airflow CLI 策略:AmazonMWAAAirflowCliAccess

依赖项

要在 Apache Airflow v2 和更高版本中使用此代码示例,无需附加依赖项。使用 aws-mwaa-docker-images 安装 Apache Airflow。

代码示例

以下示例代码需要三个输入:Amazon MWAA 环境名称(在 mwaa_env 中)、环境 Amazon Web Services 区域(在 aws_region 中)和包含要导入的变量的本地文件(在 var_file 中)。

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

接下来做什么?

  • 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 dags 文件夹,请参阅 添加或更新 DAG