代码示例 连接和关系数据 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

代码示例 连接和关系数据

此示例使用从 http://everypolitician.org/sample-dataset 桶 Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据(JSON 格式),并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。

您可以在 join_and_relationalize.py 文件中的 AWSGLUE样品库 在Github网站上。

利用此数据,本教程将介绍如何执行以下操作:

  • 使用 AWS Glue 抓取器以分类存储在公众内的对象 Amazon S3 把他们的原理图保存在 AWS Glue 数据目录.

  • 检查生成自爬网的表元数据和架构。

  • 编写 Python 提取、转移和加载 (ETL) 脚本,该脚本使用 数据目录 中的元数据执行以下操作:

    • 将不同源文件中的数据加入到单个数据表中 (即,使数据非规范化)。

    • 按议员类型筛选已加入到单独的表中的表。

    • 将生成的数据写入单独的 Apache Parquet 文件以供日后分析。

将生成的数据写入单独的 Apache Parquet 文件以供日后分析。将生成的数据写入单独的 Apache Parquet 文件以供日后分析。有关更多信息,请参阅 查看开发终端节点属性.

步骤 1. 将数据爬到 Amazon S3 桶

  1. 登录 AWS 管理控制台,并打开 AWS Glue 控制台 https://console.amazonaws.cn/glue/.

  2. 遵循步骤 在 AWS Glue 控制台上使用爬网程序,创建一个可以爬行的 s3://awsglue-datasets/examples/us-legislators/all 数据库中名为 legislators 在 AWS Glue 数据目录. 示例数据已位于此公共 Amazon S3 存储桶中。

  3. 运行新爬网程序,然后检查 legislators 数据库。

    该爬网程序将创建以下元数据表:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    这是一个包含议员及其历史记录的半规范化表集合。

步骤 2. 向开发端点笔记本添加样板脚本

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库,然后设置单个 :GlueContext:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

步骤 3 检查数据中的数据 数据目录

接下来,您可以轻松地创建检查自动化镜框的 AWS Glue 数据目录,并检查数据的框架。例如,要查看 persons_json 表的架构,请在您的笔记本中添加以下内容:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

下面是来自打印调用的输出:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

表中的每个人都是某个美国国会机构的成员。

要查看 memberships_json 表的架构,请键入以下内容:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

输出如下所示。

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

TheThethe organizations 是双方和两个会议室、参议院和代表家。要查看 organizations_json 表的架构,请键入以下内容:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

输出如下所示。

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

步骤 4. 筛选数据

接下来,只保留您想要的字段,然后重命名 idorg_id...数据集足够小,您可以查看整个内容。

TheThethe toDF() 转换A DynamicFrame 阿帕克火花 DataFrame,您可以应用ApacheSparkSQL中已存在的转换:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

下面显示了输出:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

键入以下内容以查看显示在 organizations 中的 :memberships:

memberships.select_fields(['organization_id']).toDF().distinct().show()

下面显示了输出:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

步骤 5. 一起把它整合起来

现在,使用 AWS Glue 加入这些关系表并创建一个完整的立法表历史表 memberships 及其相应的 organizations.

  1. 首先,加入 personsmembershipsidperson_id.

  2. 接下来,将结果加入到 orgsorg_idorganization_id.

  3. 然后,删除多余的字段 person_id 和 。org_id.

您可以在一个 (扩展) 代码行中执行所有这些操作:

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

输出如下所示。

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

您现在有了可用于分析的最终表。您可以以紧凑、高效的格式写入分析—比如帕丁—您可以在 AWS Glue,亚马逊 Athena,或 Amazon Redshift Spectrum.

以下调用将跨多个文件写入该表以在日后执行分析时支持快速并行读取:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

要将所有历史记录数据放入单个文件,您必须将其转换为数据帧,为其重新分区然后写入它:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

或者,如果您希望按参议院和众议院分隔该数据:

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

步骤 6. 将数据写入关系数据库

AWS Glue 使数据可以很容易地将数据写入 Amazon Redshift,即使有半结构化数据。它提供了一个转换 relationalize,这些平台平衡 DynamicFrames 无论帧中的对象多复杂,都可能是。

