代码示例:联接和关系化数据 - Amazon连接词
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

代码示例:联接和关系化数据

本示例使用从http://everypolitician.org/添加到sample-dataset存储桶在 Amazon Simple Storage Service (Amazon S3) 中:s3://awsglue-datasets/examples/us-legislators/all。 该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据(JSON 格式),并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。

源代码在join_and_relationalize.py中的AmazonGlue 样品库在 GitHub 网站上。

利用此数据,本教程将介绍如何执行以下操作:

  • 使用AmazonGlue 合爬网程序对存储在公共 Amazon S3 存储桶中的对象进行分类并将其架构保存到AmazonGlue 数据目录。

  • 检查生成自爬网的表元数据和架构。

  • 编写 Python 提取、转移和加载 (ETL) 脚本,该脚本使用数据目录中的元数据执行以下操作:

    • 将不同源文件中的数据加入到单个数据表中 (即,使数据非规范化)。

    • 按议员类型筛选已加入到单独的表中的表。

    • 将生成的数据写入单独的 Apache Parquet 文件以供日后分析。

将生成的数据写入单独的 Apache Parquet 文件以供日后分析。将生成的数据写入单独的 Apache Parquet 文件以供日后分析。有关更多信息,请参阅 查看开发终端节点属性

第 1 步:在 Amazon S3 存储桶中爬网数据

  1. 登录到Amazon Web Services Management Console,然后打开AmazonGlue 控制台https://console.aws.amazon.com/glue/

  2. 按中的步骤操作在 Amazon Glue 控制台上使用爬网程序,创建可爬网s3://awsglue-datasets/examples/us-legislators/all数据集添加到名为legislators中的AmazonGlue 数据目录。示例数据已位于此 Amazon S3 存储桶中。

  3. 运行新爬网程序,然后检查 legislators 数据库。

    该爬网程序将创建以下元数据表:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    这是一个包含议员及其历史记录的半规范化表集合。

第 2 步:将样板脚本添加到开发终结点笔记本中。

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入Amazon您需要的 Glue 库,并设置单个GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

第 3 步:从数据目录中的数据检查方案

接下来,您可以轻松地从AmazonGlue 数据目录,并检查数据的模式。例如,要查看 persons_json 表的架构,请在您的笔记本中添加以下内容:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

下面是来自打印调用的输出:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

表中的每个人都是某个美国国会机构的成员。

要查看 memberships_json 表的架构,请键入以下内容:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations 是美国国会的党派和两大议院,即参议院和众议院。要查看 organizations_json 表的架构,请键入以下内容:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

第 4 步:筛选数据

接下来,仅保留您需要的字段,然后将 id 重命名为 org_id。该数据集足够小,方便您完整查看。

toDF()DynamicFrame 转换为 Apache Spark DataFrame,因此您可以应用已存在于 Apache Spark SQL 中的转换:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

下面显示了输出:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

键入以下内容以查看显示在 memberships 中的 organizations

memberships.select_fields(['organization_id']).toDF().distinct().show()

下面显示了输出:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

第 5 步:整合内容

现在,使用AmazonGlue 合到这些关系表并创建一个完整的历史记录表的议员memberships及其相应的organizations

  1. 首先,联接 idperson_id 上的 personsmemberships

  2. 接下来,将结果与 org_idorganization_id 上的 orgs 联接。

  3. 然后,删除多余的字段 person_idorg_id

您可以在一个 (扩展) 代码行中执行所有这些操作:

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

您现在有了可用于分析的最终表。您可以采用紧凑且高效的格式写入该表以供分析 (即 Parquet),该格式可让您在AmazonGlue,Amazon Athena,或 Amazon Redshift Spectrum。

以下调用将跨多个文件写入该表以在日后执行分析时支持快速并行读取:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

要将所有历史记录数据放入单个文件,您必须将其转换为数据帧,为其重新分区然后写入它:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

或者,如果您希望按参议院和众议院分隔该数据:

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

第 6 步:将数据写入到关系数据库

