测试蓝图 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

测试蓝图

在开发代码时,应执行本地测试以验证工作流布局是否正确。

本地测试不会生成 AWS Glue 作业、爬虫程序或触发器。相反,您可以在本地运行布局脚本并使用to_json()validate()方法打印对象并查找错误。这些方法在库中定义的所有三个类中都可用。

可通过两种方式处理user_paramssystem_params参数,AWS Glue 传递给您的布局函数。您的测试台代码可以创建示例蓝图参数值的字典,并将其作为user_params参数。或者,您可以删除对user_params并用硬编码字符串替换它们。

如果您的代码使用regionaccountId属性中的system_params参数,你可以传入你自己的字典system_params

测试蓝图

  1. 在包含库的目录中启动 Python 解释器,或将蓝图文件和提供的库加载到首选集成开发环境 (IDE) 中。

  2. 确保您的代码导入提供的库。

  3. 将代码添加到布局函数以调用validate()或者to_json()在任何实体上或Workflow对象。例如,如果您的代码创建了Crawler命名为的对象mycrawler,您可以调用validate()详情如下.

    mycrawler.validate()

    您可以打印mycrawler详情如下所示:

    print(mycrawler.to_json())

    如果您调用to_json,则不需要同时调用validate(),由于 to_json()Callsvalidate()

    在工作流对象上调用这些方法非常有用。假设您的脚本命名工作流对象my_workflow,验证并打印工作流对象,如下所示。

    print(my_workflow.to_json())

    有关 to_json()validate() 的更多信息,请参阅类方法

    您还可以导入pprint并精致打印工作流程对象,如本部分后面的示例所示。

  4. 运行代码,修复错误,最后删除对validate()或者to_json()

以下示例说明如何构建示例蓝图参数字典并将其作为user_params参数到布局函数generate_compaction_workflow。它还说明了如何漂亮地打印生成的工作流程对象。

from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)