使用 l_history DynamicFrame 在本示例中,通过根表的名称(hist_root)和临时工作路径 relationalize...这将返回 DynamicFrameCollection...然后,您可以列出 DynamicFrames 在该系列中:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

以下是 keys 调用的输出:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize 将历史记录表分成六个新表格:一个根表格,其中包含每个对象的记录 DynamicFrame,以及阵列的辅助表。关系数据库中的数组处理通常不够理想,尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。

接下来,通过检查 来查看分隔:contact_details:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

以下是 show 调用的输出:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

TheThethe contact_details 字段是原始的结构阵列 DynamicFrame...这些阵列的每个元素都是辅助表中的一个单独行位,该列为 index...TheThethe id 以下是一个外键 hist_root 表格中的 contact_details:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

下面是输出:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

在这些命令中注意到 toDF() 然后 where 表达式用于筛选要查看的行。

因此,将 hist_root 表与辅助表联接可让您执行以下操作:

  • 将数据加载到不支持数组的数据库中。

  • 使用 SQL 查询数组中的每个单独的项目。

您已经设置了一个名为 redshift3...有关如何创建自己连接的信息,请参阅 在 AWS Glue Data Catalog 中定义连接.

接下来,将此收集书写入 Amazon Redshift 通过循环通过 DynamicFrames 一次:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to Redshift table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, catalog_connection = "redshift3", connection_options = {"dbtable": df_name, "database": "testdb"}, redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")

TheThethe dbtable 属性是JDBC表的名称。对于支持数据库内支持架构的JDBC数据存储区,请指定 schema.table-name...如果未提供架构,则使用默认“公共”架构。

有关更多信息,请参阅 Connection Types and Options for ETL in AWS Glue.

下面是表在 中的形式。Amazon Redshift. (您连接到 Amazon Redshift 通过PSQL。)

testdb=# \d List of relations schema | name | type | owner --------+---------------------------+-------+----------- public | hist_root | table | test_user public | hist_root_contact_details | table | test_user public | hist_root_identifiers | table | test_user public | hist_root_images | table | test_user public | hist_root_links | table | test_user public | hist_root_other_names | table | test_user (6 rows) testdb=# \d hist_root_contact_details Table "public.hist_root_contact_details" Column | Type | Modifiers ---------------------------+--------------------------+----------- id | bigint | index | integer | contact_details.val.type | character varying(65535) | contact_details.val.value | character varying(65535) | testdb=# \d hist_root Table "public.hist_root" Column | Type | Modifiers -----------------------+--------------------------+----------- role | character varying(65535) | seats | integer | org_name | character varying(65535) | links | bigint | type | character varying(65535) | sort_name | character varying(65535) | area_id | character varying(65535) | images | bigint | on_behalf_of_id | character varying(65535) | other_names | bigint | birth_date | character varying(65535) | name | character varying(65535) | organization_id | character varying(65535) | gender | character varying(65535) | classification | character varying(65535) | legislative_period_id | character varying(65535) | identifiers | bigint | given_name | character varying(65535) | image | character varying(65535) | family_name | character varying(65535) | id | character varying(65535) | death_date | character varying(65535) | start_date | character varying(65535) | contact_details | bigint | end_date | character varying(65535) |

现在,您可以在 中使用 SQL 查询这些表:Amazon Redshift:

testdb=# select * from hist_root_contact_details where id = 10 or id = 75 order by id, index;

下面显示了结果:

id | index | contact_details.val.type | contact_details.val.value ---+-------+--------------------------+--------------------------- 10 | 0 | fax | 202-224-6020 10 | 1 | phone | 202-224-3744 10 | 2 | twitter | ChuckGrassley 75 | 0 | fax | 202-224-4680 75 | 1 | phone | 202-224-4642 75 | 2 | twitter | SenJackReed (6 rows)

Conclusion

总的来说,AWS Glue 非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在Python文件中找到整个源到目标ETL脚本 join_and_relationalize.pyAWS胶粘样品 在Github上。