将环境元数据导出到 Amazon S3 上的 CSV 文件 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将环境元数据导出到 Amazon S3 上的 CSV 文件

以下代码示例说明如何创建有向无环图(DAG),该图在数据库中查询一系列 DAG 运行信息,并将数据写入存储在 Amazon S3 上的 .csv 文件。

您可能需要从环境的 Aurora PostgreSQL 数据库中导出信息,以便在本地检查数据,将其存档到对象存储中,或者将它们与诸如 Amazon S3 到 Amazon Redshift 运算符数据库清理之类的工具结合使用,以便将 Amazon MWAA 元数据移出环境,但保留它们以备将来分析。

您可以在数据库中查询 Apache Airflow 模型中列出的任何对象。此代码示例使用三个模型:DagRunTaskFailTaskInstance,它们提供与 DAG 运行相关的信息。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

Amazon MWAA 需要获得 Amazon S3 操作 s3:PutObject 的权限,才能将查询的元数据信息写入 Amazon S3。将以下策略声明添加到环境的执行角色中。

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

此策略将写入权限仅限于 your-new-export-bucket

要求

代码示例

以下步骤描述了如何创建 DAG,以查询 Aurora PostgreSQL 并将结果写入新的 Amazon S3 存储桶。

  1. 在您的终端,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容并本地另存为 metadata_to_csv.py。您可以更改分配给 MAX_AGE_IN_DAYS 的值,以控制 DAG 从元数据数据库中查询的最早记录的龄期。

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. 运行以下 Amazon CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,输出将类似于在 export_db 任务的任务日志中的以下内容:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    现在,您可以在 /files/export/ 中的新 Amazon S3 存储桶中访问和下载导出的 .csv 文件。