Connection types and options for ETL in Amazon Glue for Spark
In Amazon Glue for Spark, various PySpark and Scala methods and transforms specify the connection type using
a connectionType
parameter. They specify connection options using a
connectionOptions
or options
parameter.
The connectionType
parameter can take the values shown in the following table.
The associated connectionOptions
(or options
) parameter values for
each type are documented in the following sections. Except where otherwise noted, the parameters
apply when the connection is used as a source or sink.
For sample code that demonstrates setting and using connection options, see the homepage for each connection type.
connectionType |
Connects to |
---|---|
dynamodb | Amazon DynamoDB database |
kinesis | Amazon Kinesis Data Streams |
s3 | Amazon S3 |
documentdb | Amazon DocumentDB (with MongoDB compatibility) database |
opensearch | Amazon OpenSearch Service. |
redshift | Amazon Redshift |
kafka |
Kafka |
azurecosmos | Azure Cosmos for NoSQL. |
azuresql | Azure SQL. |
bigquery | Google BigQuery. |
mongodb | MongoDB |
sqlserver | Microsoft SQL Server database (see JDBC connections) |
mysql | MySQL |
oracle | Oracle |
postgresql |
PostgreSQL |
saphana | SAP HANA. |
snowflake | Snowflake |
teradata | Teradata Vantage. |
vertica | Vertica. |
custom.* | Spark, Athena, or JDBC data stores (see Custom and Amazon Web Services Marketplace connectionType values |
marketplace.* | Spark, Athena, or JDBC data stores (see Custom and Amazon Web Services Marketplace connectionType values) |
DataFrame options for ETL in Amazon Glue 5.0 for Spark
A DataFrame is a Dataset organized into named columns similar to a table and supports functional-style (map/reduce/filter/etc.) operations and SQL operations (select, project, aggregate).
To create a DataFrame for a data source supported by Glue, the following are required:
data source connector
ClassName
data source connection
Options
Similarly, to write a DataFrame to a data sink supported by Glue, the same are required:
data sink connector
ClassName
data sink connection
Options
Note that Amazon Glue features such as job bookmarks and DynamicFrame options such as connectionName
are not supported in DataFrame. For more details about DataFrame and the supported operations, see the Spark documentation for DataFrame
Specifying the connector ClassName
To specify the ClassName
of a data source/sink, use the .format
option to provide the corresponding connector ClassName
that defines the data source/sink.
JDBC connectors
For JDBC connectors, specify jdbc
as the value of the .format
option and provide the JDBC driver ClassName
in the driver
option.
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
The following table lists the JDBC driver ClassName
of the supported data source in Amazon Glue for DataFrames.
Data source | Driver ClassName |
---|---|
PostgreSQL | org.postgresql.Driver |
Oracle | oracle.jdbc.driver.OracleDriver |
SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver |
MySQL | com.mysql.jdbc.Driver |
SAPHana | com.sap.db.jdbc.Driver |
Teradata | com.teradata.jdbc.TeraDriver |
Spark connectors
For spark connectors, specify the ClassName
of the connector as the value of the .format
option.
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
The following table lists the Spark connector ClassName
of the supported data source in Amazon Glue for DataFrames.
Data source | ClassName |
---|---|
MongoDB/DocumentDB | glue.spark.mongodb |
Redshift | io.github.spark_redshift_community.spark.redshift |
AzureCosmos | cosmos.oltp |
AzureSQL | com.microsoft.sqlserver.jdbc.spark |
BigQuery | com.google.cloud.spark.bigquery |
OpenSearch | org.opensearch.spark.sql |
Snowflake | net.snowflake.spark.snowflake |
Vertica | com.vertica.spark.datasource.VerticaSource |
Specifying the connection Options
To specify the Options
of the connection to a data source/sink, use the .option(<KEY>, <VALUE>)
to provide individual options or .options(<MAP>)
to provide multiple options as a key-value map.
Each data source/sink supports its own set of connection Options
. For details on the available Options
, refer to the public documentation of the specific data source/sink Spark connector listed in the following table.
Examples
The following examples read from PostgreSQL and write into SnowFlake:
Python
Example:
from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala
Example:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
Custom and Amazon Web Services Marketplace connectionType values
These include the following:
-
"connectionType": "marketplace.athena"
: Designates a connection to an Amazon Athena data store. The connection uses a connector from Amazon Web Services Marketplace. -
"connectionType": "marketplace.spark"
: Designates a connection to an Apache Spark data store. The connection uses a connector from Amazon Web Services Marketplace. -
"connectionType": "marketplace.jdbc"
: Designates a connection to a JDBC data store. The connection uses a connector from Amazon Web Services Marketplace. -
"connectionType": "custom.athena"
: Designates a connection to an Amazon Athena data store. The connection uses a custom connector that you upload to Amazon Glue Studio. -
"connectionType": "custom.spark"
: Designates a connection to an Apache Spark data store. The connection uses a custom connector that you upload to Amazon Glue Studio. -
"connectionType": "custom.jdbc"
: Designates a connection to a JDBC data store. The connection uses a custom connector that you upload to Amazon Glue Studio.
Connection options for type custom.jdbc or marketplace.jdbc
-
className
– String, required, driver class name. -
connectionName
– String, required, name of the connection that is associated with the connector. -
url
– String, required, JDBC URL with placeholders (${}
) which are used to build the connection to the data source. The placeholder${secretKey}
is replaced with the secret of the same name in Amazon Secrets Manager. Refer to the data store documentation for more information about constructing the URL. -
secretId
oruser/password
– String, required, used to retrieve credentials for the URL. -
dbTable
orquery
– String, required, the table or SQL query to get the data from. You can specify eitherdbTable
orquery
, but not both. -
partitionColumn
– String, optional, the name of an integer column that is used for partitioning. This option works only when it's included withlowerBound
,upperBound
, andnumPartitions
. This option works the same way as in the Spark SQL JDBC reader. For more information, see JDBC To Other Databasesin the Apache Spark SQL, DataFrames and Datasets Guide. The
lowerBound
andupperBound
values are used to decide the partition stride, not for filtering the rows in table. All rows in the table are partitioned and returned.Note
When using a query instead of a table name, you should validate that the query works with the specified partitioning condition. For example:
-
If your query format is
"SELECT col1 FROM table1"
, then test the query by appending aWHERE
clause at the end of the query that uses the partition column. -
If your query format is "
SELECT col1 FROM table1 WHERE col2=val"
, then test the query by extending theWHERE
clause withAND
and an expression that uses the partition column.
-
-
lowerBound
– Integer, optional, the minimum value ofpartitionColumn
that is used to decide partition stride. -
upperBound
– Integer, optional, the maximum value ofpartitionColumn
that is used to decide partition stride. -
numPartitions
– Integer, optional, the number of partitions. This value, along withlowerBound
(inclusive) andupperBound
(exclusive), form partition strides for generatedWHERE
clause expressions that are used to split thepartitionColumn
.Important
Be careful with the number of partitions because too many partitions might cause problems on your external database systems.
-
filterPredicate
– String, optional, extra condition clause to filter data from source. For example:BillingCity='Mountain View'
When using a query instead of a table name, you should validate that the query works with the specified
filterPredicate
. For example:-
If your query format is
"SELECT col1 FROM table1"
, then test the query by appending aWHERE
clause at the end of the query that uses the filter predicate. -
If your query format is
"SELECT col1 FROM table1 WHERE col2=val"
, then test the query by extending theWHERE
clause withAND
and an expression that uses the filter predicate.
-
-
dataTypeMapping
– Dictionary, optional, custom data type mapping that builds a mapping from a JDBC data type to a Glue data type. For example, the option"dataTypeMapping":{"FLOAT":"STRING"}
maps data fields of JDBC typeFLOAT
into the JavaString
type by calling theResultSet.getString()
method of the driver, and uses it to build Amazon Glue records. TheResultSet
object is implemented by each driver, so the behavior is specific to the driver you use. Refer to the documentation for your JDBC driver to understand how the driver performs the conversions. -
The Amazon Glue data types supported currently are:
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
The JDBC data types supported are Java8 java.sql.types
. The default data type mappings (from JDBC to Amazon Glue) are:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
If you use a custom data type mapping with the option
dataTypeMapping
, then you can override a default data type mapping. Only the JDBC data types listed in thedataTypeMapping
option are affected; the default mapping is used for all other JDBC data types. You can add mappings for additional JDBC data types if needed. If a JDBC data type is not included in either the default mapping or a custom mapping, then the data type converts to the Amazon GlueSTRING
data type by default. -
The following Python code example shows how to read from JDBC databases with Amazon Web Services Marketplace JDBC drivers. It demonstrates reading from a database and writing to an S3 location.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Connection options for type custom.athena or marketplace.athena
-
className
– String, required, driver class name. When you're using the Athena-CloudWatch connector, this parameter value is the prefix of the class Name (for example,"com.amazonaws.athena.connectors"
). The Athena-CloudWatch connector is composed of two classes: a metadata handler and a record handler. If you supply the common prefix here, then the API loads the correct classes based on that prefix. -
tableName
– String, required, the name of the CloudWatch log stream to read. This code snippet uses the special view nameall_log_streams
, which means that the dynamic data frame returned will contain data from all log streams in the log group. -
schemaName
– String, required, the name of the CloudWatch log group to read from. For example,/aws-glue/jobs/output
. -
connectionName
– String, required, name of the connection that is associated with the connector.
For additional options for this connector, see the Amazon Athena CloudWatch Connector README
The following Python code example shows how to read from an Athena data store using an Amazon Web Services Marketplace connector. It demonstrates reading from Athena and writing to an S3 location.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Connection options for type custom.spark or marketplace.spark
-
className
– String, required, connector class name. -
secretId
– String, optional, used to retrieve credentials for the connector connection. -
connectionName
– String, required, name of the connection that is associated with the connector. -
Other options depend on the data store. For example, OpenSearch configuration options start with the prefix
es
, as described in the Elasticsearch for Apache Hadoopdocumentation. Spark connections to Snowflake use options such as sfUser
andsfPassword
, as described in Using the Spark Connectorin the Connecting to Snowflake guide.
The following Python code example shows how to read from an OpenSearch data store using a
marketplace.spark
connection.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<Amazon endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<Amazon endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
General options
The options in this section are provided as connection_options
, but do not specifically apply
to one connector.
The following parameters are used generally when configuring bookmarks. They may apply to Amazon S3 or JDBC workflows. For more information, see Using job bookmarks.
jobBookmarkKeys
— Array of column names.jobBookmarkKeysSortOrder
— String defining how to compare values based on sort order. Valid values:"asc"
,"desc"
.useS3ListImplementation
— Used to manage memory performance when listing Amazon S3 bucket contents. For more information, see Optimize memory management in Amazon Glue.