适用于 Spark SQL 作业的 Amazon Glue 数据目录支持 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Spark SQL 作业的 Amazon Glue 数据目录支持

Amazon Glue 是一个与 Apache Hive 元存储兼容的目录。您可以配置 Amazon Glue 任务和开发端点以使用数据目录作为外部 Apache Hive 元存储。随后,您可以直接对存储在数据目录中的表运行 Apache Spark SQL 查询。预设情况下,Amazon Glue 动态帧与数据目录集成。但是,利用此功能,Spark SQL 任务可以开始使用数据目录作为外部 Hive 元存储。

此功能要求对 Amazon Glue API 端点的网络访问权限。对于连接位于私有子网中的 Amazon Glue 任务,必须配置 VPC 终端节点或 NAT 网关以提供网络访问。有关配置 VPC 终端节点的信息,请参阅 设置对数据存储的网络访问。要创建 NAT 网关,请参阅 Amazon VPC 用户指南中的 NAT 网关

您可以通过将 "--enable-glue-datacatalog": "" 参数分别添加到作业参数和开发终端节点参数来配置 Amazon Glue 作业和开发终端节点。传递此参数将在 Spark 中设置某些配置,使其能够访问数据目录作为外部 Hive 元存储。它还在 Amazon Glue 任务或开发端点中创建的 SparkSession 对象中启用 Hive 支持

要启用数据目录访问,请在控制台的 Add job (添加任务) 或 Add endpoint (添加端点) 页面上,选中 Catalog options (目录选项) 组中的 Use Amazon Glue Data Catalog as the Hive metastore (使用 Amazon Glue 数据目录作为 Hive 元存储) 复选框。请注意,用于作业或开发终端节点的 IAM 角色应具有 glue:CreateDatabase 权限。在数据目录中创建一个名为“default”的数据库(如果该数据库不存在)。

让我们看一下如何在 Spark SQL 作业中使用此功能的示例。以下示例假定您已对 s3://awsglue-datasets/examples/us-legislators 中提供的美国议员数据集进行爬网。

要从 Amazon Glue 数据目录中定义的表中序列化/反序列化数据,Spark SQL 需要在 Spark 任务的类路径中的 Amazon Glue 数据目录中定义的格式的 Hive SerDe 类。

某些常见格式的 SerDes 由 Amazon Glue 分发。以下是这些格式的 Amazon S3 链接:

将 JSON SerDe 作为额外的 JAR 添加到开发终端节点。对于作业,您可以使用参数字段中的 --extra-jars 参数添加 SerDe。有关更多信息,请参阅Amazon Glue 作业参数

以下是用于创建开发终端节点的示例输入 JSON,其中已为 Spark SQL 启用数据目录。

{ "EndpointName": "Name", "RoleArn": "role_ARN", "PublicKey": "public_key_contents", "NumberOfNodes": 2, "Arguments": { "--enable-glue-datacatalog": "" }, "ExtraJarsS3Path": "s3://crawler-public/json/serde/json-serde.jar" }

现在,使用 Spark SQL 查询从美国议员数据集创建的表。

>>> spark.sql("use legislators") DataFrame[] >>> spark.sql("show tables").show() +-----------+------------------+-----------+ | database| tableName|isTemporary| +-----------+------------------+-----------+ |legislators| areas_json| false| |legislators| countries_json| false| |legislators| events_json| false| |legislators| memberships_json| false| |legislators|organizations_json| false| |legislators| persons_json| false| +-----------+------------------+-----------+ >>> spark.sql("describe memberships_json").show() +--------------------+---------+-----------------+ | col_name|data_type| comment| +--------------------+---------+-----------------+ | area_id| string|from deserializer| | on_behalf_of_id| string|from deserializer| | organization_id| string|from deserializer| | role| string|from deserializer| | person_id| string|from deserializer| |legislative_perio...| string|from deserializer| | start_date| string|from deserializer| | end_date| string|from deserializer| +--------------------+---------+-----------------+

如果作业的类路径中没有该格式的 SerDe 类,您将看到与以下内容类似的错误。

>>> spark.sql("describe memberships_json").show() Caused by: MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found) at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:399) at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276) ... 64 more

要仅从 memberships 表中查看不同的 organization_ids,请运行以下 SQL 查询。

>>> spark.sql("select distinct organization_id from memberships_json").show() +--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

如果需要对动态帧执行相同操作,请运行以下操作。

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json") >>> memberships.toDF().createOrReplaceTempView("memberships") >>> spark.sql("select distinct organization_id from memberships").show() +--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

虽然 DynamicFrames 已针对 ETL 操作进行优化,但启用 Spark SQL 以直接访问数据目录提供了一种运行复杂的 SQL 语句或移植现有应用程序的简洁方法。