AWS Glue 中的 ETL 的连接类型和选项 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS Glue 中的 ETL 的连接类型和选项

在 AWS Glue 中,各种 PySpark 和 Scala 方法和转换使用connectionType参数。它们使用 connectionOptionsoptions 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions(或 options)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅示例:设置连接类型和选项

connectionType 连接到
自定义。 * 火花、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值
documentdb Amazon DocumentDB(与 MongoDB 兼容)database
dynamodb Amazon DynamoDBdatabase
Kafka Kafka或者Amazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
Marketplace。 * 火花、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值
mongodb MongoDB 数据库
mysql MySQL 数据库(请参阅JDBC connectionType 值
oracle Oracle 数据库(请参阅JDBC connectionType 值
orc 存储在 Amazon Simple Storage Service (Amazon S3) 中的文件Apache 配置单元优化行列 (ORC)文件格式
parquet 存储在 Amazon S3 中的文件Apache Parquet文件格式
postgresql PostgreSQL 数据库(请参阅JDBC connectionType 值
redshift Amazon Redshift数据库(请参阅JDBC connectionType 值
s3 Amazon S3
sqlserver Microsoft SQL Server 数据库(请参阅JDBC connectionType 值

"connectionType": "documentdb"

指定与 Amazon DocumentDB 的连接(与 MongoDB 兼容)。

源连接和接收器连接的连接选项不同。

"connectionType": "documentdb" as Source

"connectionType": "documentdb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要从中读取数据的 Amazon DocumentDB 集合。

  • "username":(必填)Amazon DocumentDB 用户名。

  • "password":(必填)Amazon DocumentDB 密码。

  • "ssl":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须将此选项包含在"true"

  • "ssl.domain_match":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须将此选项包含在"false"

  • "batchSize":(可选):每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选):用于从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner (默认值)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions"(可选):指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitionerpartitionKey、partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitionerpartitionKey、partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要在其中写入数据的 Amazon DocumentDB 集合。

  • "username":(必填)Amazon DocumentDB 用户名。

  • "password":(必填)Amazon DocumentDB 密码。

  • "extendedBsonTypes":(可选)如果true,允许在 Amazon DocumentDB 中写入数据时使用扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选):保存数据时的批量操作的最大批次大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "dynamodb"

指定与 Amazon DynamoDB 的连接。

源连接和接收器连接的连接选项不同。

“connectionType”: “dynamodb” as Source

"connectionType": "dynamodb" 用作源时可使用以下连接选项:

  • "dynamodb.input.tableName":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.throughput.read.percent":(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5表示默认读取速率,这意味着 AWS Glue 将尝试占用表的一半读取容量。如果您增加上面的值0.5,AWS Glue 会增加请求速率;降低低于0.5减少读取请求速率。(实际读取速率会有所不同,这取决于 DynamoDB 表中是否存在统一分配的键等因素。)

    • 当 DynamoDB 表处于按需模式时,AWS Glue 会将表的读取容量处理为 40000。要导出大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.splits":(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。

    • 1表示没有并行性。我们强烈建议您使用以下公式指定更大的值以获得更好的性能。

    • 我们建议您计算numSlots使用以下公式,并将其用作dynamodb.splits。如果您需要更高的性能,我们建议您通过增加 DPU 数量来扩展工作。

      • numExecutors =

        • (DPU - 1) * 2 - 1ifWorkerTypeStandard

        • (NumberOfWorkers - 1)ifWorkerTypeG.1X或者G.2X

      • numSlotsPerExecutor =

        • 4ifWorkerTypeStandard

        • 8ifWorkerTypeG.1X

        • 16ifWorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn":(可选)要担任跨账户访问的 IAM 角色 ARN。此参数在 AWS Glue 1.0 或更高版本中可用。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为 “粘合动态-读取 STS 会话”。此参数在 AWS Glue 1.0 或更高版本中可用。

注意

AWS Glue 支持从其他 AWS 账户的 DynamoDB 表读取数据。有关更多信息,请参阅跨账户跨区域访问 DynamoDB 表

注意

DynamoDB 读取器不支持筛选条件或按下谓词。

“connectionType”: “dynamodb” as Sink

"connectionType": "dynamodb" 用作连接器时可使用以下连接选项:

  • "dynamodb.output.tableName":(必需)要写入的 DynamoDB 表。

  • "dynamodb.throughput.write.percent":(可选)要使用的写入容量单位 (WCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5表示默认写入速率,这意味着 AWS Glue 将尝试占用表的一半的写入容量。如果您将值增加到 0.5 以上,AWS Glue 会增加请求速率;将值降低到 0.5 以下会降低写入请求速率。(实际写入速率会有所不同,具体取决于 DynamoDB 表中是否存在统一分配的键等因素。)

    • 当 DynamoDB 表处于按需模式时,AWS Glue 会将表的写入容量作为40000。要导入大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.output.numParallelTasks":(可选)定义同时写入 DynamoDB 的并行任务数量。用于计算每个 Spark 任务的允许 WCU。如果您不想控制这些详细信息,则无需指定此参数。

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • 如果未指定该参数,则每个 Spark 的允许 WCU 任务将按以下公式自动计算:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1ifWorkerTypeStandard

        • (NumberOfWorkers - 1)ifWorkerTypeG.1X或者G.2X

      • numSlotsPerExecutor =

        • 4ifWorkerTypeStandard

        • 8ifWorkerTypeG.1X

        • 16ifWorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • 示例 1. DPU=10,工作人员类型 = 标准。输入 DynamicFrame 具有 100 个 RDD 分区。

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • 示例 2. DPU=10,工作人员类型 = 标准。输入 DynamicFrame 具有 20 个 RDD 分区。

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry":(可选)定义当存在ProvisionedThroughputExceededExceptionDynamoDB。默认设置为 “10”。

  • "dynamodb.sts.roleArn":(可选)要担任跨账户访问的 IAM 角色 ARN。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为 “粘合动态-写入-STS 会话”。

注意

AWS Glue 1.0 或更高版本支持 DynamoDB 编写器。

注意

AWS Glue 支持将数据写入其他 AWS 账户的 DynamoDB 表。有关更多信息,请参阅跨账户跨区域访问 DynamoDB 表

以下代码示例显示如何从 DynamoDB 表中读取和写入数据。他们演示从一个表读取和写入另一个表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

“connectionType”: “kafka”

指定与 Kafka 集群或 Apache Kafka 集群的 Amazon Managed Streaming。

您可以在GlueContext对象使用来自 Kafka 流式处理源的记录:

  • getCatalogSource

  • getSource

  • getSourceWithFormat

如果您使用getCatalogSource,则作业具有数据目录数据库和表名称信息,并且可以使用它们获取一些基本参数来从 Apache Kafka 流读取。如果您使用getSource,您必须显式指定以下参数:

您可以使用connectionOptions替换为GetSourceoptions替换为getSourceWithFormat,或者additionalOptions替换为getCatalogSource

"connectionType": "kafka" 可使用以下连接选项:

  • bootstrap.servers(必需)引导服务器 URL 的列表,例如,b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。

  • security.protocol(必需)一个布尔值,指示是在 Apache Kafka 连接上打开还是关闭 SSL。默认值是 “true”。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。

  • topicName(必填)在 Apache Kafka 中指定的主题名称。您必须至少指定一个"topicName""assign"或者"subscribePattern"

  • "assign":(必需)特定的TopicPartitions以使用。您必须至少指定一个"topicName""assign"或者"subscribePattern"

  • "subscribePattern":(必需)标识要订阅的主题列表的 Java 正则表达式字符串。您必须至少指定一个"topicName""assign"或者"subscribePattern"

  • classification (可选)

  • delimiter (可选)

  • "startingOffsets":(可选)Kafka 主题中要从中读取数据的起始位置。可能的值为 "earliest""latest"。默认值为 "latest"

  • "endingOffsets":(可选)批处理查询结束时的终点。可能的值包括:"latest"或一个 JSON 字符串,该字符串指定每个TopicPartition

    对于 JSON 字符串,格式为{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}。该值-1作为偏移表示"latest"

  • "pollTimeoutMs":(可选)Spark 作业执行程序中轮询来自 Kafka 的数据的超时时间(以毫秒为单位)。默认值为 512

  • "numRetries":(可选)无法获取 Kafka 偏移量之前重试的次数。默认值为 3

  • "retryIntervalMs":(可选)重试获取 Kafka 偏移之前等待的时间(以毫秒为单位)。默认值为 10

  • "maxOffsetsPerTrigger":(可选)每个触发器间隔处理的最大偏移数的速率限制。将按比例分割指定的总偏移数topicPartitions的不同卷。默认值为 null,这意味着使用者读取所有偏移量,直到已知的最新偏移量。

  • "minPartitions":(可选)要从 Kafka 中读取的最小分区数。默认值为 null,这意味着火花分区的数量等于 Kafka 分区的数量。

“connectionType”: “动态”

指定与 Amazon Kinesis Data Streams 的连接。

您可以在GlueContext对象使用来自 Kinesis 流式处理源的记录:

  • getCatalogSource

  • getSource

  • getSourceWithFormat

如果您使用getCatalogSource,则该作业具有数据目录数据库和表名信息,并且可以使用它们获取从 Kinesis 流源读取的一些基本参数。如果您使用getSource,您必须显式指定以下参数:

您可以使用connectionOptions替换为GetSourceoptions替换为getSourceWithFormat,或者additionalOptions替换为getCatalogSource

"connectionType": "kinesis" 可使用以下连接选项:

  • endpointUrl(必需)Kinesis 流式处理源的 URL。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。

  • streamName(必需)以下格式的数据流标识符:account-id:StreamName:streamCreationTimestamp。 此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。

  • classification (可选)

  • delimiter (可选)

  • "startingPosition":(可选)Kinesis 数据流中要从中读取数据的起始位置。可能的值包括:"latest""trim_horizon",或者"earliest"。默认值为 "latest"

  • "maxFetchTimeInMs":(可选)作业执行程序从每个分片的 Kinesis 数据流中获取记录所花费的最长时间,以毫秒 (ms) 为单位。默认值为 1000

  • "maxFetchRecordsPerShard":(可选)Kinesis 数据流中每个分片要获取的最大记录数。默认值为 100000

  • "maxRecordPerRead":(可选)从每个 Kinesis 数据流中获取的最大记录数getRecordsoperation. 默认值为 10000

  • "describeShardInterval":(可选)两个之间的最短时间间隔ListShardsAPI 调用您的脚本以考虑重新分片。有关更多信息,请参阅 。重新分片策略Amazon Kinesis Data Streams 开发人员指南。默认值为 1s

  • "numRetries":(可选)Kinesis 数据流 API 请求的最大重试次数。默认值为 3

  • "retryIntervalMs":(可选)重试 Kinesis Data Streams API 调用之前的冷却时间段(以毫秒为单位指定)。默认值为 1000

  • "maxRetryIntervalMs":(可选)Kinesis Data Streams API 调用的两次重试之间的最大冷却时间段(以毫秒为单位指定)。默认值为 10000

  • "avoidEmptyBatches":(可选)通过在批处理开始之前检查 Kinesis 数据流中是否有未读数据,避免创建空微批处理作业。默认值为 "False"

"connectionType": "mongodb"

指定与 MongoDB 的连接。源连接和接收器连接的连接选项不同。

"connectionType": "mongodb" as Source

"connectionType": "mongodb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 MongoDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 MongoDB 数据库。也可以将此选项传递给additional_options调用glue_context.create_dynamic_frame_from_catalog在您的作业脚本中。

  • "collection":(必需)要从中读取数据的 MongoDB 集合。也可以将此选项传递给additional_options调用glue_context.create_dynamic_frame_from_catalog在您的作业脚本中。

  • "username":(必需)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果为 true,且 ssltrue,则执行域匹配检查。默认为 true

  • "batchSize":(可选):每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选):用于从 MongoDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner (默认值)

    • MongoSamplePartitioner (需要 MongoDB 3.2 或更高版本)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions"(可选):指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 MongoDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 MongoDB 数据库。

  • "collection":(必需)要在其中写入数据的 MongoDB 集合。

  • "username":(必需)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果为 true,且 ssltrue,则执行域匹配检查。默认为 true

  • "extendedBsonTypes":(可选)如果true,允许在 MongoDB 中写入数据时使用扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选):保存数据时的批量操作的最大批次大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "orc"

指定与 Amazon S3 中存储的文件的连接,这些文件位于Apache 配置单元优化行列 (ORC)文件格式。

"connectionType": "orc" 可使用以下连接选项:

  • paths:(必需)要从中读取数据的 Amazon S3 路径的列表。

  • (其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递到 SparkSQLDataSource。有关更多信息,请参阅 Spark 的 Redshift 数据源

"connectionType": "parquet"

指定与 Amazon S3 中存储的文件的连接,这些文件位于Apache Parquet文件格式。

"connectionType": "parquet" 可使用以下连接选项:

  • paths:(必需)要从中读取数据的 Amazon S3 路径的列表。

  • (其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递到 SparkSQLDataSource。有关更多信息,请参阅 GitHub 网站上的 Spark 的 Amazon Redshift 数据源

"connectionType": "s3"

指定与 Amazon S3 的连接。

"connectionType": "s3" 可使用以下连接选项:

  • "paths":(必需)要从中读取数据的 Amazon S3 路径的列表。

  • "exclusions":(可选)包含要排除的 Unix 样式 glob 模式的 JSON 列表的字符串。例如,"[\"**.pdf\"]" 会排除所有 PDF 文件。有关 AWS Glue 支持的 glob 语法的更多信息,请参阅包含和排除模式

  • "compressionType":或”compression“:(可选)指定数据压缩方式。使用"compressionType",以获取 Amazon S3 源代码和"compression",供 Amazon S3 目标使用。通常,如果数据有标准文件扩展名,则不需要指定。可能的值为 "gzip""bzip"

  • "groupFiles":(可选)当输入包含超过 50,000 个文件时,默认情况下,文件分组处于启用状态。要打开文件少于 50,000 个,请将此参数设置为"inPartition"。当超过 50,000 个文件时,若要禁用分组,请将此参数设置为 "none"

  • "groupSize":(可选)目标组大小(以字节为单位)。默认值根据输入数据大小和群集大小进行计算。当少于 50,000 个输入文件时,"groupFiles" 必须设置为 "inPartition",此选项才能生效。

  • "recurse":(可选)如果设置为 true,则以递归方式读取指定路径下的所有子目录中的文件。

  • "maxBand":(可选,高级)此选项控制持续时间(以毫秒为单位)。s3列表可能是一致的。修改时间戳位于最后一个maxBand以毫秒为单位的时间,会特别跟踪使用JobBookmarks以考虑 Amazon S3 最终一致性。大多数用户不需要设置此选项。默认值为 900000 毫秒或 15 分钟。

  • "maxFilesInBand":(可选,高级)此选项指定在最后 maxBand 秒内可保存的最大文件数量。如果超过此值,额外的文件将会跳过,且只能在下一次作业运行中处理。大多数用户不需要设置此选项。

  • "isFailFast":(可选)此选项确定 AWS Glue ETL 作业是否引发读者解析异常。如果设置为true,如果 Spark 任务的四次重试无法正确解析数据,则作业会快速失败。

JDBC connectionType 值

这些功能包括:

  • "connectionType": "sqlserver":指定与 Microsoft SQL Server 数据库的连接。

  • "connectionType": "mysql":指定与 MySQL 数据库的连接。

  • "connectionType": "oracle":指定与 Oracle 数据库的连接。

  • "connectionType": "postgresql":指定与 PostgreSQL 数据库的连接。

  • "connectionType": "redshift":指定与 Amazon Redshift 数据库的连接。

下表列出 AWS Glue 支持的 JDBC 驱动程序版本。

产品 JDBC 驱动程序版本
Microsoft SQL Server 6.x
MySQL 5.1
Oracle Database 11.2
PostgreSQL 42.x
Amazon Redshift 4.1

将这些连接选项与 JDBC 连接结合使用:

  • "url":(必填)数据库的 JDBC URL。

  • "dbtable":要从中进行读取的数据库表。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

  • "redshiftTmpDir":(对于 Amazon Redshift 为必填项,对于其他 JDBC 类型为可选项)当从数据库中复制数据时,Amazon S3 路径可用于暂存临时数据。

  • "user":(必需)在连接时使用的用户名。

  • "password":(必填)在连接时使用的密码。

  • (可选)以下选项允许您提供自定义 JDBC 驱动程序。如果必须使用 AWS Glue 本身不支持的驱动程序,请使用这些选项。ETL 作业可以为数据源和目标使用不同的 JDBC 驱动程序版本,即使源和目标是相同的数据库产品也是如此。这允许您在不同版本的源数据库和目标数据库之间迁移数据。要使用这些选项,您必须首先将 JDBC 驱动程序的 jar 文件上传到 Amazon S3。

    • "customJdbcDriverS3Path":自定义 JDBC 驱动程序的 Amazon S3 路径。

    • "customJdbcDriverClassName":JDBC 驱动程序的类名。

用于 JDBC 连接的连接选项中包含的所有其他选项名称/值对(包括格式化选项)将直接传递到底层 SparkSQL DataSource。有关更多信息,请参阅 Spark 的 Redshift 数据源

以下代码示例演示了如何使用自定义 JDBC 驱动程序读取和写入 JDBC 数据库。这些示例演示了如何从一个版本的数据库产品读取和写入同一产品的更高版本。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }

自定义和 AWS Marketplace connectionType 值

这些功能包括:

  • "connectionType": "marketplace.athena":指定与 Amazon Athena 数据存储的连接。此连接使用 AWS Marketplace。

  • "connectionType": "marketplace.spark":指定与 Apache Spark 数据存储的连接。此连接使用 AWS Marketplace。

  • "connectionType": "marketplace.jdbc":指定与 JDBC 数据存储的连接。此连接使用 AWS Marketplace。

  • "connectionType": "custom.athena":指定与 Amazon Athena 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

  • "connectionType": "custom.spark":指定与 Apache Spark 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

  • "connectionType": "custom.jdbc":指定与 JDBC 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

用于类型自定义 .jdbc 或商城 .jdbc 的连接选项

  • className— 字符串,必填,驱动程序类名称。

  • connectionName— 与连接器关联的连接的字符串(必需)名称。

  • url— 带占位符的 JDBC URL 的字符串,必填字符串(${}),它们用于建立到数据源的连接。占位符${secretKey}在 AWS Secret Manager 中替换为同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。

  • secretId或者user/password— 字符串(必需),用于检索 URL 的凭据。

  • dbTable或者query— 字符串,必需的表或 SQL 查询从中获取数据。您可以指定 dbTablequery,但不能同时指定两者。

  • partitionColumn— 字符串,可选,用于分区的整数列的名称。此选项仅在包含在lowerBoundupperBound, 和numPartitions。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅 。JDBC 到其他数据库中的Apache Spark SQL、DataFrames 和 DataSatam 指南

    这些区域有:lowerBoundupperBound值用于确定分区步长,而不是用于过滤表中的行。对表中的所有行进行分区并返回。

    注意

    使用查询而不是表名时,应验证查询是否适用于指定的分区条件。例如:

    • 如果您的查询格式为"SELECT col1 FROM table1",然后通过附加一个WHERE子句,使用分区列的查询结尾。

    • 如果您的查询格式为”SELECT col1 FROM table1 WHERE col2=val",然后通过扩展WHERE子句,将使用AND和使用分区列的表达式。

  • lowerBound— 整数,可选,partitionColumn,用于决定分区步长。

  • upperBound— 整数,可选,最大值partitionColumn,用于决定分区步长。

  • numPartitions— 整数,可选,分区数量。此值以及lowerBound(含) 和upperBound(独占),生成的表单分区步长WHERE子句表达式,用于拆分partitionColumn

    重要

    请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。

  • filterPredicate— 用于过滤源数据的字符串、可选的额外条件子句。例如:

    BillingCity='Mountain View'

    使用query而不是table名称,则应验证查询是否与指定的filterPredicate。例如:

    • 如果您的查询格式为"SELECT col1 FROM table1",然后通过附加一个WHERE子句在使用筛选器谓词的查询结尾处。

    • 如果您的查询格式为"SELECT col1 FROM table1 WHERE col2=val",然后通过扩展WHERE子句,将使用AND和使用过滤器谓词的表达式。

  • dataTypeMapping— 字典、可选的自定义数据类型映射,用于从JDBC数据类型设置为连接词数据类型。例如,选项"dataTypeMapping":{"FLOAT":"STRING"}映射 JDBC 类型的数据字段FLOATJava 中的 “Clue”String类型,方法是调用ResultSet.getString()方法,并使用它来构建Glue 记录。这些区域有:ResultSet对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序如何执行转换。

  • 目前支持的 AWS Glue 数据类型包括:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • 大十进制

    • BYTE

    • SHORT

    • DOUBLE

    支持的 JDBC 数据类型为Java 8 java.平方英尺/类型

    默认数据类型映射(从 JDBC 到 AWS Glue)为:

    • 日期-> 日期

    • VARCHAR-> 字符串

    • CHAR-> 字符串

    • 龙王-> 字符串

    • 时间戳-> 时间戳

    • 整数-> 整数

    • 浮点-> 浮点

    • 真实-> 浮点

    • 位-> 布尔值

    • 布尔值-> 布尔值

    • 比金特-> 长

    • 十进制-> 大十进制

    • 数字-> 大十进制

    • 淡色-> 短

    • 小林-> 短

    • 双-> 双

    如果使用带有选项的自定义数据类型映射dataTypeMapping,则可以覆盖默认数据类型映射。只有 JDBC 数据类型在dataTypeMapping选项受到影响;默认映射用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果 JDBC 数据类型未包含在默认映射或自定义映射中,则该数据类型将转换为 AWS GlueSTRING数据类型。

以下 Python 代码示例说明了如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库读取。它演示了从数据库读取和写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

类型自定义 .雅典娜或商城。雅典娜的连接选项

  • className— 字符串,必填,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称的前缀(例如,"com.amazonaws.athena.connectors")。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。

  • tableName— 字符串,必需的,要读取的 CloudWatch 日志流的名称。此代码段使用特殊视图名称all_log_streams,这意味着返回的动态数据框将包含来自日志组中所有日志流的数据。

  • schemaName— 要从中读取的 CloudWatch 日志组的名称,必需字符串。例如:/aws-glue/jobs/output

  • connectionName— 与连接器关联的连接的字符串(必需)名称。

有关此连接器的其他选项,请参阅Amazon Athena CloudWatch 连接器自述文件GitHub 上的 “File”。

以下 Python 代码示例演示了如何使用 AWS Marketplace 连接器从 Athena 数据存储中读取。它演示了从 Athena 读取和写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

类型自定义 .spark 或商城 .spark 的连接选项

  • className— 字符串,必填,连接器类名称。

  • secretId— 字符串(可选),用于检索连接器连接的凭据。

  • connectionName— 与连接器关联的连接的字符串(必需)名称。

  • 其他选项取决于数据存储。例如,弹性搜索配置选项以前缀es,如所述弹性搜索阿帕奇 Hadoop文档中)。火花连接到雪花使用选项,如sfUsersfPassword,如中所述使用 Spark 接口中的连接到 Snowflake Snowflake指南。

以下 Python 代码示例演示了如何使用marketplace.spark连接。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()