Amazon Glue Scala GlueContext APIs
Package: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
is the entry point for reading and writing a DynamicFrame from and to Amazon Simple Storage Service (Amazon S3), the
Amazon Glue Data Catalog, JDBC, and so on. This class provides utility functions to create DataSource trait and DataSink objects that can in turn be
used to read and write DynamicFrame
s.
You can also use GlueContext
to set a target number of partitions (default 20)
in the DynamicFrame
if the number of partitions created from the source is less
than a minimum threshold for partitions (default 10).
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Appends ingestion time columns like ingest_year
, ingest_month
,
ingest_day
, ingest_hour
, ingest_minute
to the input
DataFrame
. This function is automatically generated in the script generated
by the Amazon Glue when you specify a Data Catalog table with Amazon S3 as the target. This
function automatically updates the partition with ingestion time columns on the output
table. This allows the output data to be automatically partitioned on ingestion time without
requiring explicit ingestion time columns in the input data.
-
dataFrame
– ThedataFrame
to append the ingestion time columns to. -
timeGranularity
– The granularity of the time columns. Valid values are "day
", "hour
" and "minute
". For example, if "hour
" is passed in to the function, the originaldataFrame
will have "ingest_year
", "ingest_month
", "ingest_day
", and "ingest_hour
" time columns appended.
Returns the data frame after appending the time granularity columns.
Example:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Returns a DataFrame
created with the specified connection and format. Use this function only with Amazon Glue streaming sources.
connectionType
– The streaming connection type. Valid values includekinesis
andkafka
.-
connectionOptions
– Connection options, which are different for Kinesis and Kafka. You can find the list of all connection options for each streaming data source at Connection types and options for ETL in Amazon Glue for Spark. Note the following differences in streaming connection options:-
Kinesis streaming sources require
streamARN
,startingPosition
,inferSchema
, andclassification
. -
Kafka streaming sources require
connectionName
,topicName
,startingOffsets
,inferSchema
, andclassification
.
-
transformationContext
– The transformation context to use (optional).format
– A format specification (optional). This is used for an Amazon S3 or an Amazon Glue connection that supports multiple formats. For information about the supported formats, see Data format options for inputs and outputs in Amazon Glue for SparkformatOptions
– Format options for the specified format. For information about the supported format options, see Data format options.
Example for Amazon Kinesis streaming source:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Example for Kafka streaming source:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Applies the batch_function
passed in to every micro batch that is read from
the Streaming source.
-
frame
– The DataFrame containing the current micro batch. -
batch_function
– A function that will be applied for every micro batch. -
options
– A collection of key-value pairs that holds information about how to process micro batches. The following options are required:-
windowSize
– The amount of time to spend processing each batch. -
checkpointLocation
– The location where checkpoints are stored for the streaming ETL job. -
batchMaxRetries
– The maximum number of times to retry the batch if it fails. The default value is 3. This option is only configurable for Glue version 2.0 and above.
-
Example:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Creates a DataSink that writes to a location specified in a table that is defined in the Data Catalog.
database
— The database name in the Data Catalog.tableName
— The table name in the Data Catalog.redshiftTmpDir
— The temporary staging directory to be used with certain data sinks. Set to empty by default.transformationContext
— The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.additionalOptions
– Additional options provided to Amazon Glue.catalogId
— The catalog ID (account ID) of the Data Catalog being accessed. When null, the default account ID of the caller is used.
Returns the DataSink
.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Creates a DataSource trait that reads data from a table definition in the Data Catalog.
database
— The database name in the Data Catalog.tableName
— The table name in the Data Catalog.redshiftTmpDir
— The temporary staging directory to be used with certain data sinks. Set to empty by default.transformationContext
— The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.pushDownPredicate
– Filters partitions without having to list and read all the files in your dataset. For more information, see Pre-filtering using pushdown predicates.additionalOptions
– A collection of optional name-value pairs. The possible options include those listed in Connection types and options for ETL in Amazon Glue for Spark except forendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
, anddelimiter
. Another supported option iscatalogPartitionPredicate
:catalogPartitionPredicate
— You can pass a catalog expression to filter based on the index columns. This pushes down the filtering to the server side. For more information, see Amazon Glue Partition Indexes. Note thatpush_down_predicate
andcatalogPartitionPredicate
use different syntaxes. The former one uses Spark SQL standard syntax and the later one uses JSQL parser.catalogId
— The catalog ID (account ID) of the Data Catalog being accessed. When null, the default account ID of the caller is used.
Returns the DataSource
.
Example for streaming source
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Creates a DataSink that writes to a JDBC
database that is specified in a Connection
object in the Data Catalog. The
Connection
object has information to connect to a JDBC sink, including the URL,
user name, password, VPC, subnet, and security groups.
catalogConnection
— The name of the connection in the Data Catalog that contains the JDBC URL to write to.options
— A string of JSON name-value pairs that provide additional information that is required to write to a JDBC data store. This includes:dbtable (required) — The name of the JDBC table. For JDBC data stores that support schemas within a database, specify
schema.table-name
. If a schema is not provided, then the default "public" schema is used. The following example shows an options parameter that points to a schema namedtest
and a table namedtest_table
in databasetest_db
.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (required) — The name of the JDBC database.
Any additional options passed directly to the SparkSQL JDBC writer. For more information, see Redshift data source for Spark
.
redshiftTmpDir
— A temporary staging directory to be used with certain data sinks. Set to empty by default.transformationContext
— The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.catalogId
— The catalog ID (account ID) of the Data Catalog being accessed. When null, the default account ID of the caller is used.
Example code:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Returns the DataSink
.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Creates a DataSink that writes data to a destination like Amazon Simple Storage Service (Amazon S3), JDBC, or the Amazon Glue Data Catalog, or an Apache Kafka or Amazon Kinesis data stream.
-
connectionType
— The type of the connection. See Connection types and options for ETL in Amazon Glue for Spark. -
connectionOptions
— A string of JSON name-value pairs that provide additional information to establish the connection with the data sink. See Connection types and options for ETL in Amazon Glue for Spark. -
transformationContext
— The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.
Returns the DataSink
.
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Creates a DataSink that writes data to a destination like Amazon S3, JDBC, or the Data Catalog, or an Apache Kafka or Amazon Kinesis data stream. Also sets the format for the data to be written out to the destination.
connectionType
— The type of the connection. See Connection types and options for ETL in Amazon Glue for Spark.-
options
— A string of JSON name-value pairs that provide additional information to establish a connection with the data sink. See Connection types and options for ETL in Amazon Glue for Spark. transformationContext
— The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.format
— The format of the data to be written out to the destination.formatOptions
— A string of JSON name-value pairs that provide additional options for formatting data at the destination. See Data format options.
Returns the DataSink
.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Creates a DataSource trait that reads data from a source like Amazon S3, JDBC, or the Amazon Glue Data Catalog. Also supports Kafka and Kinesis streaming data sources.
connectionType
— The type of the data source. See Connection types and options for ETL in Amazon Glue for Spark.-
connectionOptions
— A string of JSON name-value pairs that provide additional information for establishing a connection with the data source. For more information, see Connection types and options for ETL in Amazon Glue for Spark.A Kinesis streaming source requires the following connection options:
streamARN
,startingPosition
,inferSchema
, andclassification
.A Kafka streaming source requires the following connection options:
connectionName
,topicName
,startingOffsets
,inferSchema
, andclassification
. transformationContext
— The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.pushDownPredicate
— Predicate on partition columns.
Returns the DataSource
.
Example for Amazon Kinesis streaming source:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Example for Kafka streaming source:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Creates a DataSource trait that reads data from a source like Amazon S3, JDBC, or the Amazon Glue Data Catalog, and also sets the format of data stored in the source.
connectionType
– The type of the data source. See Connection types and options for ETL in Amazon Glue for Spark.-
options
– A string of JSON name-value pairs that provide additional information for establishing a connection with the data source. See Connection types and options for ETL in Amazon Glue for Spark. transformationContext
– The transformation context that is associated with the sink to be used by job bookmarks. Set to empty by default.format
– The format of the data that is stored at the source. When theconnectionType
is "s3", you can also specifyformat
. Can be one of “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet”, or “orc”.formatOptions
– A string of JSON name-value pairs that provide additional options for parsing data at the source. See Data format options.
Returns the DataSource
.
Examples
Create a DynamicFrame from a data source that is a comma-separated values (CSV) file on Amazon S3:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Create a DynamicFrame from a data source that is a PostgreSQL using a JDBC connection:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Create a DynamicFrame from a data source that is a MySQL using a JDBC connection:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Gets the SparkSession
object associated with this GlueContext. Use this
SparkSession object to register tables and UDFs for use with DataFrame
created
from DynamicFrames.
Returns the SparkSession.
def startTransaction
def startTransaction(readOnly: Boolean):String
Start a new transaction. Internally calls the Lake Formation startTransaction API.
readOnly
– (Boolean) Indicates whether this transaction should be read only or read and write. Writes made using a read-only transaction ID will be rejected. Read-only transactions do not need to be committed.
Returns the transaction ID.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Attempts to commit the specified transaction. commitTransaction
may return
before the transaction has finished committing. Internally calls the Lake Formation commitTransaction API.
transactionId
– (String) The transaction to commit.waitForCommit
– (Boolean) Determines whether thecommitTransaction
returns immediately. The default value is true. If false,commitTransaction
polls and waits until the transaction is committed. The amount of wait time is restricted to 1 minute using exponential backoff with a maximum of 6 retry attempts.
Returns a Boolean to indicate whether the commit is done or not.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Attempts to cancel the specified transaction. Internally calls the Lake Formation CancelTransaction API.
transactionId
– (String) The transaction to cancel.
Returns a TransactionCommittedException
exception if the transaction was previously committed.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Creates a GlueContext
object using the specified SparkContext
,
minimum partitions, and target partitions.
sc
— TheSparkContext
.minPartitions
— The minimum number of partitions.targetPartitions
— The target number of partitions.
Returns the GlueContext
.
def this
def this( sc : SparkContext )
Creates a GlueContext
object with the provided SparkContext
.
Sets the minimum partitions to 10 and target partitions to 20.
sc
— TheSparkContext
.
Returns the GlueContext
.
def this
def this( sparkContext : JavaSparkContext )
Creates a GlueContext
object with the provided JavaSparkContext
.
Sets the minimum partitions to 10 and target partitions to 20.
sparkContext
— TheJavaSparkContext
.
Returns the GlueContext
.