AWS Glue 中的 ETL 的连接类型和选项 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS Glue 中的 ETL 的连接类型和选项

在 AWS Glue 中,各种 PySpark 和 Scala 方法和转换使用 connectionType 参数指定连接类型。它们使用 connectionOptionsoptions 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions(或 options)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅示例:设置连接类型和选项

connectionType 连接到
自定义。* Spark、Athena 或 JDBC 数据存储(请参阅 自定义和AWS Marketplace connectionType 值
documentdb Amazon DocumentDB(与 MongoDB 兼容) 数据库
dynamodb Amazon DynamoDB 数据库
kafka KafkaAmazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
Marketplace。* Spark、Athena 或 JDBC 数据存储(请参阅 自定义和AWS Marketplace connectionType 值
mongodb MongoDB 数据库
mysql MySQL 数据库(请参阅JDBC connectionType 值
oracle Oracle 数据库(请参阅JDBC connectionType 值
orc 以 Amazon Simple Storage ServiceApache Hive 优化的行列式 (ORC)Amazon S3 文件格式存储在 ) 中的文件
parquet 以 Amazon S3Apache Parquet 文件格式存储在 中的文件
postgresql PostgreSQL 数据库(请参阅JDBC connectionType 值
redshift Amazon Redshift 数据库(请参阅JDBC connectionType 值
s3 Amazon S3
sqlserver Microsoft SQL Server 数据库(请参阅JDBC connectionType 值

"connectionType": "documentdb"

指定与 Amazon DocumentDB(与 MongoDB 兼容) 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "documentdb" as Source

"connectionType": "documentdb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要从中读取数据的 Amazon DocumentDB 集合。

  • "username":(必填)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "ssl":(如果使用 SSL,则为必需)如果您的连接使用 SSL,则您必须包括具有 "true" 值的此选项。

  • "ssl.domain_match":(如果使用 SSL,则为必需)如果您的连接使用 SSL,则您必须包括值为 "false" 的此选项。

  • "batchSize":(可选):每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选):用于从 Amazon DocumentDB 读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner(默认值)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (可选):指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    有关这些选项的更多信息,请参阅 文档中的分区器配置MongoDB。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要在其中写入数据的 Amazon DocumentDB 集合。

  • "username":(必填)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "extendedBsonTypes":(可选)如果为 true,则在将数据写入到 Amazon DocumentDB 时启用扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选):保存数据时批量操作的最大批处理大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "dynamodb"

指定与 Amazon DynamoDB 的连接。

源连接和接收器连接的连接选项不同。

"connectionType":将 dynamodb" 作为源

"connectionType": "dynamodb" 用作源时可使用以下连接选项:

  • "dynamodb.input.tableName":(必需)要从中读取数据的 DynamoDB 表。

  • "dynamodb.throughput.read.percent":(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认读取速率,这意味着 AWS Glue 将尝试占用表的一半读取容量。如果您将该值增加到 0.5 以上,AWS Glue 会增加请求速率;将值减少到 0.5 以下会降低读取请求速率。(实际读取速率将因 DynamoDB 表中是否存在统一密钥分配等因素而异。)

    • 当 DynamoDB 表处于按需模式时,AWS Glue 会将表的读取容量处理为 40000。要导出大型表,我们建议将 DynamoDB 表切换为按需模式。

  • "dynamodb.splits":(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。

    • 1 表示没有并行度。我们强烈建议您使用以下公式指定一个较大的值以获得更好的性能。

    • 我们建议您使用以下公式计算 numSlots,并将其用作 dynamodb.splits。 如果您需要更高的性能,我们建议您通过增加 DPUs 的数量来扩展作业。

      • numExecutors =

        • (DPU - 1) * 2 - 1 如果 WorkerTypeStandard

        • (NumberOfWorkers - 1) 如果 WorkerTypeG.1XG.2X

      • numSlotsPerExecutor =

        • 4 如果 WorkerTypeStandard

        • 8 如果 WorkerTypeG.1X

        • 16 如果 WorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn":(可选)要为跨账户访问代入的 IAM 角色 ARN。此参数在 AWS Glue 1.0 或更高版本中可用。

  • "dynamodb.sts.roleSessionName":(可选) STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。此参数在 AWS Glue 1.0 或更高版本中可用。

注意

AWS Glue 支持从其他 AWS 账户的 DynamoDB 表读取数据。有关更多信息,请参阅对 DynamoDB 表的跨账户跨区域访问

注意

DynamoDB 读取器不支持筛选条件或按下谓词。

"connectionType":将 "dynamodb" 作为接收器

"connectionType": "dynamodb" 用作连接器时可使用以下连接选项:

  • "dynamodb.output.tableName":(必需)要向其写入数据的 DynamoDB 表。

  • "dynamodb.throughput.write.percent":(可选)要使用的写入容量单位 (WCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认写入速率,这意味着 AWS Glue 将尝试占用表的一半写入容量。如果您将该值增加到 0.5 以上,则 AWS Glue 会增加请求速率;将值减少到 0.5 以下会降低写入请求速率。(实际写入速率将因 DynamoDB 表中是否存在统一键分配等因素而异。)

    • 当 DynamoDB 表处于按需模式时,AWS Glue 会将表的写入容量处理为 40000。 要导入大型表,我们建议将 DynamoDB 表切换为按需模式。

  • "dynamodb.output.numParallelTasks":(可选)定义同时写入 DynamoDB 的并行任务的数目。用于计算每个 Spark 任务的允许 WCU。如果您不想控制这些详细信息,则不需要指定该参数。

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • 如果未指定此参数,则每个 Spark 任务的允许 WCU 将使用以下公式自动计算:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1 如果 WorkerTypeStandard

        • (NumberOfWorkers - 1) 如果 WorkerTypeG.1XG.2X

      • numSlotsPerExecutor =

        • 4 如果 WorkerTypeStandard

        • 8 如果 WorkerTypeG.1X

        • 16 如果 WorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • 示例 1. DPU=10,WorkerType=标准。输入 DynamicFrame 具有 100 个 RDD 分区。

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • 示例 2. DPU=10,WorkerType = 标准。输入 DynamicFrame 具有 20 个 RDD 分区。

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry":(可选)定义当 ProvisionedThroughputExceededException 存在 DynamoDB 时,我们执行的重试次数。默认设置为“10”。

  • "dynamodb.sts.roleArn":(可选)要为跨账户访问代入的 IAM 角色 ARN。

  • "dynamodb.sts.roleSessionName":(可选) STS 会话名称。默认设置为“glue-dynamodb-write-sts-session”。

注意

版本 1.0 或更高版本支持 DynamoDB 写入器。AWS Glue

注意

AWS Glue 支持将数据写入另一个 AWS 账户的 DynamoDB 表中。有关更多信息,请参阅对 DynamoDB 表的跨账户跨区域访问

以下代码示例演示如何在 DynamoDB 表中读取和写入。它们演示了从一个表读取和写入另一个表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

"connectionType": "kafka"

指定与 Kafka 集群或 Amazon Managed Streaming for Apache Kafka 集群的连接。

您可以使用 GlueContext 对象下的以下方法来使用 Kafka 流式传输源中的记录:

  • getCatalogSource

  • getSource

  • getSourceWithFormat

如果您使用 getCatalogSource,则作业具有 数据目录 数据库和表名称信息,并可用于获取一些从 Apache Kafka 流读取的基本参数。如果您使用 getSource,则必须明确指定以下参数:

您可以将 connectionOptionsGetSource 结合使用、将 optionsgetSourceWithFormat 结合使用或将 additionalOptionsgetCatalogSource 结合使用来指定这些选项。

"connectionType": "kafka" 可使用以下连接选项:

  • bootstrap.servers(必填)引导服务器 URLs 的列表,例如,作为 b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094。 必须在 API 调用中指定此选项,或者在 数据目录 中的表元数据中定义此选项。

  • security.protocol(必需)指示在 Apache Kafka 连接上是启用还是禁用 SSL 的布尔值。默认值为“true”。必须在 API 调用中指定此选项,或者在 数据目录 中的表元数据中定义此选项。

  • topicName(必填)Apache Kafka 中指定的主题名称。您必须指定至少一个 "topicName""assign""subscribePattern"

  • "assign":(必需)要使用的特定 TopicPartitions。您必须指定至少一个 "topicName""assign""subscribePattern"

  • "subscribePattern":(必需) 标识要订阅的主题列表的 Java 正则表达式字符串。您必须指定至少一个 "topicName""assign""subscribePattern"

  • classification(可选)

  • delimiter(可选)

  • "startingOffsets":(可选)要从中读取数据的 Kafka 主题的开始位置。可能的值为 "earliest""latest"。 默认值为 "latest"

  • "endingOffsets":(可选) 批处理查询结束时的终点。可能的值为 "latest" 或为每个 TopicPartition 指定结束偏移量的 JSON 字符串。

    对于 JSON 字符串,格式为 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}。 偏移量形式的值 -1 表示 "latest"

  • "pollTimeoutMs":(可选)从 Spark 作业执行程序中的 Kafka 轮询数据的超时(以毫秒为单位)。默认值为 512

  • "numRetries":(可选)在获取 Kafka 偏移失败之前重试的次数。默认值为 3

  • "retryIntervalMs":(可选) 在重试以获取 Kafka 偏移之前等待的时间 (以毫秒为单位)。默认值为 10

  • "maxOffsetsPerTrigger":(可选) 每个触发间隔处理的最大偏移数的速率限制。指定的偏移总数在不同卷的topicPartitions之间按比例拆分。默认值为 null,这意味着使用者将读取所有偏移,直到出现已知的最新偏移。

  • "minPartitions":(可选)要从 Kafka 读取的所需最小分区数。默认值为 null,这意味着火花分区数等于 Kafka 分区数。

"connectionType": "kinesis"

指定与 Amazon Kinesis Data Streams 的连接。

您可以使用 GlueContext 对象下的以下方法来使用 Kinesis 流式传输源中的记录:

  • getCatalogSource

  • getSource

  • getSourceWithFormat

如果您使用 getCatalogSource,则作业具有 数据目录 数据库和表名称信息,并可用于获取一些从 Kinesis 流式传输源读取的基本参数。如果您使用 getSource,则必须明确指定以下参数:

您可以将 connectionOptionsGetSource 结合使用、将 optionsgetSourceWithFormat 结合使用或将 additionalOptionsgetCatalogSource 结合使用来指定这些选项。

"connectionType": "kinesis" 可使用以下连接选项:

  • endpointUrl(必填)Kinesis 流式传输源的 URL。必须在 API 调用中指定此选项,或者在 数据目录 中的表元数据中定义此选项。

  • streamName(必填)以下格式的数据流标识符:account-id:StreamName:streamCreationTimestamp。 必须在 API 调用中指定此选项,或者在 数据目录 中的表元数据中定义此选项。

  • classification(可选)

  • delimiter(可选)

  • "startingPosition":(可选)要从中读取数据的 Kinesis 数据流的开始位置。可能的值为 "latest""trim_horizon""earliest"。 默认值为 "latest"

  • "maxFetchTimeInMs":(可选) 在作业执行程序上从每个分片的 Kinesis 数据流中提取记录所花费的最长时间,以毫秒 (ms) 为单位。默认值为 1000

  • "maxFetchRecordsPerShard":(可选)Kinesis 数据流中每个分片要提取的最大记录数。默认值为 100000

  • "maxRecordPerRead":(可选)每个 Kinesis 操作中从 getRecords 数据流提取的最大记录数。默认值为 10000

  • "addIdleTimeBetweenReads":(可选) 在两个连续 getRecords 操作之间添加时间延迟。默认值为 "False"

  • "idleTimeBetweenReadsInMs":(可选)两个连续 getRecords 操作之间的最短延迟时间(以毫秒为单位)。默认值为 1000

  • "describeShardInterval":(可选)脚本要考虑重新分片的两个 ListShards API 调用之间的最短时间间隔。有关更多信息,请参阅 https://docs.amazonaws.cn/streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html 开发人员指南 中的Amazon Kinesis Data Streams重新分片策略。默认值为 1s

  • "numRetries":(可选)Kinesis Data Streams API 请求的最大重试次数。默认值为 3

  • "retryIntervalMs":(可选)重试 Kinesis Data Streams API 调用之前的冷却时间(以毫秒为单位)。默认值为 1000

  • "maxRetryIntervalMs":(可选)Kinesis Data Streams API 调用的两次重试之间的最大冷却时间(以毫秒为单位指定)。默认值为 10000

  • "avoidEmptyBatches":(可选)在启动批次之前,通过检查 Kinesis 数据流中未读取的数据来避免创建空的微批次作业。默认值为 "False"

"connectionType": "mongodb"

指定与 MongoDB 的连接。 源连接和接收器连接的连接选项不同。

"connectionType": "mongodb" as Source

"connectionType": "mongodb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 MongoDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 MongoDB 数据库。在作业脚本中调用 additional_options 时,也可以传递此选项。glue_context.create_dynamic_frame_from_catalog

  • "collection":(必需)要从中读取数据的 MongoDB 集合。在作业脚本中调用 additional_options 时,也可以传递此选项。glue_context.create_dynamic_frame_from_catalog

  • "username":(必需)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果 truessltrue,则执行域匹配检查。默认为 true

  • "batchSize":(可选):每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选):用于从 MongoDB 读取输入数据的分区器的类名称。 连接器提供以下分区器:

    • MongoDefaultPartitioner(默认值)

    • MongoSamplePartitioner(需要 MongoDB 3.2 或更高版本)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (可选):指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    有关这些选项的更多信息,请参阅 文档中的分区器配置MongoDB。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 MongoDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 MongoDB 数据库。

  • "collection":(必需)要在其中写入数据的 MongoDB 集合。

  • "username":(必填)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果 truessltrue,则执行域匹配检查。默认为 true

  • "extendedBsonTypes":(可选)如果为 true,则在将数据写入到 MongoDB 时启用扩展 BSON 类型。 默认值为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选):保存数据时批量操作的最大批处理大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "orc"

指定与 Amazon S3 中以 Apache Hive 优化的行列式 (ORC) 文件格式存储的文件的连接。

"connectionType": "orc" 可使用以下连接选项:

  • paths:(必需)要从中读取的 Amazon S3 路径的列表。

  • (其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递到 SparkSQL DataSource。 有关更多信息,请参阅 Spark 的 Redshift 数据源

"connectionType": "parquet"

指定与 Amazon S3 中存储的文件的连接,这些文件采用 Apache Parquet 文件格式。

"connectionType": "parquet" 可使用以下连接选项:

  • paths:(必填)要从中读取的 Amazon S3 路径的列表。

  • (其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递到 SparkSQL DataSource。 有关更多信息,请参阅 网站上的 Spark 的 Amazon Redshift 数据源GitHub。

"connectionType": "s3"

指定与 Amazon S3 的连接。

"connectionType": "s3" 可使用以下连接选项:

  • "paths":(必需)要从中读取的 Amazon S3 路径的列表。

  • "exclusions":(可选)包含要排除的 Unix 样式 glob 模式的 JSON 列表的字符串。例如,"[\"**.pdf\"]" 会排除所有 PDF 文件。有关 AWS Glue 支持的 glob 语法的更多信息,请参阅包含和排除模式

  • "compressionType": 或“compression”:(可选) 指定数据压缩方式。对 Amazon S3 源使用 "compressionType",对 Amazon S3 目标使用 "compression"。通常,如果数据有标准文件扩展名,则不需要指定。可能的值为 "gzip""bzip"

  • "groupFiles":(可选)当输入包含超过 50,000 个文件时,将默认启用文件分组。要对少于 50,000 个文件启用分组,请将此参数设置为 "inPartition"。 要在超过 50,000 个文件时禁用分组,请将此参数设置为 "none"

  • "groupSize":(可选)目标组大小(以字节为单位)。默认值根据输入数据大小和群集大小进行计算。当少于 50,000 个输入文件时,"groupFiles" 必须设置为 "inPartition",此选项才能生效。

  • "recurse":(可选)如果设置为 true,则以递归方式读取指定路径下的所有子目录中的文件。

  • "maxBand":(可选,高级)此选项控制 s3 列表可能保持一致的持续时间(以秒为单位)。当使用 JobBookmarks 来表明 Amazon S3 最终一致性时,将专门跟踪修改时间戳在最后 maxBand 秒内的文件。大多数用户不需要设置此选项。默认值为 900 秒。

  • "maxFilesInBand":(可选,高级)此选项指定在最后 maxBand 秒内可保存的最大文件数量。如果超过此值,额外的文件将会跳过,且只能在下一次作业运行中处理。大多数用户不需要设置此选项。

JDBC connectionType 值

这些功能包括:

  • "connectionType": "sqlserver":指定与 Microsoft SQL Server 数据库的连接。

  • "connectionType": "mysql":指定与 MySQL 数据库的连接。

  • "connectionType": "oracle":指定与 Oracle 数据库的连接。

  • "connectionType": "postgresql":指定与 PostgreSQL 数据库的连接。

  • "connectionType": "redshift":指定与 Amazon Redshift 数据库的连接。

下表列出了 AWS Glue 支持的 JDBC 驱动程序版本。

产品 JDBC 驱动程序版本
Microsoft SQL Server 6.x
MySQL 5.1
Oracle Database 11.2
PostgreSQL 42.x
Amazon Redshift 4.1

将这些连接选项与 JDBC 连接结合使用:

  • "url":(必填)数据库的 JDBC URL。

  • "dbtable":要从中读取的数据库表。对于在数据库中支持架构的 JDBC 数据存储,请指定 schema.table-name。 如果未提供架构,则使用默认的“public”架构。

  • "redshiftTmpDir":(对于 Amazon Redshift 是必需的,对于其他 JDBC 类型是可选的)当从数据库中复制数据时,用于暂存临时数据的 Amazon S3 路径。

  • "user":(必需) 连接时要使用的用户名。

  • "password":(必填)在连接时使用的密码。

  • (可选)以下选项使您能够提供自定义 JDBC 驱动程序。如果必须使用 AWS Glue 本身不支持的驱动程序,请使用这些选项。ETL 作业可以为数据源和目标使用不同的 JDBC 驱动程序版本,即使源和目标是相同的数据库产品也是如此。这使您能够在不同版本的源数据库和目标数据库之间迁移数据。要使用这些选项,您必须首先将 JDBC 驱动程序的 jar 文件上传到 Amazon S3。

    • "customJdbcDriverS3Path":自定义 JDBC 驱动程序的 Amazon S3 路径。

    • "customJdbcDriverClassName":JDBC 驱动程序的类名。

JDBC 连接的连接选项中包含的所有其他选项名称/值对(包括格式化选项)将直接传递到底层 SparkSQL DataSource。 有关更多信息,请参阅 Spark 的 Redshift 数据源

以下代码示例演示了如何使用自定义 JDBC 驱动程序读取和写入 JDBC 数据库。这些示例演示了如何从一个版本的数据库产品读取和写入同一产品的更高版本。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }

自定义和AWS Marketplace connectionType 值

这些功能包括:

  • "connectionType": "marketplace.athena":指定与 Amazon Athena 数据存储的连接。该连接使用来自 AWS Marketplace 的连接器。

  • "connectionType": "marketplace.spark":指定与 Apache Spark 数据存储的连接。该连接使用来自 AWS Marketplace 的连接器。

  • "connectionType": "marketplace.jdbc":指定与 JDBC 数据存储的连接。该连接使用来自 AWS Marketplace 的连接器。

  • "connectionType": "custom.athena":指定与 Amazon Athena 数据存储的连接。该连接使用您上传到 AWS Glue Studio 的自定义连接器。

  • "connectionType": "custom.spark":指定与 Apache Spark 数据存储的连接。该连接使用您上传到 AWS Glue Studio 的自定义连接器。

  • "connectionType": "custom.jdbc":指定与 JDBC 数据存储的连接。该连接使用您上传到 AWS Glue Studio 的自定义连接器。

custom.jdbc 或 marketplace.jdbc 类型的连接选项

  • className – 字符串、必需、驱动程序类名称。

  • connectionName – 与连接器关联的连接的字符串、必需名称。

  • url – 字符串(必需)带占位符 (${}) 的 JDBC URL,用于构建到数据源的连接。占位符 ${secretKey} 将替换为 AWS Secrets Manager 中同名的密钥。有关构建 URL 的更多信息,请参阅数据存储文档。

  • secretIduser/password – 字符串(必需),用于检索 URL 的凭证。

  • dbTablequery – 字符串、必需,要从中获取数据的表或 SQL 查询。您可以指定 dbTablequery,但不能同时指定两者。

  • partitionColumn – 用于分区的整数列的名称(字符串,可选)。此选项仅在 lowerBoundupperBoundnumPartitions 附带时有效。 此选项的工作方式与 Spark SQL JDBC 读取器中的工作方式相同。有关更多信息,请参阅 Apache Spark SQL、 和数据集指南中的 DataFramesJDBC 到其他数据库

    lowerBound 值用于确定分区表,而不是筛选表中的行。upperBound对表中的所有行进行分区并返回。

    注意

    使用查询而不是表名称时,您应验证查询是否适用于指定的分区条件。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则通过将 WHERE 子句附加到使用分区列的查询的末尾来测试查询。

    • 如果您的查询格式为“SELECT col1 FROM table1 WHERE col2=val"”,则通过扩展具有 WHEREAND 子句和使用分区列的表达式来测试查询。

  • lowerBound – 整数(可选),用于决定分区模式的 partitionColumn 的最小值。

  • upperBound – 整数(可选),用于决定分区模式的 的最大值。partitionColumn

  • numPartitions – 整数,可选,分区数量。此值与 lowerBound(含)和 upperBound(不含)一起,为生成的 WHERE 子句表达式(用于拆分 partitionColumn)形成分区表。

    重要

    分区数量要小心,因为分区过多可能导致外部数据库系统上出现问题。

  • filterPredicate – String, optional, extra condition 子句,用于筛选来自源的数据。例如:

    BillingCity='Mountain View'

    使用查询 而不是 名称时,您应验证查询是否适用于指定的 filterPredicate。 F或示例:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则通过将 WHERE 子句附加到使用筛选条件谓词的查询的末尾来测试查询。

    • 如果您的查询格式为 "SELECT col1 FROM table1 WHERE col2=val",则通过扩展具有 WHEREAND 子句和使用筛选条件谓词的表达式来测试查询。

  • dataTypeMapping – 词典,可选自定义数据类型映射,可构建从 JDBC 数据类型到 Glue 数据类型的映射。例如,选项 "dataTypeMapping":{"FLOAT":"STRING"} 将 JDBC 类型 FLOAT 的数据字段映射到 Java String 类型,方法是调用驱动程序的 ResultSet.getString() 方法,并使用它生成 Glue 记录。对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。ResultSet请参阅 JDBC 驱动程序的文档以了解该驱动程序如何执行转换。

  • 当前支持的 AWS Glue 数据类型包括:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • 短横线

    • DOUBLE

    支持的 JDBC d at 为 Java8 java.sql.types。https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html

    默认数据类型映射(从 JDBC 到 AWS Glue)为:

    • DATE -&gt; DATE

    • VARCHAR -&gt; STRING

    • CHAR -&gt; STRING

    • LONGNVARCHAR -&gt; STRING

    • TIMESTAMP -&gt; TIMESTAMP

    • INTEGER -&gt; INT

    • FLOAT -&gt; FLOAT

    • REAL -&gt; FLOAT

    • BIT -&gt; BOOLEAN

    • 布尔值 -&gt; 布尔值

    • BIGINT -&gt; LONG

    • DECIMAL -&gt; BIGDECIMAL

    • NUMERIC -&gt; BIGDECIMAL

    • TINYINT -&gt; SHORT

    • SMALLINT -&gt; SHORT

    • DOUBLE -&gt; DOUBLE

    如果您将自定义数据类型映射与选项 dataTypeMapping 结合使用,则可以覆盖默认数据类型映射。只有 dataTypeMapping 选项中列出的 JDBC 数据类型会受到影响;默认映射用于所有其他 JDBC 数据类型。您可以根据需要为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中未包含 JDBC 数据类型,则默认情况下,数据类型将转换为 AWS Glue STRING 数据类型。

以下 Python 代码示例演示如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库进行读取。它演示了从数据库读取和写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.athena 或 Marketplace.athena 类型的连接选项

  • className – 字符串、必需、驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名的前缀(例如,)"com.amazonaws.athena.connectors"。Athena-CloudWatch 连接器由两个类组成:一个元数据处理程序和一个记录处理程序。如果您在此处提供通用前缀,则 API 根据该前缀加载正确的类。

  • tableName – 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特殊视图名称 all_log_streams,这意味着返回的动态数据帧将包含来自日志组中的所有日志流的数据。

  • schemaName – 字符串(必需)CloudWatch 要从中读取的 日志组的名称。 F 或示例 。/aws-glue/jobs/output

  • connectionName – 与连接器关联的连接的字符串、必需名称。

有关此连接器的其他选项,请参阅 上的 AthenaAmazon CloudWatch Connector 自述文件GitHub。

以下 Python 代码示例显示如何使用连接器从 Athena 数据存储中读取。AWS Marketplace它演示了从 Athena 读取和写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.spark 或 marketplace.spark 类型的连接选项

  • className – 字符串(必需)、连接器类名称。

  • secretId – 用于检索连接器连接的凭证的字符串(可选)。

  • connectionName – 与连接器关联的连接的字符串、必需名称。

  • 其他选项取决于数据存储。例如,Elasticsearch 配置选项以前缀 es 开头,如 Elasticsearch for Apache Hadoop 文档中所述。Spark 与 Snowflake 的连接使用 sfUsersfPassword 等选项,如连接到 Snowflake 指南中的使用 Spark 连接器所述。

以下 Python 代码示例说明如何使用 连接从 Elasticsearch 数据存储中读取数据。marketplace.spark

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()