Amazon利用 Glue,您可以轻松将数据写入到关系数据库 (如 Amazon Redshift),甚至对半结构化数据也是如此。它提供了一种转换 relationalize,这将展平 DynamicFrames,无论帧中的对象的复杂度可能如何。

使用本示例中的 l_history DynamicFrame,传入根表的名称 (hist_root) 和 relationalize 的临时工作路径。这将返回 DynamicFrameCollection。您随后可以列出该集合中 DynamicFrames 的名称:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

以下是 keys 调用的输出:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize 将该历史记录表拆分为 6 个新表:1 个包含 DynamicFrame 中每个对象的记录的根表以及 5 个用于数组的辅助表。关系数据库中的数组处理通常不够理想,尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。

接下来,通过检查 contact_details 来查看分隔:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

以下是 show 调用的输出:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details 字段是原始 DynamicFrame 中的一个结构数组。这些数组中的每个元素都是辅助表中的单独的行,通过 index 编制索引。此处的 id 是具有键 contact_detailshist_root 表的外键:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

下面是输出:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

请注意,这些命令中先后使用了 toDF()where 表示式来筛选您要查看的行。

因此,将 hist_root 表与辅助表联接可让您执行以下操作:

  • 将数据加载到不支持数组的数据库中。

  • 使用 SQL 查询数组中的每个单独的项目。

您已经有一个名为 redshift3 的连接设置。有关如何创建您自己的连接的信息,请参阅在 中定义连接Amazon Glue数据目录

接下来,通过遍历DynamicFrames一次运行一条:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to Redshift table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, catalog_connection = "redshift3", connection_options = {"dbtable": df_name, "database": "testdb"}, redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")

dbtable 属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

有关更多信息,请参阅 中的 ETL 的连接类型和选项Amazon连接词

下面是表在 Amazon Redshift 中的形式。(您已通过 psql 连接到 Amazon Redshift。)

testdb=# \d List of relations schema | name | type | owner --------+---------------------------+-------+----------- public | hist_root | table | test_user public | hist_root_contact_details | table | test_user public | hist_root_identifiers | table | test_user public | hist_root_images | table | test_user public | hist_root_links | table | test_user public | hist_root_other_names | table | test_user (6 rows) testdb=# \d hist_root_contact_details Table "public.hist_root_contact_details" Column | Type | Modifiers ---------------------------+--------------------------+----------- id | bigint | index | integer | contact_details.val.type | character varying(65535) | contact_details.val.value | character varying(65535) | testdb=# \d hist_root Table "public.hist_root" Column | Type | Modifiers -----------------------+--------------------------+----------- role | character varying(65535) | seats | integer | org_name | character varying(65535) | links | bigint | type | character varying(65535) | sort_name | character varying(65535) | area_id | character varying(65535) | images | bigint | on_behalf_of_id | character varying(65535) | other_names | bigint | birth_date | character varying(65535) | name | character varying(65535) | organization_id | character varying(65535) | gender | character varying(65535) | classification | character varying(65535) | legislative_period_id | character varying(65535) | identifiers | bigint | given_name | character varying(65535) | image | character varying(65535) | family_name | character varying(65535) | id | character varying(65535) | death_date | character varying(65535) | start_date | character varying(65535) | contact_details | bigint | end_date | character varying(65535) |

现在,您可以在 Amazon Redshift 中使用 SQL 查询这些表:

testdb=# select * from hist_root_contact_details where id = 10 or id = 75 order by id, index;

下面显示了结果:

id | index | contact_details.val.type | contact_details.val.value ---+-------+--------------------------+--------------------------- 10 | 0 | fax | 202-224-6020 10 | 1 | phone | 202-224-3744 10 | 2 | twitter | ChuckGrassley 75 | 0 | fax | 202-224-4680 75 | 1 | phone | 202-224-4642 75 | 2 | twitter | SenJackReed (6 rows)

Conclusion

总体而言,AmazonGlue 附非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在 Python 文件中找到完整的源到目标 ETL 脚本。join_and_relationalize.py中的AmazonGlue 样本(位于 GitHub 上)。