AmazonScala GlueContext API - Amazon连接词
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AmazonScala GlueContext API

程序包:com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext是读取和写入DynamicFrame来自和运送 Amazon Simple Storage Service (Amazon S3),AmazonGlue 数据目录、JDBC 等。此类提供实用程序函数,用于创建可用来读取和写入 DynamicFrame数据源特性DataSink 对象。

如果从源创建的分区数小于分区的最小阈值(默认值为 10),您还可以使用 GlueContext 设置 DynamicFrame 中的目标分区数量(默认值为 20)。

def 添加列

def addIngestionColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

附加摄入时间列,如ingest_yearingest_monthingest_dayingest_houringest_minute到输入DataFrame。此函数会自动生成在由Amazon当您指定以 Amazon 简单存储服务为目标的数据目录表时,请 Glue 合。此函数会自动更新输出表上的引入时间列的分区。这样可以在输入时间自动对输出数据进行分区,而无需在输入数据中显式的引入时间列。

  • dataFramedataFrame将摄入时间列附加到。

  • timeGranularity— 时间列的粒度。有效值为”day“,”hour" 和 "minute“。例如,如果”hour” 传递给函数,原始的dataFrame将有”ingest_year“,”ingest_month“,”ingest_day“, 和”ingest_hour” 时间列附加。

在追加时间粒度列后返回数据框。

示例:

glueContext.addIngestionColumns(dataFrame, "hour")

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

创建DataSink,写入数据目录所定义的表中的指定位置。

  • database— 数据目录中的数据库名称。

  • tableName— 数据目录中的表名称。

  • redshiftTmpDir— 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。

  • transformationContext— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • additionalOptions— 提供给的额外选项AmazonGlue。

  • catalogId— 正在访问的数据目录的目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。

返回 DataSink

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

创建数据源特性,以从数据目录中的表定义读取数据。

  • database— 数据目录中的数据库名称。

  • tableName— 数据目录中的表名称。

  • redshiftTmpDir— 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。

  • transformationContext— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • pushDownPredicate— 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选

  • additionalOptions – 可选名称/值对的集合。可能的选项包括中的 ETL 的连接类型和选项Amazon连接词但除外endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification, 和delimiter

  • catalogId— 正在访问的数据目录的目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。

返回 DataSource

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

创建DataSink写入 JDBC 数据库的Connection对象在数据目录中。Connection 对象具有用于连接 JDBC 接收器的信息,包括 URL、用户名、密码、VPC、子网和安全组。

  • catalogConnection— 数据目录中包含要写入的 JDBC URL 的连接名称。

  • options— JSON 名称-值对的字符串,提供写入 JDBC 数据存储所需的附加信息。这包括:

    • dbtable(必填)- JDBC 表名称。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。以下示例显示一个指向名为 test 的架构的选项参数和数据库 test_db 中一个名为 test_table 的表。

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database(必填)- JDBC 数据库名称。

    • 任何其他选项直接传递到 SparkSQL JDBC 写入器。有关更多信息,请参阅 Spark 的 Redshift 数据源

  • redshiftTmpDir— 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。

  • transformationContext— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • catalogId— 正在访问的数据目录的目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。

示例代码:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

返回 DataSink

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

创建DataSink将数据写入 Amazon Simple Storage Service (Amazon S3)、JDBC 或AmazonGlue 数据目录。

返回 DataSink

def getSinkWithFormat

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

创建DataSink,将数据写入 Amazon S3、JDBC 或数据目录等目标,并设置要写出的目标的数据格式的。

  • connectionType - 连接的类型。请参阅 中的 ETL 的连接类型和选项Amazon连接词

  • options – JSON 名称-值对的字符串,提供与数据接收器建立连接所需的附加信息。请参阅 中的 ETL 的连接类型和选项Amazon连接词

  • transformationContext— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • format – 要写出到目标的数据的格式。

  • formatOptions— JSON 名称-值对的字符串,提供用于设置目标处的数据格式的附加选项。请参阅 格式选项

返回 DataSink

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

创建数据源特性,以从 Amazon S3、JDBC 或等源中读取数据的AmazonGlue 数据目录。

返回 DataSource

def getSourceWithFormat

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

创建数据源特性,以从 Amazon S3、JDBC 或等源中读取数据的AmazonGlue 数据目录,并设置源中所存储数据的格式。

  • connectionType— 数据源的类型。请参阅 中的 ETL 的连接类型和选项Amazon连接词

  • options— JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。请参阅 中的 ETL 的连接类型和选项Amazon连接词

  • transformationContext— 与作业书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • format— 源中所存储数据的格式。当 connectionType 为“s3”时,您也可以指定 format。可以是以下值之一:“avro”、“csv”、“grokLog”、“ion”、“json”、“xml”、“parquet”或“orc”。

  • formatOptions— JSON 名称-值对的字符串,提供用于在源中分析数据的附加选项。请参阅 格式选项

返回 DataSource

示例

Scala Simple Storage Service 源:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

PostgreSQL 源:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

MySQL 源:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

获取与此 GlueContext 关联的 SparkSession 对象。使用此 SparkSession 对象可注册要用于从 DynamicFrame 创建的 DataFrame 的表和 UDF。

返回 SparkSession。

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

使用指定的 SparkContext、最小分区数和目标分区数创建 GlueContext 对象。

  • sc — 这些区域有: SparkContext.

  • minPartitions – 最小分区数。

  • targetPartitions – 目标分区数。

返回 GlueContext

def this

def this( sc : SparkContext )

使用提供的 GlueContext 创建一个 SparkContext 对象。将最小分区数设置为 10,将目标分区数设置为 20。

  • sc — 这些区域有: SparkContext.

返回 GlueContext

def this

def this( sparkContext : JavaSparkContext )

使用提供的 GlueContext 创建一个 JavaSparkContext 对象。将最小分区数设置为 10,将目标分区数设置为 20。

  • sparkContext — 这些区域有: JavaSparkContext.

返回 GlueContext