

# Amazon Glue for Spark 中适用于 ETL 的连接类型和选项
<a name="aws-glue-programming-etl-connect"></a>

在 Amazon Glue for Spark 中，各种 PysPark 和 Scala 方法和转换使用 `connectionType` 参数指定连接类型。它们使用 `connectionOptions` 或 `options` 参数指定连接选项。

`connectionType` 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 `connectionOptions`（或 `options`）参数值。除非另有说明，否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例，请参阅每种连接类型的主页。


| `connectionType` | 连接到 | 
| --- | --- | 
| [dynamodb](aws-glue-programming-etl-connect-dynamodb-home.md) | [Amazon DynamoDB](https://docs.amazonaws.cn/amazondynamodb/latest/developerguide/) 数据库 | 
| [kinesis](aws-glue-programming-etl-connect-kinesis-home.md) | [Amazon Kinesis Data Streams](https://docs.amazonaws.cn/streams/latest/dev/introduction.html) | 
| [S3](aws-glue-programming-etl-connect-s3-home.md) | [Amazon S3](https://docs.amazonaws.cn/AmazonS3/latest/dev/) | 
| [documentdb](aws-glue-programming-etl-connect-documentdb-home.md#aws-glue-programming-etl-connect-documentdb) | [Amazon DocumentDB (with MongoDB compatibility)](https://docs.amazonaws.cn/documentdb/latest/developerguide/) 数据库 | 
| [opensearch](aws-glue-programming-etl-connect-opensearch-home.md) | [Amazon OpenSearch Service](https://docs.amazonaws.cn/opensearch-service/latest/developerguide/)。 | 
| [redshift](aws-glue-programming-etl-connect-redshift-home.md) | [Amazon Redshift](https://www.amazonaws.cn/redshift/) 数据库 | 
| [kafka](aws-glue-programming-etl-connect-kafka-home.md) |  [Kafka](https://kafka.apache.org/) 或 [Amazon Managed Streaming for Apache Kafka](https://docs.amazonaws.cn/msk/latest/developerguide/what-is-msk.html) | 
| [azurecosmos](aws-glue-programming-etl-connect-azurecosmos-home.md) | Azure Cosmos for NoSQL。 | 
| [azuresql](aws-glue-programming-etl-connect-azuresql-home.md) | Azure SQL。 | 
| [bigquery](aws-glue-programming-etl-connect-bigquery-home.md) | Google BigQuery。 | 
| [mongodb](aws-glue-programming-etl-connect-mongodb-home.md) | [MongoDB](https://www.mongodb.com/what-is-mongodb) 数据库，包括 MongoDB Atlas。 | 
| [sqlserver](aws-glue-programming-etl-connect-jdbc-home.md) |  Microsoft SQL Server 数据库（请参阅[JDBC 连接](aws-glue-programming-etl-connect-jdbc-home.md)） | 
| [mysql](aws-glue-programming-etl-connect-jdbc-home.md) | [MySQL](https://www.mysql.com/) 数据库（请参阅[JDBC 连接](aws-glue-programming-etl-connect-jdbc-home.md)） | 
| [oracle](aws-glue-programming-etl-connect-jdbc-home.md) | [Oracle](https://www.oracle.com/database/) 数据库（请参阅[JDBC 连接](aws-glue-programming-etl-connect-jdbc-home.md)） | 
| [postgresql](aws-glue-programming-etl-connect-jdbc-home.md) |  [PostgreSQL](https://www.postgresql.org/) 数据库（请参阅[JDBC 连接](aws-glue-programming-etl-connect-jdbc-home.md)） | 
| [saphana](aws-glue-programming-etl-connect-saphana-home.md) | SAP HANA。 | 
| [snowflake](aws-glue-programming-etl-connect-snowflake-home.md) | [Snowflake](https://www.snowflake.com/) 数据湖 | 
| [teradata](aws-glue-programming-etl-connect-teradata-home.md) | Teradata Vantage。 | 
| [vertica](aws-glue-programming-etl-connect-vertica-home.md) | Vertica。 | 
| [custom.\$1](#aws-glue-programming-etl-connect-market) | Spark、Athena 或 JDBC 数据存储（请参阅[自定义和 Amazon Web Services Marketplace connectionType 值](#aws-glue-programming-etl-connect-market)）  | 
| [marketplace.\$1](#aws-glue-programming-etl-connect-market) | Spark、Athena 或 JDBC 数据存储（请参阅[自定义和 Amazon Web Services Marketplace connectionType 值](#aws-glue-programming-etl-connect-market)）  | 

## Amazon Glue 5.0 for Spark 中 ETL 的 DataFrame 选项
<a name="aws-glue-programming-etl-connect-dataframe"></a>

DataFrame 是一个类似于表的按命名列组织的数据集，并支持函数式（map/reduce/filter/等）操作和 SQL 操作（select、project、aggregate）。

要为 Glue 支持的数据来源创建 DataFrame，需要以下项：
+ 数据来源连接器 `ClassName`
+ 数据来源连接 `Options`

同样，要将 DataFrame 写入 Glue 支持的数据接收器，也需要相同项：
+ 数据接收器连接器 `ClassName`
+ 数据接收器连接 `Options`

请注意，DataFrame 不支持 Amazon Glue 功能（例如作业书签）和 DynamicFrame 选项（例如 `connectionName`）。有关 DataFrame 及其支持的操作的更多详细信息，请参阅 [DataFrame](https://spark.apache.org/docs/3.5.2/api/python/reference/pyspark.sql/dataframe.html) 的 Spark 文档。

### 指定连接器类名
<a name="aws-glue-programming-etl-connect-dataframe-classname"></a>

要指定数据来源/接收器的 `ClassName`，请使用 `.format` 选项提供定义数据来源/接收器的相应连接器 `ClassName`。

**JDBC 连接器**  
对于 JDBC 连接器，请指定 `jdbc` 作为 `.format` 选项的值，并在 `driver` 选项中提供 JDBC 驱动程序 `ClassName`。

```
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")...

df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
```

下表列出了 Amazon Glue for DataFrames 支持的数据来源的 JDBC 驱动程序 `ClassName`。

| 数据来源 | 驱动程序类名 | 
| --- |--- |
| PostgreSQL | org.postgresql.Driver | 
| Oracle | oracle.jdbc.driver.OracleDriver | 
| SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
| MySQL | com.mysql.jdbc.Driver | 
| SAPHana | com.sap.db.jdbc.Driver | 
| Teradata | com.teradata.jdbc.TeraDriver | 

**Spark 连接器**  
对于 Spark 连接器，请将连接器的 `ClassName` 指定为 `.format` 选项的值。

```
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")...

df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
```

下表列出了 Amazon Glue for DataFrames 中支持的数据来源的 Spark 连接器 `ClassName`。

| 数据来源 | 类名 | 
| --- |--- |
| MongoDB/DocumentDB | glue.spark.mongodb | 
| Redshift | io.github.spark\$1redshift\$1community.spark.redshift | 
| AzureCosmos | cosmos.oltp | 
| AzureSQL | com.microsoft.sqlserver.jdbc.spark | 
| BigQuery | com.google.cloud.spark.bigquery | 
| OpenSearch | org.opensearch.spark.sql | 
| Snowflake | net.snowflake.spark.snowflake | 
| Vertica | com.vertica.spark.datasource.VerticaSource | 

### 指定连接选项
<a name="aws-glue-programming-etl-connect-dataframe-connection-options"></a>

要指定与数据来源/接收器的连接的 `Options`，请使用 `.option(<KEY>, <VALUE>)` 提供单个选项或使用 `.options(<MAP>)` 提供多个选项作为键值映射。

每个数据来源/接收器都支持自己的一组连接 `Options`。有关可用 `Options` 的详细信息，请参阅下表中列出的特定数据来源/接收器 Spark 连接器的公共文档。
+ [JDBC](https://spark.apache.org/docs/3.5.2/sql-data-sources-jdbc.html)
+ [MongoDB/DocumentDB](https://www.mongodb.com/docs/spark-connector/v10.4/)
+ [Redshift](https://docs.amazonaws.cn/redshift/latest/mgmt/spark-redshift-connector.html)
+ [AzureCosmos](https://github.com/Azure/azure-cosmosdb-spark)
+ [AzureSQL](https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16)
+ [BigQuery](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example)
+ [OpenSearch](https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#apache-spark)
+ [Snowflake](https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector)
+ [Vertica](https://github.com/vertica/spark-connector)

### 示例
<a name="aws-glue-programming-etl-connect-dataframe-examples"></a>

以下示例从 PostgreSQL 读取并写入 SnowFlake：

**Python**  
示例：

```
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

dataSourceClassName = "jdbc"
dataSourceOptions = {
  "driver": "org.postgresql.Driver",
  "url": "<url>",
  "user": "<user>",
  "password": "<password>",
  "dbtable": "<dbtable>",
}

dataframe = spark.read.format(className).options(**options).load()

dataSinkClassName = "net.snowflake.spark.snowflake"
dataSinkOptions = {
  "sfUrl": "<url>",
  "sfUsername": "<username>",
  "sfPassword": "<password>",
  "sfDatabase" -> "<database>",                              
  "sfSchema" -> "<schema>",                       
  "sfWarehouse" -> "<warehouse>"  
}

dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
```

**Scala**  
示例：

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val dataSourceClassName = "jdbc"
val dataSourceOptions = Map(
  "driver" -> "org.postgresql.Driver",
  "url" -> "<url>",
  "user" -> "<user>",
  "password" -> "<password>",
  "dbtable" -> "<dbtable>"
)

val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load()

val dataSinkClassName = "net.snowflake.spark.snowflake"
val dataSinkOptions = Map(
  "sfUrl" -> "<url>",
  "sfUsername" -> "<username>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database>",
  "sfSchema" -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)

dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
```

## 自定义和 Amazon Web Services Marketplace connectionType 值
<a name="aws-glue-programming-etl-connect-market"></a>

这些功能包括：
+ `"connectionType": "marketplace.athena"`：指定与 Amazon Athena 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。
+ `"connectionType": "marketplace.spark"`：指定与 Apache Spark 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。
+ `"connectionType": "marketplace.jdbc"`：指定与 JDBC 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。
+ `"connectionType": "custom.athena"`：指定与 Amazon Athena 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。
+ `"connectionType": "custom.spark"`：指定与 Apache Spark 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。
+ `"connectionType": "custom.jdbc"`：指定与 JDBC 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。

### 适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项
<a name="marketplace-jdbc-connect-options"></a>
+ `className` – 字符串，必需，驱动程序类名称。
+ `connectionName` – 字符串，必需，与连接器关联的连接的名称。
+ `url` – 字符串，必需，用于建立与数据源的连接且带占位符（`${}`）的 JDBC URL。占位符 `${secretKey}` 替换为 Amazon Secrets Manager 中同名的密钥。有关构建 URL 的详细信息，请参阅数据存储文档。
+ `secretId` 或 `user/password` – 字符串，必需，用于检索 URL 的凭证。
+ `dbTable` 或 `query` – 字符串，必需，从中获取数据的表或 SQL 查询。您可以指定 `dbTable` 或 `query`，但不能同时指定两者。
+ `partitionColumn` – 字符串，可选，用于分区的整数列的名称。此选项仅在包含 `lowerBound`、`upperBound` 和 `numPartitions` 时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息，请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 [JDBC 转换到其他数据库](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)**。

  `lowerBound` 和 `upperBound` 值用于确定分区步长，而不是用于筛选表中的行。对表中的所有行进行分区并返回。
**注意**  
使用查询（而不是表名称）时，您应验证查询是否适用于指定的分区条件。例如：  
如果您的查询格式为 `"SELECT col1 FROM table1"`，则在使用分区列的查询结尾附加 `WHERE` 子句，以测试查询。
如果您的查询格式为 `SELECT col1 FROM table1 WHERE col2=val"`，则通过 `AND` 和使用分区列的表达式扩展 `WHERE` 子句，以测试查询。
+ `lowerBound` – 整数，可选，用于确定分区步长的最小 `partitionColumn` 值。
+ `upperBound` – 整数，可选，用于确定分区步长的最大 `partitionColumn` 值。
+ `numPartitions` – 整数，可选，分区数。此值以及 `lowerBound`（包含）和 `upperBound`（排除）为用于拆分 `partitionColumn` 而生成的 `WHERE` 子句表达式构成分区步长。
**重要**  
请注意分区的数量，因为分区过多可能会导致外部数据库系统出现问题。
+ `filterPredicate` – 字符串，可选，用于筛选源数据的额外条件子句。例如：

  ```
  BillingCity='Mountain View'
  ```

  使用*查询*（而不是*表*名称）时，您应验证查询是否适用于指定的 `filterPredicate`。例如：
  + 如果您的查询格式为 `"SELECT col1 FROM table1"`，则在使用筛选条件谓词的查询结尾附加 `WHERE` 子句，以测试查询。
  + 如果您的查询格式为 `"SELECT col1 FROM table1 WHERE col2=val"`，则通过 `AND` 和使用筛选条件谓词的表达式扩展 `WHERE` 子句，以测试查询。
+ `dataTypeMapping` – 目录，可选，用于构建从 **JDBC** 数据类型到 **Glue** 数据类型的映射的自定义数据类型映射。例如，选项 `"dataTypeMapping":{"FLOAT":"STRING"}` 会通过调用驱动程序的 `ResultSet.getString()` 方法，将 JDBC 类型 `FLOAT` 的数据字段映射到 Java `String` 类型，并将其用于构建 Amazon Glue 记录。`ResultSet` 对象由每个驱动程序实现，因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档，了解驱动程序执行转换的方式。
+ 目前受支持的 Amazon Glue 数据类型包括：
  + DATE
  + string
  + TIMESTAMP
  + INT
  + FLOAT
  + LONG
  + BIGDECIMAL
  + BYTE
  + SHORT
  + DOUBLE

   支持的 JDBC 数据类型为 [Java8 java.sql.types](https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html)。

  默认数据类型映射（从 JDBC 到 Amazon Glue）如下：
  +  DATE -> DATE
  +  VARCHAR -> STRING
  +  CHAR -> STRING
  +  LONGNVARCHAR -> STRING
  +  TIMESTAMP -> TIMESTAMP
  +  INTEGER -> INT
  +  FLOAT -> FLOAT
  +  REAL -> FLOAT
  +  BIT -> BOOLEAN
  +  BOOLEAN -> BOOLEAN
  +  BIGINT -> LONG
  +  DECIMAL -> BIGDECIMAL
  +  NUMERIC -> BIGDECIMAL
  +  TINYINT -> SHORT
  +  SMALLINT -> SHORT
  +  DOUBLE -> DOUBLE

  如果将自定义数据类型映射与选项 `dataTypeMapping` 结合使用，则可以覆盖默认数据类型映射。只有 `dataTypeMapping` 选项中列出的 JDBC 数据类型会受到影响；默认映射适用于所有其他 JDBC 数据类型。如果需要，您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型，则数据类型默认转换为 Amazon Glue `STRING` 数据类型。

以下 Python 代码示例演示了如何使用 Amazon Web Services Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.jdbc", connection_options = 
     {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, 
       name, department from department where id < 200","numPartitions":"4",
       "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"},
        transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},
      "upperBound":"200","query":"select id, name, department from department where 
       id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0",
       "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### 适用于类型 custom.athena 或 marketplace.athena 的连接选项
<a name="marketplace-athena-connect-options"></a>
+ `className` – 字符串，必需，驱动程序类名称。当您使用 Athena-CloudWatch 连接器时，此参数值是类名称（例如 `"com.amazonaws.athena.connectors"`）的前缀。Athena-CloudWatch 连接器由两个类组成：元数据处理程序和记录处理程序。如果您在此处提供通用前缀，则 API 会根据该前缀加载正确的类。
+ `tableName` – 字符串，必需，要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称 `all_log_streams`，这意味着返回的动态数据框将包含日志组中所有日志流的数据。
+ `schemaName` – 字符串，必需，要从中读取数据的 CloudWatch 日志流的名称。例如 `/aws-glue/jobs/output`。
+ `connectionName` – 字符串，必需，与连接器关联的连接的名称。

有关此连接器的其他选项，请参阅 GitHub 上的 [Amazon Athena CloudWatch 连接器自述](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-cloudwatch)文件。

以下 Python 代码示例演示了如何从使用 Amazon Web Services Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.athena", connection_options = 
     {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output",
      "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.athena", connection_options = {"tableName":"all_log_streams",,
      "schemaName":"/aws-glue/jobs/output","connectionName":
      "test-connection-athena"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### 适用于类型 custom.spark 或 marketplace.spark 的连接选项
<a name="marketplace-spark-connect-options"></a>
+ `className` – 字符串，必需，连接器类名称。
+ `secretId` – 字符串，可选，用于检索连接器连接的凭证。
+ `connectionName` – 字符串，必需，与连接器关联的连接的名称。
+ 其他选项取决于数据存储。例如，OpenSearch 配置选项以前缀 `es` 开头，正如[适用于 Apache Hadoop 的 Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html) 文档中所述。Spark 与 Snowflake 的连接使用 `sfUser` 和 `sfPassword` 等连接，正如《连接 Snowflake》**指南中的[使用 Spark 连接器](https://docs.snowflake.com/en/user-guide/spark-connector-use.html)所述。

以下 Python 代码示例演示了如何从使用 `marketplace.spark` 连接的 OpenSearch 数据存储读取数据。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test",
      "es.nodes.wan.only":"true","es.nodes":"https://<Amazon endpoint>",
      "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only":
      "true","es.nodes":"https://<Amazon endpoint>","connectionName":
      "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
         "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = DataSource0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, 
       connection_type = "s3", format = "json", connection_options = {"path": 
       "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

## 常规选项
<a name="aws-glue-programming-etl-connect-general-options"></a>

本节中的选项以 `connection_options` 形式提供，但不是专门适用于一个连接器。

配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息，请参阅 [使用作业书签](programming-etl-connect-bookmarks.md)。
+ `jobBookmarkKeys` - 列名称的数组。
+ `jobBookmarkKeysSortOrder` - 定义如何根据排序顺序比较值的字符串。有效值：`"asc"`、`"desc"`。
+ `useS3ListImplementation` - 用于在列出 Amazon S3 存储桶内容时管理内存性能。有关更多信息，请参阅 [Optimize memory management in Amazon Glue](https://www.amazonaws.cn/blogs/big-data/optimize-memory-management-in-aws-glue/)。