Amazon Glue for Spark 中适用于 ETL 的连接类型和选项 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Glue for Spark 中适用于 ETL 的连接类型和选项

在 Amazon Glue for Spark 中,各种 PysPark 和 Scala 方法和转换使用 connectionType 参数指定连接类型。它们使用 connectionOptionsoptions 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions(或 options)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅每种连接类型的主页。

connectionType 连接到
dynamodb Amazon DynamoDB 数据库
kinesis Amazon Kinesis Data Streams
S3 Amazon S3
documentdb Amazon DocumentDB (with MongoDB compatibility) 数据库
opensearch Amazon OpenSearch Service
redshift Amazon Redshift 数据库
kafka KafkaAmazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos for NoSQL。
azuresql Azure SQL。
bigquery Google BigQuery。
mongodb MongoDB 数据库,包括 MongoDB Atlas。
sqlserver Microsoft SQL Server 数据库(请参阅JDBC 连接
mysql MySQL 数据库(请参阅JDBC 连接
oracle Oracle 数据库(请参阅JDBC 连接
postgresql PostgreSQL 数据库(请参阅JDBC 连接
saphana SAP HANA。
snowflake Snowflake 数据湖
teradata Teradata Vantage。
vertica Vertica。
custom.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值
marketplace.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值

自定义和 Amazon Web Services Marketplace connectionType 值

这些功能包括:

  • "connectionType": "marketplace.athena":指定与 Amazon Athena 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。

  • "connectionType": "marketplace.spark":指定与 Apache Spark 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。

  • "connectionType": "marketplace.jdbc":指定与 JDBC 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。

  • "connectionType": "custom.athena":指定与 Amazon Athena 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

  • "connectionType": "custom.spark":指定与 Apache Spark 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

  • "connectionType": "custom.jdbc":指定与 JDBC 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项

  • className – 字符串,必需,驱动程序类名称。

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

  • url – 字符串,必需,用于建立与数据源的连接且带占位符(${})的 JDBC URL。占位符 ${secretKey} 替换为 Amazon Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。

  • secretIduser/password – 字符串,必需,用于检索 URL 的凭证。

  • dbTablequery – 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定 dbTablequery,但不能同时指定两者。

  • partitionColumn – 字符串,可选,用于分区的整数列的名称。此选项仅在包含 lowerBoundupperBoundnumPartitions 时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 JDBC 转换到其他数据库

    lowerBoundupperBound 值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。

    注意

    使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则在使用分区列的查询结尾附加 WHERE 子句,以测试查询。

    • 如果您的查询格式为 SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用分区列的表达式扩展 WHERE 子句,以测试查询。

  • lowerBound – 整数,可选,用于确定分区步长的最小 partitionColumn 值。

  • upperBound – 整数,可选,用于确定分区步长的最大 partitionColumn 值。

  • numPartitions – 整数,可选,分区数。此值以及 lowerBound(包含)和 upperBound(排除)为用于拆分 partitionColumn 而生成的 WHERE 子句表达式构成分区步长。

    重要

    请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。

  • filterPredicate – 字符串,可选,用于筛选源数据的额外条件子句。例如:

    BillingCity='Mountain View'

    使用查询(而不是名称)时,您应验证查询是否适用于指定的 filterPredicate。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则在使用筛选条件谓词的查询结尾附加 WHERE 子句,以测试查询。

    • 如果您的查询格式为 "SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用筛选条件谓词的表达式扩展 WHERE 子句,以测试查询。

  • dataTypeMapping – 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项 "dataTypeMapping":{"FLOAT":"STRING"} 会通过调用驱动程序的 ResultSet.getString() 方法,将 JDBC 类型 FLOAT 的数据字段映射到 Java String 类型,并将其用于构建 Amazon Glue 记录。ResultSet 对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。

  • 目前受支持的 Amazon Glue 数据类型包括:

    • DATE

    • string

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    支持的 JDBC 数据类型为 Java8 java.sql.types

    默认数据类型映射(从 JDBC 到 Amazon Glue)如下:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    如果将自定义数据类型映射与选项 dataTypeMapping 结合使用,则可以覆盖默认数据类型映射。只有 dataTypeMapping 选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 Amazon Glue STRING 数据类型。

以下 Python 代码示例演示了如何使用 Amazon Web Services Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.athena 或 marketplace.athena 的连接选项

  • className – 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如 "com.amazonaws.athena.connectors")的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。

  • tableName – 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称 all_log_streams,这意味着返回的动态数据框将包含日志组中所有日志流的数据。

  • schemaName – 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如,/aws-glue/jobs/output

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述文件。

以下 Python 代码示例演示了如何从使用 Amazon Web Services Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.spark 或 marketplace.spark 的连接选项

  • className – 字符串,必需,连接器类名称。

  • secretId – 字符串,可选,用于检索连接器连接的凭证。

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

  • 其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀 es 开头,正如适用于 Apache Hadoop 的 Elasticsearch 文档中所述。Spark 与 Snowflake 的连接使用 sfUsersfPassword 等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。

以下 Python 代码示例演示了如何从使用 marketplace.spark 连接的 OpenSearch 数据存储读取数据。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<Amazon endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<Amazon endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

常规选项

本节中的选项以 connection_options 形式提供,但不是专门适用于一个连接器。

配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息,请参阅 使用作业书签

  • jobBookmarkKeys - 列名称的数组。

  • jobBookmarkKeysSortOrder - 定义如何根据排序顺序比较值的字符串。有效值:"asc""desc"

  • useS3ListImplementation - 用于在列出 Amazon S3 存储桶内容时管理内存性能。有关更多信息,请参阅 Optimize memory management in Amazon Glue