Amazon Glue 中的 ETL 的连接类型和选项 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

Amazon Glue 中的 ETL 的连接类型和选项

在 Amazon Glue 中,各种 PysPark 和 Scala 方法和转换使用 connectionType 参数指定连接类型。它们使用 connectionOptionsoptions 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions(或 options)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅示例:设置连接类型和选项

connectionType 连接到
custom.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值
documentdb Amazon DocumentDB (with MongoDB compatibility) 数据库
dynamodb Amazon DynamoDB 数据库
kafka KafkaAmazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
marketplace.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值
mongodb MongoDB 数据库
mysql MySQL 数据库(请参阅JDBC connectionType 值
oracle Oracle 数据库(请参阅JDBC connectionType 值
orc Amazon Simple Storage Service(Amazon S3)中以 Apache Hive 优化的行列式(ORC)文件格式存储的文件
parquet Amazon S3 中以 Apache Parquet 文件格式存储的文件
postgresql PostgreSQL 数据库(请参阅JDBC connectionType 值
redshift Amazon Redshift 数据库(请参阅JDBC connectionType 值
S3 Amazon S3
sqlserver Microsoft SQL Server 数据库(请参阅JDBC connectionType 值

"connectionType": "documentdb"

指定与 Amazon DocumentDB (with MongoDB compatibility) 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "documentdb" as Source

"connectionType": "documentdb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要从中读取数据的 Amazon DocumentDB 连接。

  • "username":(必需)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "ssl":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "true"

  • "ssl.domain_match":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "false"

  • "batchSize":(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选)从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner (默认值)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions":(可选)指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitionerpartitionKey、partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitionerpartitionKey、partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要在其中写入数据的 Amazon DocumentDB 连接。

  • "username":(必需)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "extendedBsonTypes":(可选)如果为 true,则在 Amazon DocumentDB 中写入数据时会启用扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选)保存数据时的批量操作的最大批次大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "dynamodb"

指定与 Amazon DynamoDB 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "dynamodb" with the ETL connector as Source

在使用 Amazon Glue DynamoDB ETL 连接器时,请使用以下连接选项并将 "connectionType": "dynamodb" 作为源:

  • "dynamodb.input.tableName":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.throughput.read.percent":(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认读取速率,这意味着 Amazon Glue 将尝试占用表的一半的读取容量。如果增加值超过 0.5,Amazon Glue 将增加请求速率;将值降低到 0.5 以下将降低读取请求速率。(实际读取速率取决于 DynamoDB 表中是否存在统一键分配的等因素。)

    • 当 DynamoDB 表处于按需模式时,Amazon Glue 处理表的读取容量为 40000。要导出大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.splits":(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。

    • 1 表示没有并行度。我们强烈建议您使用以下公式指定更大的值以获得更好的性能。

    • 我们建议您使用以下公式计算 numSlots,并将其用作 dynamodb.splits。如果您需要更高的性能,我们建议您增加 DPU 数量以扩展任务。

      • numExecutors =

        • (DPU - 1) * 2 - 1,如果 WorkerTypeStandard

        • (NumberOfWorkers - 1),如果 WorkerTypeG.1XG.2X

      • numSlotsPerExecutor =

        • 4,如果 WorkerTypeStandard

        • 8,如果 WorkerTypeG.1X

        • 16,如果 WorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。此参数适用于 Amazon Glue 1.0 或更高版本。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。此参数适用于 Amazon Glue 1.0 或更高版本。

以下代码示例演示了如何从 DynamoDB 表中读取(通过 ETL 连接器)以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }
注意

Amazon Glue 支持从其他 Amazon 账户的 DynamoDB 表读取数据。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表

注意

DynamoDB ETL 读取器不支持筛选条件或下推谓词。

"connectionType": "dynamodb" with the Amazon Glue DynamoDB export connector as Source

此功能位于 Amazon Glue 的预览版中,可能会发生变化。

除 Amazon Glue DynamoDB ETL 连接器外,Amazon Glue 提供 DynamoDB 导出连接器,该连接器调用 DynamoDB ExportTableToPointInTime 请求并将其以 DynamoDB JSON 格式存储在您提供的 Simple Storage Service (Amazon S3) 位置。然后,Amazon Glue 通过从 Simple Storage Service (Amazon S3) 导出位置读取数据来创建 DynamicFrame 对象。

在 DynamoDB 表大小超过 80 GB 时,导出连接器的性能优于 ETL 连接器。此外,鉴于导出请求在 Amazon Glue 任务中的 Spark 进程之外执行,您可以启用 Amazon Glue 任务的弹性伸缩以节省导出请求期间的 DPU 使用量。借助导出连接器,您也无需为 Spark 执行程序并行度或 DynamoDB 吞吐量读取百分比配置拆分数。

在使用 Amazon Glue DynamoDB 导出连接器(仅适用于 Amazon Glue 版本 2.0 以上)时,使用以下连接选项并将 "connectionType": "dynamodb"用作源:

  • "dynamodb.export":(必需)字符串值:

    • 如果设置为 ddb,将启用 Amazon Glue DynamoDB 导出连接器,其中在 Amazon Glue 任务期间将调用新的 ExportTableToPointInTimeRequest。新的导出将通过从 dynamodb.s3.bucketdynamodb.s3.prefix 传递的位置生成。

    • 如果设置为 s3,将启用 Amazon Glue DynamoDB 导出连接器但会跳过创建新的 DynamoDB 导出,而使用 dynamodb.s3.bucketdynamodb.s3.prefix 作为该表以前导出的 Simple Storage Service (Amazon S3) 位置。

  • "dynamodb.tableArn":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.unnestDDBJson":(可选)采用布尔值。如果设置为 true(真),则对导出中存在的 DynamoDB JSON 结构执行解除嵌套转换。默认值设置为 false。

  • "dynamodb.s3.bucket":(可选)指示将会执行 DynamoDB ExportTableToPointInTime 进程的 Amazon S3 存储桶位置。导出的文件格式为 DynamoDB JSON。

    • "dynamodb.s3.prefix":(可选)指示将用于存储 DynamoDB ExportTableToPointInTime 负载的 Amazon S3 存储桶内的 Amazon S3 前缀位置。如果既未指定 dynamodb.s3.prefix,也未指定 dynamodb.s3.bucket,则这些值将默认为 Amazon Glue 任务配置中指定的临时目录位置。有关更多信息,请参阅 Amazon Glue 使用的特殊参数

    • "dynamodb.s3.bucketOwner":指示跨账户 Amazon S3 访问所需的存储桶拥有者。

  • "dynamodb.sts.roleArn":(可选)跨账户访问和/或跨区域访问 DynamoDB 表时将会代入的 IAM 角色 ARN。注意:相同的 IAM 角色 ARN 将用于访问为 ExportTableToPointInTime 请求指定的 Amazon S3 位置。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。

注意

DynamoDB 对调用 ExportTableToPointInTime 请求有特定的要求。有关更多信息,请参阅在 DynamoDB 中请求表导出。例如,表需要启用时间点恢复 (PITR) 才能使用此连接器。DynamoDB 连接器还支持在将 DynamoDB 导出到 Simple Storage Service (Amazon S3) 时进行 KMS 加密。在 Amazon Glue 任务配置中指定安全性配置,将为 DynamoDB 导出启用 KMS 加密。KMS 密钥必须与 Simple Storage Service (Amazon S3) 存储桶位于同一区域。

请注意,您需要支付 DynamoDB 导出的额外费用和 Simple Storage Service (Amazon S3) 存储成本。任务运行完成后,Simple Storage Service (Amazon S3) 中的导出数据仍然存在,因此您无需其他 DynamoDB 导出即可重复使用这些数据。使用此连接器的一个要求是该表启用了时间点恢复 (PITR)。

DynamoDB ETL 连接器或导出连接器不支持在 DynamoDB 源应用筛选条件或下推谓词。

以下代码示例演示如何进行读取(通过导出连接器)以及打印分区数量。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

以下示例演示如何从具有 dynamodb 分类的 Amazon Glue 数据目录表进行读取(通过导出连接器)以及打印分区数量:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database="<catalog_database>", table_name="<catalog_table_name", additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": "<s3_bucket>", "dynamodb.s3.prefix": "<s3_bucket_prefix>" } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = "<catalog_database>", tableName = "<catalog_table_name", additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> "<s3_bucket>", "dynamodb.s3.prefix" -> "<s3_bucket_prefix>" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

遍历 DynamoDB JSON 结构

使用 Amazon Glue DynamoDB 导出连接器进行 DynamoDB 导出时可以生成具有特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象。Amazon Glue 提供 DynamicFrame 转换,可以将这些结构解除嵌套,成为易于下游应用程序使用的形式。

转换可以通过两种方式之一调用,第一种方式是通过 Amazon Glue DynamoDB 导出连接器传递的布尔值标记。第二种方式是通过调用转换函数本身。

以下代码示例演示如何使用 Amazon Glue DynamoDB 导出连接器、调用解除嵌套命令,以及打印分区数量:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.unnestDDBJson": True, "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.unnestDDBJson" -> true "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

转换的另一种调用是通过单独的 DynamicFrame 函数调用。有关更多信息,请参阅适用于 Python 的 DynamicFrame 类和适用于 Scala 的 Amazon Glue Scala DynamicFrame 类

"connectionType": "dynamodb" with the ETL connector as Sink

"connectionType": "dynamodb" 用作连接器时可使用以下连接选项:

  • "dynamodb.output.tableName":(必需)要写入的 DynamoDB 表。

  • "dynamodb.throughput.write.percent":(可选)要使用的写入容量单位(WCU)的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认写入速率,这意味着 Amazon Glue 将尝试占用表的一半的写入容量。如果增加值超过 0.5,Amazon Glue 将增加请求速率;将值降低到 0.5 以下将降低写入请求速率。(实际写入速率取决于 DynamoDB 表中是否存在统一键分配等因素)。

    • 当 DynamoDB 表处于按需模式时,Amazon Glue 处理表的写入容量为 40000。要导入大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.output.numParallelTasks":(可选)定义同时向 DynamoDB 写入数据的并行任务数。用于计算每个 Spark 任务的允许 WCU。如果您不想控制这些详细信息,则无需指定此参数。

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • 如果您未指定此参数,则按照以下公式自动计算每个 Spark 任务的允许 WCU:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1,如果 WorkerTypeStandard

        • (NumberOfWorkers - 1),如果 WorkerTypeG.1XG.2X

      • numSlotsPerExecutor =

        • 4,如果 WorkerTypeStandard

        • 8,如果 WorkerTypeG.1X

        • 16,如果 WorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • 示例 1. DPU=10,WorkerType=Standard。输入 DynamicFrame 具有 100 个 RDD 分区。

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • 示例 2. DPU=10,WorkerType=Standard。输入 DynamicFrame 具有 20 个 RDD 分区。

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry":(可选)定义存在 DynamoDB 中的 ProvisionedThroughputExceededException 时我们执行的重试次数。默认设置为“10”。

  • "dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-write-sts-session”。

注意

DynamoDB 写入器在 Amazon Glue 版本 1.0 或更高版本中受支持。

注意

Amazon Glue 支持将数据写入其他 Amazon 账户的 DynamoDB 表。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表

以下代码示例说明如何从 DynamoDB 表中读取以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

"connectionType": "kafka"

指定与 Kafka 集群或 Amazon Managed Streaming for Apache Kafka 集群的连接。

您可以在 GlueContext 对象下面使用以下方法,利用来自 Kafka 流式传输源的记录:

  • getCatalogSource

  • getSource

  • getSourceWithFormat

  • createDataFrameFromOptions

如果您使用 getCatalogSource,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Apache Kafka 流读取数据。如果您使用 getSourcegetSourceWithFormatcreateDataFrameFromOptions,则您必须明确指定以下参数:

您可以配合使用 connectionOptionsgetSourcecreateDataFrameFromOptionsoptionsgetSourceWithFormat、或者 additionalOptionsgetCatalogSource,以指定这些选项。

"connectionType": "kafka" 可使用以下连接选项:

  • bootstrap.servers(必需)引导服务器 URL 的列表,例如,作为 b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。

  • security.protocol(必填)用于与代理通信的协议。可能的值为 "SSL""PLAINTEXT"

  • topicName(必填)要订阅的以逗号分隔的主题列表。您必须指定"topicName""assign""subscribePattern" 中的其中一个,且只能指定一个。

  • "assign":(必填)用于指定要使用的 TopicPartitions 的 JSON 字符串。您必须指定"topicName""assign""subscribePattern" 中的其中一个,且只能指定一个。

    例如:“{"topicA":[0,1],"topicB":[2,4]}”

  • "subscribePattern":(必需)标识要订阅的主题列表的 Java 正则表达式字符串。您必须指定"topicName""assign""subscribePattern" 中的其中一个,且只能指定一个。

    示例:“topic.*”

  • classification(可选)

  • delimiter(可选)

  • "startingOffsets":(可选)Kafka 主题中数据读取的起始位置。可能的值为 "earliest""latest"。默认值为 "latest"

  • "endingOffsets":(可选)批处理查询结束时的终点。可能值为 "latest",或者为每个 TopicPartition 指定结束偏移的 JSON 字符串。

    对于 JSON 字符串,格式为 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}。偏移值 -1 表示 "latest"

  • "pollTimeoutMs":(可选)Spark 任务执行程序中,从 Kafka 轮询数据的超时时间(以毫秒为单位)。原定设置值为 512

  • "numRetries":(可选)无法获取 Kafka 偏移时的重试次数。原定设置值为 3

  • "retryIntervalMs":(可选)重试获取 Kafka 偏移时的等待时间(以毫秒为单位)。原定设置值为 10

  • "maxOffsetsPerTrigger":(可选)每个触发间隔处理的最大偏移数的速率限制。指定的总偏移数跨不同卷的 topicPartitions 按比例分割。默认值为 null,这意味着使用者读取所有偏移,直到已知的最新偏移。

  • "minPartitions":(可选)从 Kafka 读取数据的必需最小分区数。默认值为 null,这意味着 Spark 分区数等于 Kafka 分区数。

  • "includeHeaders":(可选)是否包含 Kafka 标头。当选项设置为“true”时,数据输出将包含一个名为“glue_streaming_kafka_headers”的附加列,类型为 Array[Struct(key: String, value: String)]。默认值为“false”。此选项仅适用于 Amazon Glue 版本 3.0。

  • "schema":(当 inferSchema 设为 false 时为必填)用于处理有效工作负载的架构。如果分类为 avro,则提供的架构必须采用 Avro 架构格式。如果分类不是 avro,则提供的架构必须采用 DDL 架构格式。

    以下是一些架构示例。

    Example in DDL schema format
    'column1' INT, 'column2' STRING , 'column3' FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema":(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到 foreachbatch 内的有效工作负载中的架构。

  • "avroSchema":(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用 schema 参数。

"connectionType": "kinesis"

为 Amazon Kinesis Data Streams 指定连接选项。

您可以使用存储在数据目录表中的信息从 Amazon Kinesis 数据流读取数据,或提供信息直接访问数据流。如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。

如果您使用 getCatalogSourcecreate_data_frame_from_catalog 使用来自 Kinesis 串流源的记录,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Kinesis 串流源读取数据。如果使用 getSourcegetSourceWithFormatcreateDataFrameFromOptionscreate_data_frame_from_options,则必须使用此处描述的连接选项指定这些基本参数。

您可以使用 GlueContext 类中指定方法的以下参数为 Kinesis 指定连接选项。

  • Scala

    • connectionOptions:与 getSourcecreateDataFrameFromOptions 结合使用

    • additionalOptions:与 getCatalogSource 结合使用

    • options:与 getSourceWithFormat 结合使用

  • Python

    • connection_options:与 create_data_frame_from_options 结合使用

    • additional_options:与 create_data_frame_from_catalog 结合使用

    • options:与 getSource 结合使用

为 Kinesis 串流数据源使用以下连接选项:

  • streamARN(必需)Kinesis 数据流的 ARN。

  • classification(可选)

  • delimiter(可选)

  • "startingPosition":(可选)Kinesis 数据流中数据读取的起始位置。可能的值为 "latest""trim_horizon""earliest"。原定设置值为 "latest"

  • "awsSTSRoleARN":(可选)要使用 Amazon Security Token Service(Amazon STS) 代入的角色的 Amazon Resource Name (ARN)。此角色必须拥有针对 Kinesis 数据流进行描述或读取记录操作的权限。在访问其他账户中的数据流时,必须使用此参数。与 "awsSTSSessionName" 结合使用。

  • "awsSTSSessionName":(可选)要使用 Amazon STS 代入角色的会话的标识符. 访问其他账户中的数据流时,必须使用此参数。与 "awsSTSRoleARN" 结合使用。

  • "maxFetchTimeInMs":(可选)任务执行程序从每个分区的 Kinesis 数据流中获取记录所花费的最长时间,以毫秒(ms)为单位。原定设置值为 1000

  • "maxFetchRecordsPerShard":(可选)Kinesis 数据流中每个分区要获取的最大记录数。原定设置值为 100000

  • "maxRecordPerRead":(可选)每项 getRecords 操作中要从 Kinesis 数据流获取的最大记录数。原定设置值为 10000

  • "addIdleTimeBetweenReads":(可选)在两项连续 getRecords 操作之间添加时间延迟。原定设置值为 "False"。此选项仅适用于 Glue 版本 2.0 及更高版本。

  • "idleTimeBetweenReadsInMs":(可选)两项连续 getRecords 操作之间的最短时间延迟,以毫秒为单位。原定设置值为 1000。此选项仅适用于 Glue 版本 2.0 及更高版本。

  • "describeShardInterval":(可选)两个 ListShards API 调用之间的最短时间间隔,供您的脚本考虑重新分区。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的重新分区策略。原定设置值为 1s

  • "numRetries":(可选)Kinesis Data Streams API 请求的最大重试次数。原定设置值为 3

  • "retryIntervalMs":(可选)重试 Kinesis Data Streams API 调用之前的冷却时间(以毫秒为单位)。原定设置值为 1000

  • "maxRetryIntervalMs":(可选)Kinesis Data Streams API 调用的两次重试之间的最长冷却时间(以毫秒为单位)。原定设置值为 10000

  • "avoidEmptyBatches":(可选)在批处理开始之前检查 Kinesis 数据流中是否有未读数据,避免创建空白微批处理任务。原定设置值为 "False"

  • "schema":(当 inferSchema 设为 false 时为必填)用于处理有效工作负载的架构。如果分类为 avro,则提供的架构必须采用 Avro 架构格式。如果分类不是 avro,则提供的架构必须采用 DDL 架构格式。

    以下是一些架构示例。

    Example in DDL schema format
    'column1' INT, 'column2' STRING , 'column3' FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema":(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到 foreachbatch 内的有效工作负载中的架构。

  • "avroSchema":(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用 schema 参数。

"connectionType": "mongodb"

指定与 MongoDB 的连接。源连接和接收器连接的连接选项不同。

"connectionType": "mongodb" as Source

"connectionType": "mongodb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 MongoDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 MongoDB 数据库。当在您的任务脚本中调用 glue_context.create_dynamic_frame_from_catalog 时,此选项还可以在 additional_options 中传递。

  • "collection":(必需)要从中读取数据的 MongoDB 集合。当在您的任务脚本中调用 glue_context.create_dynamic_frame_from_catalog 时,此选项还可以在 additional_options 中传递。

  • "username":(必需)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果为 true,且 ssltrue,则执行域匹配检查。默认为 true

  • "batchSize":(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选)从 MongoDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner (默认值)

    • MongoSamplePartitioner (需要 MongoDB 3.2 或更高版本)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions":(可选)指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 MongoDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 MongoDB 数据库。

  • "collection":(必需)要在其中写入数据的 MongoDB 集合。

  • "username":(必需)MongoDB 用户名。

  • "password":(必需)MongoDB 密码。

  • "ssl":(可选)如果为 true,则启动 SSL 连接。默认为 false

  • "ssl.domain_match":(可选)如果为 true,且 ssltrue,则执行域匹配检查。默认为 true

  • "extendedBsonTypes":(可选)如果为 true,则在 MongoDB 中写入数据时会允许扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选)保存数据时的批量操作的最大批次大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "orc"

指定与 Amazon S3 中以 Apache Hive 优化的行列式(ORC)文件格式存储的文件的连接。

"connectionType": "orc" 可使用以下连接选项:

  • paths:(必需)要从中读取数据的 Amazon S3 路径的列表。

  • (其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递给 SparkSQL DataSource。有关更多信息,请参阅 Spark 的 Redshift 数据源

"connectionType": "parquet"

指定与 Amazon S3 中以 Apache Parquet 文件格式存储的文件的连接。

"connectionType": "parquet" 可使用以下连接选项:

  • paths:(必需)要从中读取数据的 Amazon S3 路径的列表。

  • (其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递给 SparkSQL DataSource。有关更多信息,请参阅 GitHub 网站上的 Spark 的 Amazon Redshift 数据源

"connectionType": "s3"

指定与 Amazon S3 的连接。

"connectionType": "s3" 可使用以下连接选项:

  • "paths":(必需)要从中读取数据的 Amazon S3 路径的列表。

  • "exclusions":(可选)包含要排除的 Unix 样式 glob 模式的 JSON 列表的字符串。例如,"[\"**.pdf\"]" 会排除所有 PDF 文件。有关 Amazon Glue 支持的 glob 语法的更多信息,请参阅包含和排除模式

  • "compressionType" 或 "compression":(可选)指定数据压缩方式。使用适用于 Amazon S3 源的 "compressionType" 以及适用于 Amazon S3 目标的 "compression"。通常,如果数据有标准文件扩展名,则不需要指定。可能的值为 "gzip""bzip"

  • "groupFiles":(可选)当输入包含超过 50,000 个文件时,默认启用文件分组。当少于 50,000 个文件时,若要启用分组,请将此参数设置为 "inPartition"。当超过 50,000 个文件时,若要禁用分组,请将此参数设置为 "none"

  • "groupSize":(可选)目标组大小(以字节为单位)。默认值根据输入数据大小和群集大小进行计算。当少于 50,000 个输入文件时,"groupFiles" 必须设置为 "inPartition",此选项才能生效。

  • "recurse":(可选)如果设置为 true,则以递归方式读取指定路径下的所有子目录中的文件。

  • "maxBand":(可选,高级)此选项控制 s3 列表可能保持一致的持续时间(以毫秒为单位)。当使用 JobBookmarks 来表明 Amazon S3 最终一致性时,将专门跟踪修改时间戳在最后 maxBand 毫秒内的文件。大多数用户不需要设置此选项。默认值为 900000 毫秒或 15 分钟。

  • "maxFilesInBand":(可选,高级)此选项指定在最后 maxBand 秒内可保存的最大文件数量。如果超过此值,额外的文件将会跳过,且只能在下一次作业运行中处理。大多数用户不需要设置此选项。

  • "isFailFast":(可选)此选项用于确定 Amazon Glue ETL 任务是否导致读取器解析异常。如果设置为 true,并且 Spark 任务的四次重试无法正确解析数据,则任务会快速失败。

JDBC connectionType 值

这些功能包括:

  • "connectionType": "sqlserver":指定与 Microsoft SQL Server 数据库的连接。

  • "connectionType": "mysql":指定与 MySQL 数据库的连接。

  • "connectionType": "oracle":指定与 Oracle 数据库的连接。

  • "connectionType": "postgresql":指定与 PostgreSQL 数据库的连接。

  • "connectionType": "redshift":指定与 Amazon Redshift 数据库的连接。

下表列出了 Amazon Glue 支持的 JDBC 驱动程序版本。

产品 Glue 0.9、1.0、2.0 的 JDBC 驱动程序版本 Glue 3.0 的 JDBC 驱动程序版本
Microsoft SQL Server 6.x 7.x
MySQL 5.1 8.0.23
Oracle 数据库 11.2 21.1
PostgreSQL 42.1.x 42.2.18
MongoDB 2.0.0 4.0.0
Amazon Redshift 4.1 4.1

将这些连接选项与 JDBC 连接结合使用:

  • "url":(必填)数据库的 JDBC URL。

  • "dbtable":要从中进行读取的数据库表。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

  • "redshiftTmpDir":(对于 Amazon Redshift 为必填项,对于其他 JDBC 类型为可选项)当从数据库中复制数据时,用于暂存临时数据的 Amazon S3 路径。

  • "user":(必需)在连接时使用的用户名。

  • "password":(必填)在连接时使用的密码。

  • (可选)以下选项允许您提供自定义 JDBC 驱动程序。如果必须使用 Amazon Glue 本身不支持的驱动程序,请使用这些选项。ETL 作业可以为数据源和目标使用不同的 JDBC 驱动程序版本,即使源和目标是相同的数据库产品也是如此。这允许您在不同版本的源数据库和目标数据库之间迁移数据。要使用这些选项,您必须首先将 JDBC 驱动程序的 jar 文件上传到 Amazon S3。

    • "customJdbcDriverS3Path":自定义 JDBC 驱动程序的 Amazon S3 路径。

    • "customJdbcDriverClassName":JDBC 驱动程序的类名。

  • "bulksize":(可选)用于配置并行插入以加速批量加载到 JDBC 目标。为写入或插入数据时要使用的并行度指定整数值。此选项有助于提高写入数据库(如 Arch User Repository (AUR))的性能。

  • "sampleQuery":(可选)用于采样的自定义 SQL 查询语句。默认情况下,查询示例由单个执行者执行。如果您读取的是大型数据集,则可能需要启用 JDBC 分区才能并行查询表。有关更多信息,请参阅 并行读取 JDBC 表。要将 sampleQuery 与 JDBC 分区一同使用,也可将 enablePartitioningForSampleQuery 设置为 true。

  • "enablePartitioningForSampleQuery":(可选)默认情况下,此选项为 false。如果想将 sampleQuery 与分区的 JDBC 表一同使用,则必须使用此设置。如果设置为 true,sampleQuery 必须以“where”或“and”结尾,以便于 Amazon Glue 追加分区条件。请参见以下示例。

  • "sampleSize":(可选)限制查询示例返回的行数。仅当 enablePartitioningForSampleQuery 为 true 时有用。如果未启用分区,则应直接在 sampleQuery 中添加“limit x”以限制大小。

    例 在不进行分区的情况下使用 samplQuery

    以下代码示例演示了如何在不进行分区的情况下使用 sampleQuery

    //A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" → url, "dbtable" → table, "user" → user, "password" → password, "basePath" → basePath, "sampleQuery" → query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

    例 将 sampleQuery 与 JDBC 分区一起使用

    以下代码示例演示了如何将 sampleQuery 与 JDBC 分区一起使用。

    //note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" → url, "dbtable" → table, "user" → user, "password" → password, "basePath" → basePath, "hashfield" -> primaryKey, "sampleQuery" → query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

如果是 Redshift 连接类型,用于 JDBC 连接的连接选项中包含的所有其他选项名称/值对(包括格式选项)将直接传递到底层 SparkSQL DataSource。有关更多信息,请参阅 Spark 的 Redshift 数据源

以下代码示例演示了如何使用自定义 JDBC 驱动程序读取和写入 JDBC 数据库。这些示例演示了如何从一个版本的数据库产品读取和写入同一产品的更高版本。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }

自定义和 Amazon Web Services Marketplace connectionType 值

这些功能包括:

  • "connectionType": "marketplace.athena":指定与 Amazon Athena 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。

  • "connectionType": "marketplace.spark":指定与 Apache Spark 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。

  • "connectionType": "marketplace.jdbc":指定与 JDBC 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。

  • "connectionType": "custom.athena":指定与 Amazon Athena 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

  • "connectionType": "custom.spark":指定与 Apache Spark 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

  • "connectionType": "custom.jdbc":指定与 JDBC 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项

  • className – 字符串,必需,驱动程序类名称。

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

  • url – 字符串,必需,用于建立与数据源的连接且带占位符(${})的 JDBC URL。占位符 ${secretKey} 替换为 Amazon Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。

  • secretIduser/password – 字符串,必需,用于检索 URL 的凭证。

  • dbTablequery – 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定 dbTablequery,但不能同时指定两者。

  • partitionColumn – 字符串,可选,用于分区的整数列的名称。此选项仅在包含 lowerBoundupperBoundnumPartitions 时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 JDBC 转换到其他数据库

    lowerBoundupperBound 值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。

    注意

    使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则在使用分区列的查询结尾附加 WHERE 子句,以测试查询。

    • 如果您的查询格式为 SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用分区列的表达式扩展 WHERE 子句,以测试查询。

  • lowerBound – 整数,可选,用于确定分区步长的最小 partitionColumn 值。

  • upperBound – 整数,可选,用于确定分区步长的最大 partitionColumn 值。

  • numPartitions – 整数,可选,分区数。此值以及 lowerBound(包含)和 upperBound(排除)为用于拆分 partitionColumn 而生成的 WHERE 子句表达式构成分区步长。

    重要

    请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。

  • filterPredicate – 字符串,可选,用于筛选源数据的额外条件子句。例如:

    BillingCity='Mountain View'

    使用查询(而不是名称)时,您应验证查询是否适用于指定的 filterPredicate。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则在使用筛选条件谓词的查询结尾附加 WHERE 子句,以测试查询。

    • 如果您的查询格式为 "SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用筛选条件谓词的表达式扩展 WHERE 子句,以测试查询。

  • dataTypeMapping – 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项 "dataTypeMapping":{"FLOAT":"STRING"} 会调用驱动程序的 ResultSet.getString() 方法,将 JDBC 类型 FLOAT 的数据字段映射到 Java String 类型,方法是,并将其用于构建胶水记录。ResultSet 对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。

  • 目前受支持的 Amazon Glue 数据类型包括:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    支持的 JDBC 数据类型为 Java8 java.sql.types

    默认数据类型映射(从 JDBC 到 Amazon Glue)如下:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    如果将自定义数据类型映射与选项 dataTypeMapping 结合使用,则可以覆盖默认数据类型映射。只有 dataTypeMapping 选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 Amazon Glue STRING 数据类型。

以下 Python 代码示例演示了如何使用 Amazon Web Services Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.athena 或 marketplace.athena 的连接选项

  • className – 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如 "com.amazonaws.athena.connectors")的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。

  • tableName – 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称 all_log_streams,这意味着返回的动态数据框将包含日志组中所有日志流的数据。

  • schemaName – 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如:/aws-glue/jobs/output

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述文件。

以下 Python 代码示例演示了如何从使用 Amazon Web Services Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.spark 或 marketplace.spark 的连接选项

  • className – 字符串,必需,连接器类名称。

  • secretId – 字符串,可选,用于检索连接器连接的凭证。

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

  • 其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀 es 开头,正如适用于 Apache Hadoop 的 Elasticsearch 文档中所述。Spark 与 Snowflake 的连接使用 sfUsersfPassword 等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。

以下 Python 代码示例演示了如何从使用 marketplace.spark 连接的 OpenSearch 数据存储读取数据。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<Amazon endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<Amazon endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()