
Connection types and options for ETL in Amazon Glue for Spark

In Amazon Glue for Spark, various PySpark and Scala methods and transforms specify the connection type using a connectionType parameter. They specify connection options using a connectionOptions or options parameter.

The connectionType parameter can take the values shown in the following table. The associated connectionOptions (or options) parameter values for each type are documented in the following sections. Except where otherwise noted, the parameters apply when the connection is used as a source or sink.

For sample code that demonstrates setting and using connection options, see the homepage for each connection type.

connectionType Connects to
dynamodb Amazon DynamoDB database
kinesis Amazon Kinesis Data Streams
s3 Amazon S3
documentdb Amazon DocumentDB (with MongoDB compatibility) database
opensearch Amazon OpenSearch Service
redshift Amazon Redshift database
kafka Kafka or Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos for NoSQL
azuresql Azure SQL
bigquery Google BigQuery
mongodb MongoDB database, including MongoDB Atlas
sqlserver Microsoft SQL Server database (see JDBC connections)
mysql MySQL database (see JDBC connections)
oracle Oracle database (see JDBC connections)
postgresql PostgreSQL database (see JDBC connections)
saphana SAP HANA
snowflake Snowflake data lake
teradata Teradata Vantage
vertica Vertica
custom.* Spark, Athena, or JDBC data stores (see Custom and Amazon Web Services Marketplace connectionType values)
marketplace.* Spark, Athena, or JDBC data stores (see Custom and Amazon Web Services Marketplace connectionType values)
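
Regardless of the connection type, the calling pattern in an ETL script is the same: the connectionType (connection_type in Python) value selects the connector, and the connectionOptions (connection_options) map supplies that connector's settings. The following minimal sketch illustrates the pattern with an s3 source and sink; the bucket paths are hypothetical placeholders, and each connector accepts its own set of options as documented in its own section.

# Minimal sketch: the same from_options pattern applies to every connection type.
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read using connection_type = "s3" and its connection options
frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://<input bucket>/<prefix>/"]},
    format="json",
    transformation_ctx="source0",
)

# Write using the same connection_type / connection_options pattern
glueContext.write_dynamic_frame.from_options(
    frame=frame,
    connection_type="s3",
    connection_options={"path": "s3://<output bucket>/<prefix>/"},
    format="json",
    transformation_ctx="sink0",
)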

DataFrame options for ETL in Amazon Glue 5.0 for Spark

A DataFrame is a Dataset organized into named columns, similar to a table, and it supports functional-style operations (map/reduce/filter/etc.) and SQL operations (select, project, aggregate).
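
As a brief, self-contained illustration of those operations (using an in-memory DataFrame rather than any particular connector; the column names and values are made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# A small in-memory DataFrame with named columns
df = spark.createDataFrame(
    [("sales", 1, 100), ("sales", 2, 250), ("hr", 3, 80)],
    ["department", "id", "amount"],
)

# Functional-style and SQL-style operations
filtered = df.filter(F.col("amount") > 90)            # filter rows
projected = filtered.select("department", "amount")   # project columns
aggregated = projected.groupBy("department").agg(     # aggregate
    F.sum("amount").alias("total_amount")
)
aggregated.show()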

To create a DataFrame for a data source supported by Glue, the following are required:

  • data source connector ClassName

  • data source connection Options

Similarly, to write a DataFrame to a data sink supported by Glue, the same are required:

  • data sink connector ClassName

  • data sink connection Options

Note that Amazon Glue features such as job bookmarks, and DynamicFrame options such as connectionName, are not supported with DataFrames. For more details about DataFrames and the supported operations, see the Spark documentation for DataFrame.

Specifying the connector ClassName

To specify the ClassName of a data source/sink, use the .format method to provide the corresponding connector ClassName that defines the data source/sink.

JDBC connectors

For JDBC connectors, specify jdbc as the value passed to .format and provide the JDBC driver ClassName in the driver option.

df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...

The following table lists the JDBC driver ClassName for each data source supported in Amazon Glue for DataFrames.

Data source Driver ClassName
PostgreSQL org.postgresql.Driver
Oracle oracle.jdbc.driver.OracleDriver
SQLServer com.microsoft.sqlserver.jdbc.SQLServerDriver
MySQL com.mysql.jdbc.Driver
SAPHana com.sap.db.jdbc.Driver
Teradata com.teradata.jdbc.TeraDriver

Spark connectors

For Spark connectors, specify the ClassName of the connector as the value passed to .format.

df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...

The following table lists the Spark connector ClassName for each data source supported in Amazon Glue for DataFrames.

Data source ClassName
MongoDB/DocumentDB glue.spark.mongodb
Redshift io.github.spark_redshift_community.spark.redshift
AzureCosmos cosmos.oltp
AzureSQL com.microsoft.sqlserver.jdbc.spark
BigQuery com.google.cloud.spark.bigquery
OpenSearch org.opensearch.spark.sql
Snowflake net.snowflake.spark.snowflake
Vertica com.vertica.spark.datasource.VerticaSource

Specifying the connection Options

To specify the Options of the connection to a data source/sink, use the .option(<KEY>, <VALUE>) method to provide individual options, or the .options(<MAP>) method to provide multiple options as a key-value map.
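
For illustration, the two forms below are equivalent ways of supplying the same JDBC options; the placeholder values are hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Individual options with .option(<KEY>, <VALUE>)
df = (spark.read.format("jdbc")
      .option("driver", "org.postgresql.Driver")
      .option("url", "<url>")
      .option("user", "<user>")
      .option("password", "<password>")
      .option("dbtable", "<dbtable>")
      .load())

# The same options as a key-value map with .options(<MAP>)
options = {
    "driver": "org.postgresql.Driver",
    "url": "<url>",
    "user": "<user>",
    "password": "<password>",
    "dbtable": "<dbtable>",
}
df = spark.read.format("jdbc").options(**options).load()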

Each data source/sink supports its own set of connection Options. For details on the available Options, refer to the public documentation of the specific data source/sink Spark connector listed in the preceding tables.

Examples

The following examples read from PostgreSQL and write to Snowflake:

Python

Example:

from awsglue.context import GlueContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

dataSourceClassName = "jdbc"
dataSourceOptions = {
    "driver": "org.postgresql.Driver",
    "url": "<url>",
    "user": "<user>",
    "password": "<password>",
    "dbtable": "<dbtable>",
}

# Read from the PostgreSQL data source
dataframe = spark.read.format(dataSourceClassName).options(**dataSourceOptions).load()

dataSinkClassName = "net.snowflake.spark.snowflake"
dataSinkOptions = {
    "sfUrl": "<url>",
    "sfUsername": "<username>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}

# Write to the Snowflake data sink
dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala

Example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val dataSourceClassName = "jdbc"
val dataSourceOptions = Map(
  "driver" -> "org.postgresql.Driver",
  "url" -> "<url>",
  "user" -> "<user>",
  "password" -> "<password>",
  "dbtable" -> "<dbtable>"
)

// Read from the PostgreSQL data source
val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load()

val dataSinkClassName = "net.snowflake.spark.snowflake"
val dataSinkOptions = Map(
  "sfUrl" -> "<url>",
  "sfUsername" -> "<username>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database>",
  "sfSchema" -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)

// Write to the Snowflake data sink
dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()

Custom and Amazon Web Services Marketplace connectionType values

These include the following:

  • "connectionType": "marketplace.athena": Designates a connection to an Amazon Athena data store. The connection uses a connector from Amazon Web Services Marketplace.

  • "connectionType": "marketplace.spark": Designates a connection to an Apache Spark data store. The connection uses a connector from Amazon Web Services Marketplace.

  • "connectionType": "marketplace.jdbc": Designates a connection to a JDBC data store. The connection uses a connector from Amazon Web Services Marketplace.

  • "connectionType": "custom.athena": Designates a connection to an Amazon Athena data store. The connection uses a custom connector that you upload to Amazon Glue Studio.

  • "connectionType": "custom.spark": Designates a connection to an Apache Spark data store. The connection uses a custom connector that you upload to Amazon Glue Studio.

  • "connectionType": "custom.jdbc": Designates a connection to a JDBC data store. The connection uses a custom connector that you upload to Amazon Glue Studio.

Connection options for type custom.jdbc or marketplace.jdbc

  • className – String, required, driver class name.

  • connectionName – String, required, name of the connection that is associated with the connector.

  • url – String, required, JDBC URL with placeholders (${}) which are used to build the connection to the data source. The placeholder ${secretKey} is replaced with the secret of the same name in Amazon Secrets Manager. Refer to the data store documentation for more information about constructing the URL.

  • secretId or user/password – String, required, used to retrieve credentials for the URL.

  • dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.

  • partitionColumn – String, optional, the name of an integer column that is used for partitioning. This option works only when it's included with lowerBound, upperBound, and numPartitions. This option works the same way as in the Spark SQL JDBC reader. For more information, see JDBC To Other Databases in the Apache Spark SQL, DataFrames and Datasets Guide.

    The lowerBound and upperBound values are used to decide the partition stride, not for filtering the rows in the table. All rows in the table are partitioned and returned.

    Note

    When using a query instead of a table name, you should validate that the query works with the specified partitioning condition. For example:

    • If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the partition column.

    • If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the partition column.

  • lowerBound – Integer, optional, the minimum value of partitionColumn that is used to decide partition stride.

  • upperBound – Integer, optional, the maximum value of partitionColumn that is used to decide partition stride.

  • numPartitions – Integer, optional, the number of partitions. This value, along with lowerBound (inclusive) and upperBound (exclusive), forms partition strides for the generated WHERE clause expressions that are used to split the partitionColumn. (For a worked illustration of the resulting predicates, see the sketch following this list.)

    Important

    Be careful with the number of partitions because too many partitions might cause problems on your external database systems.

  • filterPredicate – String, optional, extra condition clause to filter data from source. For example:

    BillingCity='Mountain View'

    When using a query instead of a table name, you should validate that the query works with the specified filterPredicate. For example:

    • If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the filter predicate.

    • If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the filter predicate.

  • dataTypeMapping – Dictionary, optional, custom data type mapping that builds a mapping from a JDBC data type to a Glue data type. For example, the option "dataTypeMapping":{"FLOAT":"STRING"} maps data fields of JDBC type FLOAT into the Java String type by calling the ResultSet.getString() method of the driver, and uses it to build Amazon Glue records. The ResultSet object is implemented by each driver, so the behavior is specific to the driver you use. Refer to the documentation for your JDBC driver to understand how the driver performs the conversions.

    The Amazon Glue data types currently supported are:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    The supported JDBC data types are the Java 8 java.sql.Types.

    The default data type mappings (from JDBC to Amazon Glue) are:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    If you use a custom data type mapping with the option dataTypeMapping, then you can override a default data type mapping. Only the JDBC data types listed in the dataTypeMapping option are affected; the default mapping is used for all other JDBC data types. You can add mappings for additional JDBC data types if needed. If a JDBC data type is not included in either the default mapping or a custom mapping, then the data type converts to the Amazon Glue STRING data type by default.
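
The following sketch approximates how numPartitions, lowerBound, and upperBound on partitionColumn translate into per-partition WHERE clauses. It is plain Python rather than the Glue or Spark API, and the exact predicates are generated internally by the Spark JDBC reader, so treat it as an illustration only; the column name and bounds are hypothetical.

# Illustration only: approximate the per-partition predicates for
# partitionColumn = "id", lowerBound = 0, upperBound = 200, numPartitions = 4.
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        if i == 0:
            # First partition also picks up NULLs and values below lowerBound
            predicates.append(f"{column} < {start + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is open-ended above upperBound
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {start + stride}")
    return predicates

print(partition_predicates("id", 0, 200, 4))
# ['id < 50 OR id IS NULL', 'id >= 50 AND id < 100',
#  'id >= 100 AND id < 150', 'id >= 150']

Because the first and last predicates are open-ended, every row is returned; the bounds only control how rows are divided among partitions.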

The following Python code example shows how to read from JDBC databases with Amazon Web Services Marketplace JDBC drivers. It demonstrates reading from a database and writing to an S3 location.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {
        "dataTypeMapping": {"INTEGER": "STRING"},
        "upperBound": "200",
        "query": "select id, name, department from department where id < 200",
        "numPartitions": "4",
        "partitionColumn": "id",
        "lowerBound": "0",
        "connectionName": "test-connection-jdbc"
    },
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")
    ],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()

Connection options for type custom.athena or marketplace.athena

  • className – String, required, driver class name. When you're using the Athena-CloudWatch connector, this parameter value is the prefix of the class name (for example, "com.amazonaws.athena.connectors"). The Athena-CloudWatch connector is composed of two classes: a metadata handler and a record handler. If you supply the common prefix here, then the API loads the correct classes based on that prefix.

  • tableName – String, required, the name of the CloudWatch log stream to read. The following code example uses the special view name all_log_streams, which means that the dynamic frame returned will contain data from all log streams in the log group.

  • schemaName – String, required, the name of the CloudWatch log group to read from. For example, /aws-glue/jobs/output.

  • connectionName – String, required, name of the connection that is associated with the connector.

For additional options for this connector, see the Amazon Athena CloudWatch Connector README file on GitHub.

The following Python code example shows how to read from an Athena data store using an Amazon Web Services Marketplace connector. It demonstrates reading from Athena and writing to an S3 location.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output","connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.athena",
    connection_options = {
        "tableName": "all_log_streams",
        "schemaName": "/aws-glue/jobs/output",
        "connectionName": "test-connection-athena"
    },
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")
    ],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()

Connection options for type custom.spark or marketplace.spark

  • className – String, required, connector class name.

  • secretId – String, optional, used to retrieve credentials for the connector connection.

  • connectionName – String, required, name of the connection that is associated with the connector.

  • Other options depend on the data store. For example, OpenSearch configuration options start with the prefix es, as described in the Elasticsearch for Apache Hadoop documentation. Spark connections to Snowflake use options such as sfUser and sfPassword, as described in Using the Spark Connector in the Connecting to Snowflake guide.

The following Python code example shows how to read from an OpenSearch data store using a marketplace.spark connection.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only":"true","es.nodes":"https://<Amazon endpoint>","connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.spark",
    connection_options = {
        "path": "test",
        "es.nodes.wan.only": "true",
        "es.nodes": "https://<Amazon endpoint>",
        "connectionName": "test-spark-es",
        "es.port": "443"
    },
    transformation_ctx = "DataSource0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DataSource0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()

General options

The options in this section are provided as connection_options, but do not specifically apply to one connector.

The following parameters are used generally when configuring bookmarks. They may apply to Amazon S3 or JDBC workflows. For more information, see Using job bookmarks. A minimal sketch showing how the bookmark keys can be supplied follows the list.

  • jobBookmarkKeys – Array of column names.

  • jobBookmarkKeysSortOrder – String defining how to compare values based on sort order. Valid values: "asc", "desc".

  • useS3ListImplementation – Used to manage memory performance when listing Amazon S3 bucket contents. For more information, see Optimize memory management in Amazon Glue.
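
The following sketch shows how the bookmark keys might be supplied for a JDBC-style source. The connection values, table, and id column are hypothetical placeholders, and job bookmarks must be enabled on the job itself for the keys to take effect.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": "<jdbc url>",
        "user": "<user>",
        "password": "<password>",
        "dbtable": "<dbtable>",
        "jobBookmarkKeys": ["id"],           # columns that track progress between runs
        "jobBookmarkKeysSortOrder": "asc",   # "asc" or "desc"
    },
    transformation_ctx="source0",            # required for bookmark state to be tracked
)

job.commit()  # records the bookmark state for the next run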