Amazon Glue Scala GlueContext API
程序包:com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
是在以下位置读取和写入 DynamicFrame 的入口点:Amazon Simple Storage Service(Amazon S3)、Amazon Glue 数据目录、JDBC 等。此类提供实用程序函数,用于创建可用来读取和写入 DynamicFrame
的 数据源特性 和 DataSink 对象。
如果从源创建的分区数小于分区的最小阈值(默认值为 10),您还可以使用 GlueContext
设置 DynamicFrame
中的目标分区数量(默认值为 20)。
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
将提取时间列(例如 ingest_year
、ingest_month
、ingest_day
、ingest_hour
、ingest_minute
)附加到输入 DataFrame
。当您指定以 Amazon S3 为目标的数据目录表时,Amazon Glue 生成的脚本中会自动生成此函数。此函数使用输出表上的提取时间列自动更新分区。这允许根据提取时间自动对输出数据进行分区,而不需要在输入数据中显示提取时间列。
-
dataFrame
– 提取时间列要附加到的dataFrame
。 -
timeGranularity
– 时间列的粒度。有效值为“day
”、“hour
”和“minute
”。例如,如果“hour
”传递到函数,原始dataFrame
将附加“ingest_year
”、“ingest_month
”、“ingest_day
”和“ingest_hour
”时间列。
在附加时间粒度列后返回数据框。
例如:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
返回一个使用指定连接和格式创建的 DataFrame
。仅将此函数与 Amazon Glue 流式传输源结合使用。
connectionType
– 流式传输连接类型。有效值包括kinesis
和kafka
。-
connectionOptions
– 连接选项,不同于 Kinesis 和 Kafka。您可以在 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项 找到每个流式传输数据源的所有连接选项列表。请注意流式传输连接选项的以下差异:-
Kinesis 流式传输源需要
streamARN
、startingPosition
、inferSchema
和classification
。 -
Kinesis 流式传输源需要
connectionName
、topicName
、startingOffsets
、inferSchema
和classification
。
-
transformationContext
– 要使用的转换上下文(可选)。format
– 格式规范(可选)。这用于 Amazon S3 或支持多种格式的 Amazon Glue 连接。有关所支持格式的信息,请参阅 Amazon Glue for Spark 中的输入和输出的数据格式选项。formatOptions
– 指定格式的格式选项。有关所支持的格式选项的信息,请参阅数据格式选项。
Amazon Kinesis 流式传输源示例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Kafka 流式传输源示例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
将传入的 batch_function
应用于从流式传输源读取的每个微批处理。
-
frame
– 包含当前微处理的 DataFrame。 -
batch_function
– 应用于每个微处理的函数。 -
options
– 键值对集合,其中包含有关如何处理微批处理的信息。以下选项为必填:-
windowSize
– 处理每个批处理所花费的时间量。 -
checkpointLocation
– 为流式传输 ETL 任务存储检查点的位置。 -
batchMaxRetries
– 在该批处理失败时重试的最大次数。默认值为 3。此选项仅适用于 Glue 版本 2.0 及更高版本。
-
示例:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
创建一个可向在数据目录中定义的表中指定的位置写入的 DataSink。
database
– 数据目录中的数据库名称。tableName
– 数据目录中的表名称。redshiftTmpDir
– 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。additionalOptions
提供给 Amazon Glue 的额外选项。catalogId
– 正在访问的数据目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。
返回 DataSink
。
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
创建可从数据目录中的表定义读取数据的 数据源特性。
database
– 数据目录中的数据库名称。tableName
– 数据目录中的表名称。redshiftTmpDir
– 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。pushDownPredicate
– 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选。additionalOptions
– 可选名称/值对的集合。可能选项包括 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,但endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
和delimiter
除外。另一个支持的选项是catalogPartitionPredicate
:catalogPartitionPredicate
– 要传递目录表达式以根据索引列进行筛选。这样会将筛选下推到服务器端。有关更多信息,请参阅 Amazon Glue 分区数据。请注意,push_down_predicate
和catalogPartitionPredicate
使用不同的语法。前者使用 Spark SQL 标准语法,后者使用 JSQL 解析器。catalogId
– 正在访问的数据目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。
返回 DataSource
。
流式传输源示例
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
创建一个可向在数据目录中的 Connection
对象指定的 JDBC 数据库写入的 DataSink。Connection
对象具有用于连接 JDBC 接收器的信息,包括 URL、用户名、密码、VPC、子网和安全组。
catalogConnection
– 数据目录中包含要写入的 JDBC URL 的连接名称。options
– JSON 名称-值对的字符串,提供写入 JDBC 数据存储所需的附加信息。这包括:dbtable(必填)- JDBC 表名称。对于在数据库中支持架构的 JDBC 数据存储,指定
schema.table-name
。如果未提供架构,则使用默认的“public”架构。以下示例显示一个指向名为test
的架构的选项参数和数据库test_db
中一个名为test_table
的表。options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database(必填)- JDBC 数据库名称。
任何其他选项直接传递到 SparkSQL JDBC 写入器。有关更多信息,请参阅 Spark 的 Redshift 数据源
。
redshiftTmpDir
– 用于某些数据接收器的临时目录。设置为 默认情况下为空。transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。catalogId
– 正在访问的数据目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。
示例代码:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
返回 DataSink
。
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
创建 DataSink,将数据写入 Amazon Simple Storage Service(Amazon S3)、JDBC、Amazon Glue 数据目录、Apache Kafka 或 Amazon Kinesis 数据流等目标写入数据。
-
connectionType
- 连接的类型。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。 -
connectionOptions
– JSON 名称-值对的字符串,提供与数据接收器建立连接所需的附加信息。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。 -
transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
返回 DataSink
。
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
创建 DataSink,将数据写入 Amazon S3、JDBC、数据目录、Apache Kafka 或 Amazon Kinesis 数据流等目标写入数据。此外还将设置要写出到目标的数据格式。
connectionType
- 连接的类型。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。-
options
– JSON 名称-值对的字符串,提供与数据接收器建立连接所需的附加信息。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。 transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。format
– 要写出到目标的数据的格式。formatOptions
– JSON 名称-值对的字符串,提供用于设置目标处的数据格式的附加选项。请参阅 数据格式选项。
返回 DataSink
。
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
创建可从 Amazon S3、JDBC 或 Amazon Glue 数据目录等源中读取数据的 数据源特性。还支持 Kafka 和 Kinesis 流式传输数据源。
connectionType
– 数据源的类型。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。-
connectionOptions
– JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。有关更多信息,请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。Kinesis 流式传输源需要以下连接选项:
streamARN
、startingPosition
、inferSchema
和classification
。Kinesis 流式传输源需要以下连接选项:
connectionName
、topicName
、startingOffsets
、inferSchema
和classification
。 transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。pushDownPredicate
– 预测分区列。
返回 DataSource
。
Amazon Kinesis 流式传输源示例:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Kafka 流式传输源示例:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
创建一个 数据源特性,从 Amazon S3, JDBC 或 Amazon Glue 数据目录等源读取数据并设置源中所存储数据的格式。
connectionType
– 数据源的类型。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。-
options
– JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项。 transformationContext
– 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。format
– 源中所存储数据的格式。当connectionType
为“s3”时,您也可以指定format
。可以是以下值之一:“avro”、“csv”、“grokLog”、“ion”、“json”、“xml”、“parquet”或“orc”。formatOptions
– JSON 名称-值对的字符串,提供用于在源中分析数据的附加选项。请参阅 数据格式选项。
返回 DataSource
。
示例
从 Amazon S3 上逗号分隔值(CSV)文件的数据源创建 DynamicFrame:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
使用 JDBC 连接从作为 PostgreSQL 的数据源创建 DynamicFrame:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
使用 JDBC 连接从作为 MySQL 的数据源创建 DynamicFrame:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
获取与此 GlueContext 关联的 SparkSession
对象。使用此 SparkSession 对象可注册要用于从 DynamicFrame 创建的 DataFrame
的表和 UDF。
返回 SparkSession。
def startTransaction
def startTransaction(readOnly: Boolean):String
开启新事务。内部调用 Lake Formation startTransaction API。
readOnly
–(布尔值)指示此事务应为只读还是读写。使用只读事务 ID 进行的写入将被拒绝。只读事务不需要提交。
返回事务 ID。
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
尝试提交指定的事务。可能在事务完成提交之前返回 commitTransaction
。内部调用 Lake Formation commitTransaction API。
transactionId
–(字符串)要提交的事务。waitForCommit
–(布尔值)确定是否立即返回commitTransaction
。默认值为 true。如果为假,则轮询commitTransaction
并等待事务提交。等待时间限制为 1 分钟使用指数回退,最多重试 6 次。
返回布尔值,以指示是否完成提交。
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
尝试取消指定的事务。内部调用 Lake Formation CancelTransaction API。
transactionId
–(字符串)要取消的事务。
如果事务以前已提交,则返回 TransactionCommittedException
异常。
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
使用指定的 SparkContext
、最小分区数和目标分区数创建 GlueContext
对象。
sc
— 这些区域有:SparkContext
。minPartitions
– 最小分区数。targetPartitions
– 目标分区数。
返回 GlueContext
。
def this
def this( sc : SparkContext )
使用提供的 GlueContext
创建一个 SparkContext
对象。将最小分区数设置为 10,将目标分区数设置为 20。
sc
— 这些区域有:SparkContext
。
返回 GlueContext
。
def this
def this( sparkContext : JavaSparkContext )
使用提供的 GlueContext
创建一个 JavaSparkContext
对象。将最小分区数设置为 10,将目标分区数设置为 20。
sparkContext
— 这些区域有:JavaSparkContext
。
返回 GlueContext
。