在 Python 中调用 AWS Glue API - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

在 Python 中调用 AWS Glue API

请注意,Boto 3 资源 API 尚不可用于 AWS Glue。目前,只有 Boto 3 客户端 API 可用。

Python 中的 AWS Glue API 名称

Java 和其他编程语言中的 AWS Glue API 名称通常是 CamelCased。但是,当从 Python 调用时,这些通用名称将更改为小写,部分的某些名称用下划线字符隔开,使它们更“Pythonic”。在 AWS Glue API 参考文档中,这些 Pythonic 名称在通用 CamelCased 名称之后列在括号中。

但是,尽管 AWS Glue API 名称自身转换为小写,其参数名称仍保持大写。请务必记住这一点,因为在调用 AWS Glue API 时会按名称传递参数,如下一节中所述。

在 AWS Glue 中传递和访问 Python 参数

在 Python 对 AWS Glue API 的调用中,最好按名称显式传递参数。例如:

job = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})

了解 Python 创建您可以在 Job 结构JobRun 结构 中指定为 ETL 脚本参数的名称/值元组的字典是很有帮助的。然后,Boto 3 通过 REST API 调用,以 JSON 格式将其传递给 AWS Glue。这意味着,当您在脚本中访问这些参数时,不能依赖它们的顺序。

例如,假设您在 Python Lambda 处理程序函数中启动 JobRun,并且您希望指定多个参数。您的代码看起来可能类似于:

from datetime import datetime, timedelta client = boto3.client('glue') def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName = 'my_test_Job', Arguments = { '--day_partition_key': 'partition_0', '--hour_partition_key': 'partition_1', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )

要在 ETL 脚本中可靠地访问这些参数,请使用 AWS Glue 的 getResolvedOptions 函数,然后从生成的字典访问它们:

import sys from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ['JOB_NAME', 'day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value']) print "The day partition key is: ", args['day_partition_key'] print "and the day partition value is: ", args['day_partition_value']

示例:创建并运行作业

下面的示例显示如何使用 Python 调用 AWS Glue API 来创建和运行 ETL 作业。

创建并运行作业

  1. 创建 AWS Glue 客户端的实例。

    import boto3 glue = boto3.client(service_name='glue', region_name='us-east-1', endpoint_url='https://glue.us-east-1.amazonaws.com')
  2. 创建作业。您必须使用 glueetl 作为名称的 ETL 命令,如以下代码所示:

    myJob = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
  3. 启动您在上一步中创建的作业的新运行:

    myNewJobRun = glue.start_job_run(JobName=myJob['Name'])
  4. 获取作业状态:

    status = glue.get_job_run(JobName=myJob['Name'], RunId=JobRun['JobRunId'])
  5. 打印作业运行的当前状态:

    print status['JobRun']['JobRunState']