本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Glue for Spark 中适用于 ETL 的连接类型和选项
在 Amazon Glue for Spark 中,各种 PysPark 和 Scala 方法和转换使用 connectionType
参数指定连接类型。它们使用 connectionOptions
或 options
参数指定连接选项。
connectionType
参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions
(或 options
)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。
有关展示如何设置和使用连接选项的代码示例,请参阅每种连接类型的主页。
connectionType |
连接到 |
---|---|
dynamodb | Amazon DynamoDB 数据库 |
kinesis | Amazon Kinesis Data Streams |
S3 | Amazon S3 |
documentdb | Amazon DocumentDB (with MongoDB compatibility) 数据库 |
opensearch | Amazon OpenSearch Service。 |
redshift | Amazon Redshift |
kafka | Kafka |
azurecosmos | Azure Cosmos for NoSQL。 |
azuresql | Azure SQL。 |
bigquery | Google BigQuery。 |
mongodb | MongoDB |
sqlserver | Microsoft SQL Server 数据库(请参阅JDBC 连接) |
mysql | MySQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA。 |
snowflake | Snowflake |
teradata | Teradata Vantage。 |
vertica | Vertica。 |
custom.* | Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值) |
marketplace.* | Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值) |
自定义和 Amazon Web Services Marketplace connectionType 值
这些功能包括:
-
"connectionType": "marketplace.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。 -
"connectionType": "marketplace.spark"
:指定与 Apache Spark 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。 -
"connectionType": "marketplace.jdbc"
:指定与 JDBC 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。 -
"connectionType": "custom.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。 -
"connectionType": "custom.spark"
:指定与 Apache Spark 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。 -
"connectionType": "custom.jdbc"
:指定与 JDBC 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。
适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项
-
className
– 字符串,必需,驱动程序类名称。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。 -
url
– 字符串,必需,用于建立与数据源的连接且带占位符(${}
)的 JDBC URL。占位符${secretKey}
替换为 Amazon Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。 -
secretId
或user/password
– 字符串,必需,用于检索 URL 的凭证。 -
dbTable
或query
– 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定dbTable
或query
,但不能同时指定两者。 -
partitionColumn
– 字符串,可选,用于分区的整数列的名称。此选项仅在包含lowerBound
、upperBound
和numPartitions
时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 JDBC 转换到其他数据库。 lowerBound
和upperBound
值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。注意
使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:
-
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用分区列的查询结尾附加WHERE
子句,以测试查询。 -
如果您的查询格式为
SELECT col1 FROM table1 WHERE col2=val"
,则通过AND
和使用分区列的表达式扩展WHERE
子句,以测试查询。
-
-
lowerBound
– 整数,可选,用于确定分区步长的最小partitionColumn
值。 -
upperBound
– 整数,可选,用于确定分区步长的最大partitionColumn
值。 -
numPartitions
– 整数,可选,分区数。此值以及lowerBound
(包含)和upperBound
(排除)为用于拆分partitionColumn
而生成的WHERE
子句表达式构成分区步长。重要
请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。
-
filterPredicate
– 字符串,可选,用于筛选源数据的额外条件子句。例如:BillingCity='Mountain View'
使用查询(而不是表名称)时,您应验证查询是否适用于指定的
filterPredicate
。例如:-
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用筛选条件谓词的查询结尾附加WHERE
子句,以测试查询。 -
如果您的查询格式为
"SELECT col1 FROM table1 WHERE col2=val"
,则通过AND
和使用筛选条件谓词的表达式扩展WHERE
子句,以测试查询。
-
-
dataTypeMapping
– 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项"dataTypeMapping":{"FLOAT":"STRING"}
会通过调用驱动程序的ResultSet.getString()
方法,将 JDBC 类型FLOAT
的数据字段映射到 JavaString
类型,并将其用于构建 Amazon Glue 记录。ResultSet
对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。 -
目前受支持的 Amazon Glue 数据类型包括:
-
DATE
-
string
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
支持的 JDBC 数据类型为 Java8 java.sql.types
。 默认数据类型映射(从 JDBC 到 Amazon Glue)如下:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
如果将自定义数据类型映射与选项
dataTypeMapping
结合使用,则可以覆盖默认数据类型映射。只有dataTypeMapping
选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 Amazon GlueSTRING
数据类型。 -
以下 Python 代码示例演示了如何使用 Amazon Web Services Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
适用于类型 custom.athena 或 marketplace.athena 的连接选项
-
className
– 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如"com.amazonaws.athena.connectors"
)的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。 -
tableName
– 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称all_log_streams
,这意味着返回的动态数据框将包含日志组中所有日志流的数据。 -
schemaName
– 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如,/aws-glue/jobs/output
。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。
有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述
以下 Python 代码示例演示了如何从使用 Amazon Web Services Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
适用于类型 custom.spark 或 marketplace.spark 的连接选项
-
className
– 字符串,必需,连接器类名称。 -
secretId
– 字符串,可选,用于检索连接器连接的凭证。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。 -
其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀
es
开头,正如适用于 Apache Hadoop 的 Elasticsearch文档中所述。Spark 与 Snowflake 的连接使用 sfUser
和sfPassword
等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。
以下 Python 代码示例演示了如何从使用 marketplace.spark
连接的 OpenSearch 数据存储读取数据。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<Amazon endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<Amazon endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
常规选项
本节中的选项以 connection_options
形式提供,但不是专门适用于一个连接器。
配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息,请参阅 使用作业书签。
jobBookmarkKeys
- 列名称的数组。jobBookmarkKeysSortOrder
- 定义如何根据排序顺序比较值的字符串。有效值:"asc"
、"desc"
。useS3ListImplementation
- 用于在列出 Amazon S3 存储桶内容时管理内存性能。有关更多信息,请参阅 Optimize memory management in Amazon Glue。