
Connection Types and Options for ETL in AWS Glue

In AWS Glue, various PySpark and Scala methods and transforms specify the connection type by using a connectionType parameter. They specify connection options by using a connectionOptions or options parameter.

The connectionType parameter can take the values shown in the following table. The associated connectionOptions (or options) parameter values for each type are documented in the following sections. Except as otherwise noted, the parameters apply when the connection is used as a source or a sink.

For code samples that show how to set and use connection options, see Examples: Setting Connection Types and Options.
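In PySpark, these parameters are typically passed to methods such as create_dynamic_frame.from_options, as in the examples later in this topic. The following minimal sketch shows the general pattern; the DynamoDB table name is a placeholder chosen only for illustration.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# connection_type selects the data store; connection_options carries the
# store-specific options described in the sections that follow.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={"dynamodb.input.tableName": "my_table"}  # placeholder table name
)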

connectionType  Connects to
documentdb  Amazon DocumentDB (with MongoDB compatibility) database
dynamodb  Amazon DynamoDB database
mongodb  MongoDB database
mysql  MySQL database (see JDBC connectionType Values)
oracle  Oracle database (see JDBC connectionType Values)
orc  Files stored in Amazon Simple Storage Service (Amazon S3) in the Apache Hive Optimized Row Columnar (ORC) file format
parquet  Files stored in Amazon S3 in the Apache Parquet file format
postgresql  PostgreSQL database (see JDBC connectionType Values)
redshift  Amazon Redshift database (see JDBC connectionType Values)
s3  Amazon S3
sqlserver  Microsoft SQL Server database (see JDBC connectionType Values)

"connectionType": "documentdb"

Designates a connection to Amazon DocumentDB (with MongoDB compatibility).

The connection options differ for a source connection and a sink connection.

"connectionType": "documentdb" as Source

Use the following connection options with "connectionType": "documentdb" as a source (a sample read sketch follows this list):

  • "uri": (Required) The Amazon DocumentDB host to read from, formatted as mongodb://<host>:<port>.

  • "database": (Required) The Amazon DocumentDB database to read from.

  • "collection": (Required) The Amazon DocumentDB collection to read from.

  • "username": (Required) The Amazon DocumentDB user name.

  • "password": (Required) The Amazon DocumentDB password.

  • "ssl": (Required if using SSL) If your connection uses SSL, then you must include this option with the value "true".

  • "ssl.domain_match": (Required if using SSL) If your connection uses SSL, then you must include this option with the value "false".

  • "batchSize": (Optional): The number of documents to return per batch, used within the cursor of internal batches.

  • "partitioner": (Optional): The class name of the partitioner for reading input data from Amazon DocumentDB. The connector provides the following partitioners:

    • MongoDefaultPartitioner (default)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (Optional): Options for the designated partitioner. The following options are supported for each partitioner:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    For more information about these options, see Partitioner Configuration in the MongoDB documentation. For sample code, see Examples: Setting Connection Types and Options.
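As referenced at the start of this list, the following is a minimal sketch of a read from Amazon DocumentDB using these options; the host, database, collection, and credentials are placeholders, not working values.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read a collection from Amazon DocumentDB; every option value below is a placeholder.
documentdb_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="documentdb",
    connection_options={
        "uri": "mongodb://<host>:<port>",
        "database": "test",
        "collection": "coll",
        "username": "username",
        "password": "pwd",
        "ssl": "true",
        "ssl.domain_match": "false"
    }
)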

"connectionType": "documentdb" as Sink

Use the following connection options with "connectionType": "documentdb" as a sink:

  • "uri": (Required) The Amazon DocumentDB host to write to, formatted as mongodb://<host>:<port>.

  • "database": (Required) The Amazon DocumentDB database to write to.

  • "collection": (Required) The Amazon DocumentDB collection to write to.

  • "username": (Required) The Amazon DocumentDB user name.

  • "password": (Required) The Amazon DocumentDB password.

  • "extendedBsonTypes": (Optional) If true, enables extended BSON types when writing data to Amazon DocumentDB. The default is true.

  • "replaceDocument": (Optional) If true, replaces the whole document when saving datasets that contain an _id field. If false, only fields in the document that match the fields in the dataset are updated. The default is true.

  • "maxBatchSize": (Optional): The maximum batch size for bulk operations when saving data. The default is 512.

For sample code, see Examples: Setting Connection Types and Options.
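In addition to those samples, the following is a minimal sketch of a write to Amazon DocumentDB using these options; the frame name, host, database, collection, and credentials are placeholders.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# documentdb_dyf is assumed to be a DynamicFrame produced earlier in the job.
glue_context.write_dynamic_frame_from_options(
    frame=documentdb_dyf,
    connection_type="documentdb",
    connection_options={
        "uri": "mongodb://<host>:<port>",
        "database": "test",
        "collection": "coll",
        "username": "username",
        "password": "pwd"
    }
)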

"connectionType": "dynamodb"

Designates a connection to Amazon DynamoDB.

The connection options differ for a source connection and a sink connection.

"connectionType": "dynamodb" as Source

Use the following connection options with "connectionType": "dynamodb" as a source:

  • "dynamodb.input.tableName": (Required) The DynamoDB table to read from.

  • "dynamodb.throughput.read.percent": (Optional) The percentage of read capacity units (RCU) to use. The default is set to "0.5". Acceptable values are from "0.1" to "1.5", inclusive.

    • 0.5 represents the default read rate, meaning that AWS Glue will attempt to consume half of the read capacity of the table. If you increase the value above 0.5, AWS Glue increases the request rate; decreasing the value below 0.5 decreases the read request rate. (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)

    • When the DynamoDB table is in on-demand mode, AWS Glue handles the read capacity of the table as 40000. For exporting a large table, we recommend switching your DynamoDB table to on-demand mode.

  • "dynamodb.splits": (Optional) Defines how many splits we partition this DynamoDB table into while reading. The default is set to "1". Acceptable values are from "1" to "1,000,000", inclusive.

    • A value of 1 means that there is no parallelism. We highly recommend that you specify a larger value for better performance, using the formula below.

    • We recommend that you calculate numSlots using the following formula and use it as the value of dynamodb.splits (a worked example follows this list). If you need more performance, we recommend that you scale out your job by increasing the number of DPUs.

      • numExecutors =

        • (DPU - 1) * 2 - 1 if WorkerType is Standard

        • (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

      • numSlotsPerExecutor =

        • 4 if WorkerType is Standard

        • 8 if WorkerType is G.1X

        • 16 if WorkerType is G.2X

      • numSlots = numSlotsPerExecutor * numExecutors
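For example, for a job that runs with WorkerType G.1X and NumberOfWorkers = 10 (values chosen only for illustration), the formula gives:

  • numExecutors = 10 - 1 = 9

  • numSlots = 8 * 9 = 72

In this case, you would set "dynamodb.splits" to "72" to match the available slots.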

Note

AWS Glue supports reading data from a DynamoDB table in another AWS account. For more information, see Cross-Account Cross-Region Access to DynamoDB Tables.

Note

The DynamoDB reader does not support filters or pushdown predicates.

"connectionType": "dynamodb" as Sink

Use the following connection options with "connectionType": "dynamodb" as a sink:

  • "dynamodb.output.tableName": (Required) The DynamoDB table to write to.

  • "dynamodb.throughput.write.percent": (Optional) The percentage of write capacity units (WCU) to use. The default is set to "0.5". Acceptable values are from "0.1" to "1.5", inclusive.

    • 0.5 represents the default write rate, meaning that AWS Glue will attempt to consume half of the write capacity of the table. If you increase the value above 0.5, AWS Glue increases the request rate; decreasing the value below 0.5 decreases the write request rate. (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table).

    • When the DynamoDB table is in on-demand mode, AWS Glue handles the write capacity of the table as 40000. For importing a large table, we recommend switching your DynamoDB table to on-demand mode.

  • "dynamodb.output.numParallelTasks": (Optional) Defines how many parallel tasks write into DynamoDB at the same time. Used to calculate permissive WCU per Spark task. If you do not want to control these details, you do not need to specify this parameter.

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • If you do not specify this parameter, the permissive WCU per Spark task will be automatically calculated by the following formula:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1 if WorkerType is Standard

        • (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

      • numSlotsPerExecutor =

        • 4 if WorkerType is Standard

        • 8 if WorkerType is G.1X

        • 16 if WorkerType is G.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • Example 1. DPU=10, WorkerType=Standard. Input DynamicFrame has 100 RDD partitions.

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • Example 2. DPU=10, WorkerType=Standard. Input DynamicFrame has 20 RDD partitions.

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry": (Optional) Defines how many retries we perform when there is a ProvisionedThroughputExceededException from DynamoDB. The default is set to "10".

Note

The DynamoDB writer is supported in AWS Glue version 1.0 or later.

Note

AWS Glue supports writing data into a DynamoDB table in another AWS account. For more information, see Cross-Account Cross-Region Access to DynamoDB Tables.

The following code examples show how to read from and write to DynamoDB tables. They demonstrate reading from one table and writing to another table.

Python
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "test_source",
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "100"
    }
)

print(dyf.getNumPartitions())

glue_context.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_sink",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.input.tableName" -> "test_source",
        "dynamodb.throughput.read.percent" -> "1.0",
        "dynamodb.splits" -> "100"
      ))
    ).getDynamicFrame()

    print(dynamicFrame.getNumPartitions())

    val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> "test_sink",
        "dynamodb.throughput.write.percent" -> "1.0"
      ))
    ).asInstanceOf[DynamoDbDataSink]

    dynamoDbSink.writeDynamicFrame(dynamicFrame)
    Job.commit()
  }
}

"connectionType": "mongodb"

Designates a connection to MongoDB. The connection options differ for a source connection and a sink connection.

"connectionType": "mongodb" as Source

Use the following connection options with "connectionType": "mongodb" as a source (a sample read sketch follows this list):

  • "uri": (Required) The MongoDB host to read from, formatted as mongodb://<host>:<port>.

  • "database": (Required) The MongoDB database to read from.

  • "collection": (Required) The MongoDB collection to read from.

  • "username": (Required) The MongoDB user name.

  • "password": (Required) The MongoDB password.

  • "ssl": (Optional) If true, initiates an SSL connection. The default is false.

  • "ssl.domain_match": (Optional) If true and ssl is true, domain match check is performed. The default is true.

  • "batchSize": (Optional): The number of documents to return per batch, used within the cursor of internal batches.

  • "partitioner": (Optional): The class name of the partitioner for reading input data from MongoDB. The connector provides the following partitioners:

    • MongoDefaultPartitioner (default)

    • MongoSamplePartitioner (Requires MongoDB 3.2 or later)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (Optional): Options for the designated partitioner. The following options are supported for each partitioner:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    For more information about these options, see Partitioner Configuration in the MongoDB documentation. For sample code, see Examples: Setting Connection Types and Options.
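As referenced at the start of this list, the following is a minimal sketch of a read from MongoDB using these options; the host, database, collection, and credentials are placeholders, and the partitioner setting is shown only as an example (some connector versions expect the fully qualified class name rather than the short name used here).

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read a MongoDB collection over SSL; every option value below is a placeholder.
mongo_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options={
        "uri": "mongodb://<host>:<port>",
        "database": "test",
        "collection": "coll",
        "username": "username",
        "password": "pwd",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner"  # partitioner class name as listed above
    }
)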

"connectionType": "mongodb" as Sink

Use the following connection options with "connectionType": "mongodb" as a sink:

  • "uri": (Required) The MongoDB host to write to, formatted as mongodb://<host>:<port>.

  • "database": (Required) The MongoDB database to write to.

  • "collection": (Required) The MongoDB collection to write to.

  • "username": (Required) The MongoDB user name.

  • "password": (Required) The MongoDB password.

  • "ssl": (Optional) If true, initiates an SSL connection. The default is false.

  • "ssl.domain_match": (Optional) If true and ssl is true, domain match check is performed. The default is true.

  • "extendedBsonTypes": (Optional) If true, enables extended BSON types when writing data to MongoDB. The default is true.

  • "replaceDocument": (Optional) If true, replaces the whole document when saving datasets that contain an _id field. If false, only fields in the document that match the fields in the dataset are updated. The default is true.

  • "maxBatchSize": (Optional): The maximum batch size for bulk operations when saving data. The default is 512.

For sample code, see Examples: Setting Connection Types and Options.
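Beyond those samples, the following is a minimal sketch of a write to MongoDB using these options; the frame name, host, database, collection, and credentials are placeholders.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# mongo_dyf is assumed to be a DynamicFrame produced earlier in the job.
glue_context.write_dynamic_frame_from_options(
    frame=mongo_dyf,
    connection_type="mongodb",
    connection_options={
        "uri": "mongodb://<host>:<port>",
        "database": "test",
        "collection": "coll",
        "username": "username",
        "password": "pwd",
        "ssl": "true",
        "ssl.domain_match": "false"
    }
)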

"connectionType": "orc"

Designates a connection to files stored in Amazon S3 in the Apache Hive Optimized Row Columnar (ORC) file format.

Use the following connection options with "connectionType": "orc" (a sample read sketch follows this list):

  • paths: (Required) A list of the Amazon S3 paths to read from.

  • (Other option name/value pairs): Any additional options, including formatting options, are passed directly to the SparkSQL DataSource. For more information, see Redshift data source for Spark.
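As referenced above, the following is a minimal sketch of an ORC read, assuming that "orc" is passed as the connection type to create_dynamic_frame.from_options in the same way as the other connection types in this topic; the S3 path is a placeholder.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read ORC files from a placeholder Amazon S3 path.
orc_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="orc",
    connection_options={"paths": ["s3://s3path/"]}
)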

"connectionType": "parquet"

Designates a connection to files stored in Amazon S3 in the Apache Parquet file format.

Use the following connection options with "connectionType": "parquet" (a sample read sketch follows this list):

  • paths: (Required) A list of the Amazon S3 paths to read from.

  • (Other option name/value pairs): Any additional options, including formatting options, are passed directly to the SparkSQL DataSource. For more information, see Amazon Redshift data source for Spark on the GitHub website.
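As referenced above, the following is a minimal sketch of a Parquet read under the same assumption as the ORC sketch; the S3 path is a placeholder, and mergeSchema is shown only as an example of an additional option passed through to the SparkSQL DataSource.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read Parquet files from a placeholder Amazon S3 path.
parquet_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="parquet",
    connection_options={
        "paths": ["s3://s3path/"],
        "mergeSchema": "true"  # example pass-through option for the Spark Parquet DataSource
    }
)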

"connectionType": "s3"

Designates a connection to Amazon S3.

Use the following connection options with "connectionType": "s3" (a sample read sketch follows this list):

  • "paths": (Required) A list of the Amazon S3 paths to read from.

  • "exclusions": (Optional) A string containing a JSON list of Unix-style glob patterns to exclude. For example, "[\"**.pdf\"]" excludes all PDF files. For more information about the glob syntax that AWS Glue supports, see Include and Exclude Patterns.

  • "compressionType" or "compression": (Optional) Specifies how the data is compressed. Use "compressionType" for Amazon S3 sources and "compression" for Amazon S3 targets. This is generally not necessary if the data has a standard file extension. Possible values are "gzip" and "bzip".

  • "groupFiles": (Optional) Grouping files is enabled by default when the input contains more than 50,000 files. To enable grouping with fewer than 50,000 files, set this parameter to "inPartition". To disable grouping when there are more than 50,000 files, set this parameter to "none".

  • "groupSize": (Optional) The target group size in bytes. The default is computed based on the input data size and the size of your cluster. When there are fewer than 50,000 input files, "groupFiles" must be set to "inPartition" for this to take effect.

  • "recurse": (Optional) If set to true, recursively reads files in all subdirectories under the specified paths.

  • "maxBand": (Optional, advanced) This option controls the duration in seconds after which the s3 listing is likely to be consistent. Files with modification timestamps falling within the last maxBand seconds are tracked specially when using JobBookmarks to account for Amazon S3 eventual consistency. Most users don't need to set this option. The default is 900 seconds.

  • "maxFilesInBand": (Optional, advanced) This option specifies the maximum number of files to save from the last maxBand seconds. If this number is exceeded, extra files are skipped and only processed in the next job run. Most users don't need to set this option.
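As referenced at the start of this list, the following is a minimal sketch of an Amazon S3 read that uses several of these options; the S3 path is a placeholder, and the choice of format="json" is an assumption made only for illustration.

Python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read JSON files from a placeholder S3 path, skipping PDFs, recursing into
# subdirectories, and grouping small files within each partition.
s3_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://s3path/"],
        "exclusions": "[\"**.pdf\"]",
        "recurse": True,
        "groupFiles": "inPartition"
    },
    format="json"
)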

JDBC connectionType Values

These include the following:

  • "connectionType": "sqlserver": Designates a connection to a Microsoft SQL Server database.

  • "connectionType": "mysql": Designates a connection to a MySQL database.

  • "connectionType": "oracle": Designates a connection to an Oracle database.

  • "connectionType": "postgresql": Designates a connection to a PostgreSQL database.

  • "connectionType": "redshift": Designates a connection to an Amazon Redshift database.

The following table lists the JDBC driver versions that AWS Glue supports.

Product  JDBC driver versions
Microsoft SQL Server 6.x
MySQL 5.1
Oracle Database 11.2
PostgreSQL 42.x
Amazon Redshift 4.1

Use these connection options with JDBC connections:

  • "url": (Required) The JDBC URL for the database.

  • "dbtable": The database table to read from. For JDBC data stores that support schemas within a database, specify schema.table-name. If a schema is not provided, then the default "public" schema is used.

  • "redshiftTmpDir": (Required for Amazon Redshift, optional for other JDBC types) The Amazon S3 path where temporary data can be staged when copying out of the database.

  • "user": (Required) The username to use when connecting.

  • "password": (Required) The password to use when connecting.

  • (Optional) The following options enable you to supply a custom JDBC driver. Use these options if you must use a driver that AWS Glue does not natively support. ETL jobs can use different JDBC driver versions for the data source and target, even if the source and target are the same database product. This enables you to migrate data between source and target databases with different versions. To use these options, you must first upload the jar file of the JDBC driver to Amazon S3.

    • "customJdbcDriverS3Path": Amazon S3 path of the custom JDBC driver.

    • "customJdbcDriverClassName": Class name of JDBC driver.

All other option name/value pairs that are included in connection options for a JDBC connection, including formatting options, are passed directly to the underlying SparkSQL DataSource. For more information, see Redshift data source for Spark.

The following code examples show how to read from and write to JDBC databases with custom JDBC drivers. They demonstrate reading from one version of a database product and writing to a later version of the same product.

Python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Construct JDBC connection options
connection_mysql5_options = {
    "url": "jdbc:mysql://<jdbc-host-name>:3306/db",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd"}

connection_mysql8_options = {
    "url": "jdbc:mysql://<jdbc-host-name>:3306/db",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd",
    "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar",
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}

connection_oracle11_options = {
    "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd"}

connection_oracle18_options = {
    "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd",
    "customJdbcDriverS3Path": "s3://path/ojdbc10.jar",
    "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}

# Read from JDBC databases with custom driver
df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options)

# Read DynamicFrame from MySQL 5 and write to MySQL 8
df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options)
glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options)

# Read DynamicFrame from Oracle 11 and write to Oracle 18
df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options)
glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db"
  val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db"
  val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL"
  val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL"

  // Construct JDBC connection options
  lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI)
  lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver")
  lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI)
  lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver")

  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read from JDBC database with custom driver
    val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame()

    // Read DynamicFrame from MySQL 5 and write to MySQL 8
    val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame()
    glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5)

    // Read DynamicFrame from Oracle 11 and write to Oracle 18
    val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame()
    glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11)

    Job.commit()
  }

  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"url": "${uri}",
         |"dbtable": "test",
         |"user": "admin",
         |"password": "pwd"}""".stripMargin)
  }

  private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = {
    new JsonOptions(
      s"""{"url": "${uri}",
         |"dbtable": "test",
         |"user": "admin",
         |"password": "pwd",
         |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}",
         |"customJdbcDriverClassName": "${customJdbcDriverClassName}"}""".stripMargin)
  }
}