本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
AWS Glue Scala GlueContext APIs
程序包:com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
是在 Amazon Simple Storage Service (Amazon S3)、AWS Glue 数据目录、JDBC 等中读取和写入 DynamicFrame 的入口点。此类提供实用程序函数,用于创建可用来读取和写入 DynamicFrame
的 DataSource 性状 和 DataSink 对象。
如果从源创建的分区数小于分区的最小阈值(默认值为 10),您还可以使用 GlueContext
设置 DynamicFrame
中的目标分区数量(默认值为 20)。
def addIngestionColumns (定义 <;引擎名称>;)
def addIngestionColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
将 ingest_year
、ingest_month
、ingest_day
、ingest_hour
、ingest_minute
等提取时间列附加到输入 DataFrame
中。 当您指定 AWS Glue 表并将 数据目录 作为目标时,此函数会在 Amazon S3 生成的脚本中自动生成。此函数会自动使用输出表上的提取时间列更新分区。这允许根据提取时间自动对输出数据进行分区,而无需在输入数据中显式提取时间列。
-
dataFrame
– 要将提取时间列附加到的dataFrame
。 -
timeGranularity
– 时间列的粒度。有效值为“day
”、“hour
”和“minute
”。例如,如果将“hour
”传递到函数,则原始dataFrame
将附加“ingest_year
”、“ingest_month
”、“ingest_day
”和“ingest_hour
”时间列。
在附加时间粒度列后返回数据帧。
例如:
glueContext.addIngestionColumns(dataFrame, "hour")
def getCatalogSink (定义 )
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
创建一个可向在 数据目录 中定义的表中指定的位置写入的 DataSink。
-
database
— 数据目录 中的数据库名称。 -
tableName
— 数据目录 中的表名称。 -
redshiftTmpDir
— 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。 -
additionalOptions
– 提供给 AWS Glue 的额外选项。 -
catalogId
— 正在访问的数据目录的目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。
返回 DataSink
。
def getCatalogSource (定义 )
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
创建可从 数据目录 中的表定义读取数据的 DataSource 性状。
-
database
— 数据目录 中的数据库名称。 -
tableName
— 数据目录 中的表名称。 -
redshiftTmpDir
— 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。 -
pushDownPredicate
– 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅使用下推谓词进行预筛选。 -
additionalOptions
– 可选名称/值对的集合。可能的选项包括 AWS Glue 中的 ETL 的连接类型和选项 中列出的选项,但endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
和delimiter
除外。 -
catalogId
— 正在访问的数据目录的目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。
返回 DataSource
。
def getJDBCSink (定义 Amazon EFS)
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
创建一个可向在 数据目录 中的 Connection
对象指定的 JDBC 数据库写入的 DataSink。Connection
对象具有用于连接 JDBC 接收器的信息,包括 URL、用户名、密码、VPC、子网和安全组。
-
catalogConnection
— 数据目录 中包含要写入的 JDBC URL 的连接名称。 -
options
— JSON 名称-值对的字符串,提供写入 JDBC 数据存储所需的附加信息。这包括:-
dbtable(必填)— JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,请指定
schema.table-name
。 如果未提供架构,则使用默认的“public”架构。以下示例显示一个指向名为test
的架构的选项参数和数据库test_db
中一个名为test_table
的表。options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
-
database(必填)— JDBC 数据库名称。
-
任何其他选项直接传递到 SparkSQL JDBC 写入器。有关更多信息,请参阅 Spark 的 Redshift 数据源
。
-
-
redshiftTmpDir
— 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。 -
catalogId
— 正在访问的数据目录的目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。
示例代码:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
返回 DataSink
。
def getSink (定义 <;引擎名称>;)
def getSink( connectionType : String,
options : JsonOptions,
transformationContext : String = ""
) : DataSink
创建可向 Amazon Simple Storage Service (Amazon S3)、JDBC 或 AWS Glue 数据目录 等目标写入数据的 DataSink。
-
connectionType
— 连接的类型。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
options
— JSON 名称-值对的字符串,提供与数据接收器建立连接所需的附加信息。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。
返回 DataSink
。
def getSinkWithFormat (定义 <;引擎名称>;)
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
创建可向 Amazon S3、JDBC 或 数据目录 等目标写入数据并设置要写出到目标的数据格式的 DataSink。
-
connectionType
— 连接的类型。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
options
— JSON 名称-值对的字符串,提供与数据接收器建立连接所需的附加信息。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。 -
format
— 要写出到目标的数据的格式。 -
formatOptions
— JSON 名称-值对的字符串,提供用于设置目标处的数据格式的附加选项。请参阅 格式选项。
返回 DataSink
。
def getSource (定义 <;引擎名称>;)
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
创建可从 Amazon S3、JDBC 或 AWS Glue 数据目录 等源中读取数据的 DataSource 性状。
-
connectionType
— 数据源的类型。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
connectionOptions
— JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。 -
pushDownPredicate
— 预测分区列。
返回 DataSource
。
def getSourceWithFormat (定义 Amazon EFS)
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
创建一个 DataSource 性状,以从 Amazon S3、JDBC 或 AWS Glue 数据目录 等源读取数据并设置源中所存储数据的格式。
-
connectionType
— 数据源的类型。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
options
— JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。请参阅 AWS Glue 中的 ETL 的连接类型和选项。 -
transformationContext
— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。 -
format
— 源中所存储数据的格式。当connectionType
为“s3”时,您还可以指定format
。 可以是以下值之一:“avro”、“csv”、“grokLog”、“ion”、“json”、“xml”、“parquet”或“orc”。 -
formatOptions
— JSON 名称-值对的字符串,提供用于在源中分析数据的附加选项。请参阅 格式选项。
返回 DataSource
。
def getSparkSession (定义 Amazon EFS)
def getSparkSession : SparkSession
获取与此 SparkSession
关联的 GlueContext 对象。 使用此 SparkSession 对象可注册表和 UDFs,以便与从 DataFrame
创建的 DynamicFrames 一起使用。
返回 SparkSession。
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
使用指定的 SparkContext
、最小分区数和目标分区数创建 GlueContext
对象。
-
sc
—。SparkContext
-
minPartitions
— 最小分区数。 -
targetPartitions
— 目标分区数。
返回 GlueContext
。
def this
def this( sc : SparkContext )
使用提供的 GlueContext
创建 SparkContext
对象。 将最小分区数设置为 10,将目标分区数设置为 20。
-
sc
—。SparkContext
返回 GlueContext
。
def this
def this( sparkContext : JavaSparkContext )
使用提供的 GlueContext
创建 JavaSparkContext
对象。 将最小分区数设置为 10,将目标分区数设置为 20。
-
sparkContext
—。JavaSparkContext
返回 GlueContext
。