代码示例:对数据进行联接和关系化 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

代码示例:对数据进行联接和关系化

本示例使用从 http://everypolitician.org/ 下载到 Amazon Simple Storage Service(Amazon S3)中的 sample-dataset 存储桶的数据集:s3://awsglue-datasets/examples/us-legislators/all。该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据(JSON 格式),并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。

您可以在 GitHub 网站上 Amazon Glue 示例存储库join_and_relationalize.py 文件中找到本示例的源代码。

利用此数据,本教程将介绍如何执行以下操作:

  • 使用 Amazon Glue 爬网程序对存储在公有 Amazon S3 存储桶中的对象进行分类并将其架构保存到 Amazon Glue Data Catalog。

  • 检查生成自爬网的表元数据和架构。

  • 编写 Python 提取、转移和加载(ETL)脚本,该脚本使用数据目录中的元数据执行以下操作:

    • 将不同源文件中的数据加入到单个数据表中 (即,使数据非规范化)。

    • 按议员类型筛选已加入到单独的表中的表。

    • 将生成的数据写入单独的 Apache Parquet 文件以供日后分析。

在 Amazon 上运行时,调试 Python 或 PySpark 脚本的首选方法是在 Amazon Glue Studio 上使用笔记本

步骤 1:爬取 Amazon S3 存储桶中的数据

  1. 登录 Amazon Web Services Management Console 并打开位于 https://console.aws.amazon.com/glue/ 的 Amazon Glue 控制台。

  2. 按照 配置爬网程序 中的步骤操作,创建可将 s3://awsglue-datasets/examples/us-legislators/all 数据集网络爬取到 Amazon Glue Data Catalog 中名为 legislators 的数据库的新爬网程序。示例数据已位于此公共 Amazon S3 存储桶中。

  3. 运行新爬网程序,然后检查 legislators 数据库。

    该爬网程序将创建以下元数据表:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    这是一个包含议员及其历史记录的半规范化表集合。

步骤 2:向开发终端节点笔记本中添加样板文件脚本

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 Amazon Glue 库,然后设置单个 GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

步骤 3:检查数据目录中数据的架构

接下来,您可以轻松地从 Amazon Glue Data Catalog 检查 DynamicFrame,并检查数据的架构。例如,要查看 persons_json 表的架构,请在您的笔记本中添加以下内容:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

下面是来自打印调用的输出:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

表中的每个人都是某个美国国会机构的成员。

要查看 memberships_json 表的架构,请键入以下内容:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations 是美国国会的党派和两大议院,即参议院和众议院。要查看 organizations_json 表的架构,请键入以下内容:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

步骤 4:筛选数据

接下来,仅保留您需要的字段,然后将 id 重命名为 org_id。该数据集足够小,方便您完整查看。

toDF()DynamicFrame 转换为 Apache Spark DataFrame,因此您可以应用已存在于 Apache Spark SQL 中的转换:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

下面显示了输出:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

键入以下内容以查看显示在 memberships 中的 organizations

memberships.select_fields(['organization_id']).toDF().distinct().show()

下面显示了输出:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

步骤 5:整合内容

现在,使用 Amazon Glue 联接这些关系表并创建一个包含议员 memberships 及其对应的 organizations 的完整历史记录表。

  1. 首先,联接 idperson_id 上的 personsmemberships

  2. 接下来,将结果与 org_idorganization_id 上的 orgs 联接。

  3. 然后,删除多余的字段 person_idorg_id

您可以在一个 (扩展) 代码行中执行所有这些操作:

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

您现在有了可用于分析的最终表。您可以采用紧凑且高效的格式写入该表以供分析(即 Parquet),该格式可让您在 Amazon Glue、Amazon Athena 或 Amazon Redshift Spectrum 中运行 SQL。

以下调用将跨多个文件写入该表以在日后执行分析时支持快速并行读取:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

要将所有历史记录数据放入单个文件,您必须将其转换为数据帧,为其重新分区然后写入它:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

或者,如果您希望按参议院和众议院分隔该数据:

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

步骤 6:转换关系数据库的数据

利用 Amazon Glue,您可以轻松将数据写入到关系数据库(如 Amazon Redshift),甚至对半结构化数据也是如此。它提供了一种转换 relationalize,这将展平 DynamicFrames,无论帧中的对象的复杂度可能如何。

使用本示例中的 l_history DynamicFrame,传入根表的名称 (hist_root) 和 relationalize 的临时工作路径。这将返回 DynamicFrameCollection。您随后可以列出该集合中 DynamicFrames 的名称:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

以下是 keys 调用的输出:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize 将该历史记录表拆分为 6 个新表:1 个包含 DynamicFrame 中每个对象的记录的根表以及 5 个用于数组的辅助表。关系数据库中的数组处理通常不够理想,尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。

接下来,通过检查 contact_details 来查看分隔:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

以下是 show 调用的输出:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details 字段是原始 DynamicFrame 中的一个结构数组。这些数组中的每个元素都是辅助表中的单独的行,通过 index 编制索引。此处的 id 是具有键 contact_detailshist_root 表的外键:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

下面是输出:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

请注意,这些命令中先后使用了 toDF()where 表示式来筛选您要查看的行。

因此,将 hist_root 表与辅助表联接可让您执行以下操作:

  • 将数据加载到不支持数组的数据库中。

  • 使用 SQL 查询数组中的每个单独的项目。

使用 Amazon Glue 连接安全地存储和访问您的 Amazon Redshift 凭证。有关如何创建您自己的连接的信息,请参阅 连接到数据

现在,您已准备就绪,可以通过一次遍历一个 DynamicFrames 来将数据写入到连接:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

根据您的关系数据库类型,连接设置将有所不同:

结论

总的来说,Amazon Glue 非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在 GitHub 上 join_and_relationalize.py 示例的 Python 文件 Amazon Glue 中找到完整的源到目标 ETL 脚本。