本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Glue 中的 ETL 的连接类型和选项
在Amazon Glue,各种 PySpark 和 Scala 方法和转换使用connectionType
参数指定连接类型。它们使用 connectionOptions
或 options
参数指定连接选项。
connectionType
参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions
(或 options
)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。
有关展示如何设置和使用连接选项的代码示例,请参阅示例:设置连接类型和选项。
connectionType |
连接到 |
---|---|
custom.* | Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值) |
documentdb | Amazon DocumentDB (with MongoDB compatibility) 数据库 |
dynamodb | Amazon DynamoDB 数据库 |
kafka | Kafka |
kinesis | Amazon Kinesis Data Streams |
marketplace.* | Spark、Athena 或 JDBC 数据存储(请参阅自定义和 Amazon Web Services Marketplace connectionType 值) |
mongodb | MongoDB |
mysql | MySQL |
oracle | Oracle |
orc | Amazon Simple Storage Service(Amazon S3)中以 Apache Hive 优化的行列式(ORC) |
parquet | Amazon S3 中以 Apache Parquet |
postgresql | PostgreSQL |
redshift | Amazon Redshift |
S3 | Amazon S3 |
sqlserver | Microsoft SQL Server 数据库(请参阅JDBC connectionType 值) |
"connectionType": "documentdb"
指定与 Amazon DocumentDB (with MongoDB compatibility) 的连接。
源连接和接收器连接的连接选项不同。
"connectionType": "documentdb" as Source
将 "connectionType": "documentdb"
用作源时可使用以下连接选项:
-
"uri"
:(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为mongodb://<host>:<port>
。 -
"database"
:(必需)要从中读取数据的 Amazon DocumentDB 数据库。 -
"collection"
:(必需)要从中读取数据的 Amazon DocumentDB 连接。 -
"username"
:(必需)Amazon DocumentDB 用户名。 -
"password"
:(必需)Amazon DocumentDB 密码。 -
"ssl"
:(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为"true"
。 -
"ssl.domain_match"
:(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为"false"
。 -
"batchSize"
:(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。 -
"partitioner"
:(可选)从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:-
MongoDefaultPartitioner
(默认值) -
MongoSamplePartitioner
-
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
-
-
"partitionerOptions"
:(可选)指定分区器的选项。各个分区器支持的选项如下:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
、partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
、partitionSizeMB
有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置
。有关示例代码,请参阅 示例:设置连接类型和选项。 -
"connectionType": "documentdb" as Sink
将 "connectionType": "documentdb"
用作连接器时可使用以下连接选项:
-
"uri"
:(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为mongodb://<host>:<port>
。 -
"database"
:(必需)要在其中写入数据的 Amazon DocumentDB 数据库。 -
"collection"
:(必需)要在其中写入数据的 Amazon DocumentDB 连接。 -
"username"
:(必需)Amazon DocumentDB 用户名。 -
"password"
:(必需)Amazon DocumentDB 密码。 -
"extendedBsonTypes"
:(可选)如果为true
,则在 Amazon DocumentDB 中写入数据时会启用扩展 BSON 类型。默认为true
。 -
"replaceDocument"
:(可选)如果为true
,则在保存包含_id
字段的数据集时会替换整个文档。如果为false
,则只会更新文档中与数据集中的字段匹配的字段。默认为true
。 -
"maxBatchSize"
:(可选)保存数据时的批量操作的最大批次大小。默认值为 512。
有关示例代码,请参阅 示例:设置连接类型和选项。
"connectionType": "dynamodb"
指定与 Amazon DynamoDB 的连接。
源连接和接收器连接的连接选项不同。
"connectionType": "dynamodb" with the ETL connector as Source
在使用 Amazon Glue DynamoDB ETL 连接器时,请使用以下连接选项并将 "connectionType": "dynamodb"
作为源:
-
"dynamodb.input.tableName"
:(必需)要从中读取数据的 DynamoDB 表格。 -
"dynamodb.throughput.read.percent"
:(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。-
0.5
表示默认读取速率,这意味着 Amazon Glue 将尝试占用表的一半的读取容量。如果增加值超过0.5
,Amazon Glue 将增加请求速率;将值降低到0.5
以下将降低读取请求速率。(实际读取速率取决于 DynamoDB 表中是否存在统一键分配的等因素。) -
当 DynamoDB 表处于按需模式时,Amazon Glue 处理表的读取容量为 40000。要导出大型表,我们建议您将 DynamoDB 表切换为按需模式。
-
-
"dynamodb.splits"
:(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。-
1
表示没有并行度。我们强烈建议您使用以下公式指定更大的值以获得更好的性能。 -
我们建议您使用以下公式计算
numSlots
,并将其用作dynamodb.splits
。如果您需要更高的性能,我们建议您增加 DPU 数量以扩展任务。工件数量(
NumberOfWorkers
)在作业配置中设置。有关更多信息,请参阅在 Amazon Glue 中添加作业:启用自动扩展时,作业的可用工件数量可能会因工作负载而调整。就上下文而言,为 Spark 驱动程序保留了一个执行程序;其他执行程序用于处理数据。-
numExecutors =
-
NumberOfWorkers - 1
,如果WorkerType
为G.1X
或G.2X
-
MaximumCapacity * 2 - 1
如果WorkerType
是Standard
且 Amazon Glue 版本是 2.0 以上。(MaximumCapacity - 1) * 2 - 1
如果WorkerType
是Standard
且 Amazon Glue 版本是 1.0 及以下。
-
-
numSlotsPerExecutor =
-
numSlots = numSlotsPerExecutor * numExecutors
-
-
-
"dynamodb.sts.roleArn"
:(可选)用于跨账户访问的 IAM 角色 ARN。此参数适用于 Amazon Glue 1.0 或更高版本。 -
"dynamodb.sts.roleSessionName"
:(可选)STS 会话名称。默认设置为 “glue-dynamodb-read-sts-sessession”。此参数适用于 Amazon Glue 1.0 或更高版本。
以下代码示例演示了如何从 DynamoDB 表中读取(通过 ETL 连接器)以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。
注意
Amazon Glue 支持从其他 Amazon 账户的 DynamoDB 表读取数据。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表:
注意
DynamoDB ETL 读取器不支持筛选条件或下推谓词。
"connectionType": "dynamodb" with the Amazon Glue DynamoDB export connector as Source
除了Amazon Glue DnamoDB ETL 连接器还Amazon Glue提供了一个DynamoDB 导出连接器,该连接器会调用 DynamoDBExportTableToPointInTime
请求并以 Dyn amoDB JSON 格式将其存储在您提供的 Amazon S3 位置。 Amazon Glue然后通过从Amazon S3 导出位置读取数据创建 DynamicFrame 对象。
在 DynamoDB 表大小超过 80 GB 时,导出连接器的性能优于 ETL 连接器。此外,鉴于导出请求在 Amazon Glue 任务中的 Spark 进程之外执行,您可以启用 Amazon Glue 任务的弹性伸缩以节省导出请求期间的 DPU 使用量。借助导出连接器,您也无需为 Spark 执行程序并行度或 DynamoDB 吞吐量读取百分比配置拆分数。
在使用 Amazon Glue DynamoDB 导出连接器(仅适用于 Amazon Glue 版本 2.0 以上)时,使用以下连接选项并将 "connectionType": "dynamodb"用作源:
-
"dynamodb.export"
:(必需)字符串值:如果设置为
ddb
,将启用 Amazon Glue DynamoDB 导出连接器,其中在 Amazon Glue 任务期间将调用新的ExportTableToPointInTimeRequest
。新的导出将通过从dynamodb.s3.bucket
和dynamodb.s3.prefix
传递的位置生成。如果设置为
s3
,将启用 Amazon Glue DynamoDB 导出连接器但会跳过创建新的 DynamoDB 导出,而使用dynamodb.s3.bucket
和dynamodb.s3.prefix
作为该表以前导出的 Simple Storage Service (Amazon S3) 位置。
-
"dynamodb.tableArn"
:(必需)要从中读取数据的 DynamoDB 表格。 -
"dynamodb.unnestDDBJson"
:(可选)采用布尔值。如果设置为 true(真),则对导出中存在的 DynamoDB JSON 结构执行解除嵌套转换。默认值设置为 false。 -
"dynamodb.s3.bucket"
:(可选)指示将会执行 DynamoDBExportTableToPointInTime
进程的 Amazon S3 存储桶位置。导出的文件格式为 DynamoDB JSON。-
"dynamodb.s3.prefix"
:(可选)指示将用于存储 DynamoDBExportTableToPointInTime
负载的 Amazon S3 存储桶内的 Amazon S3 前缀位置。如果既未指定dynamodb.s3.prefix
,也未指定dynamodb.s3.bucket
,则这些值将默认为 Amazon Glue 任务配置中指定的临时目录位置。有关更多信息,请参阅 Amazon Glue 使用的特殊参数。 -
"dynamodb.s3.bucketOwner"
:指示跨账户 Amazon S3 访问所需的存储桶拥有者。
-
-
"dynamodb.sts.roleArn"
:(可选)跨账户访问和/或跨区域访问 DynamoDB 表时将会代入的 IAM 角色 ARN。注意:相同的 IAM 角色 ARN 将用于访问为ExportTableToPointInTime
请求指定的 Amazon S3 位置。 -
"dynamodb.sts.roleSessionName"
:(可选)STS 会话名称。默认设置为 “glue-dynamodb-read-sts-sessession”。
注意
DynamoDB 对调用 ExportTableToPointInTime
请求有特定的要求。有关更多信息,请参阅在 DynamoDB 中请求表导出。例如,表需要启用时间点恢复 (PITR) 才能使用此连接器。DynamoDB 连接器还支持在将 DynamoDB 导出到 Amazon S3 时进行 Amazon KMS 加密。在 Amazon Glue 任务配置中指定安全性配置,将为 DynamoDB 导出启用 Amazon KMS 加密。KMS 密钥必须与 Simple Storage Service (Amazon S3) 存储桶位于同一区域。
请注意,您需要支付 DynamoDB 导出的额外费用和 Simple Storage Service (Amazon S3) 存储成本。任务运行完成后,Simple Storage Service (Amazon S3) 中的导出数据仍然存在,因此您无需其他 DynamoDB 导出即可重复使用这些数据。使用此连接器的一个要求是该表启用了 point-in-time 恢复 (PITR)。
DynamoDB ETL 连接器或导出连接器不支持在 DynamoDB 源应用筛选条件或下推谓词。
以下代码示例演示如何进行读取(通过导出连接器)以及打印分区数量。
以下示例演示如何从具有 dynamodb
分类的 Amazon Glue 数据目录表进行读取(通过导出连接器)以及打印分区数量:
遍历 DynamoDB JSON 结构
使用 Amazon Glue DynamoDB 导出连接器进行 DynamoDB 导出时可以生成具有特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象。 Amazon Glue提供 DynamicFrame 转换,可以将此类结构解套为下游应用程序的 easier-to-use 表单。
您可以通过以下两种方式之一调用该转换。第一种方法是通过 Amazon Glue DynamoDB 导出连接器传递的布尔标志。第二种方式是通过调用转换函数本身。
以下代码示例演示如何使用 Amazon Glue DynamoDB 导出连接器、调用解除嵌套命令,以及打印分区数量:
转换的另一种调用是通过单独的 DynamicFrame 函数调用。有关更多信息,请参见 Python DynamicFrame 类和适用于 ScAmazon ala 的 Glu DynamicFrame e Scala 类。
"connectionType": "dynamodb" with the ETL connector as Sink
将 "connectionType": "dynamodb"
用作连接器时可使用以下连接选项:
-
"dynamodb.output.tableName"
:(必需)要写入的 DynamoDB 表。 -
"dynamodb.throughput.write.percent"
:(可选)要使用的写入容量单位(WCU)的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。-
0.5
表示默认写入速率,这意味着 Amazon Glue 将尝试占用表的一半的写入容量。如果增加值超过 0.5,Amazon Glue 将增加请求速率;将值降低到 0.5 以下将降低写入请求速率。(实际写入速率取决于 DynamoDB 表中是否存在统一键分配等因素)。 -
当 DynamoDB 表处于按需模式时,Amazon Glue 处理表的写入容量为
40000
。要导入大型表,我们建议您将 DynamoDB 表切换为按需模式。
-
-
"dynamodb.output.numParallelTasks"
:(可选)定义同时向 DynamoDB 写入数据的并行任务数。用于计算每个 Spark 任务的允许 WCU。如果您不想控制这些详细信息,则无需指定此参数。-
permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks
-
如果您未指定此参数,则按照以下公式自动计算每个 Spark 任务的允许 WCU:
-
numPartitions = dynamicframe.getNumPartitions()
-
numExecutors =
-
(DPU - 1) * 2 - 1
,如果WorkerType
为Standard
-
(NumberOfWorkers - 1)
,如果WorkerType
为G.1X
或G.2X
-
-
numSlotsPerExecutor =
-
4
,如果WorkerType
为Standard
-
8
,如果WorkerType
为G.1X
-
16
,如果WorkerType
为G.2X
-
-
numSlots = numSlotsPerExecutor * numExecutors
-
numParallelTasks = min(numPartitions, numSlots)
-
-
示例 1。DPU=10, WorkerType=标准。输入 DynamicFrame 具有 100 个 RD 分区。
-
numPartitions = 100
-
numExecutors = (10 - 1) * 2 - 1 = 17
-
numSlots = 4 * 17 = 68
-
numParallelTasks = min(100, 68) = 68
-
-
示例 2。DPU=10, WorkerType=标准。输入 DynamicFrame 具有 20 个 RD 分区。
-
numPartitions = 20
-
numExecutors = (10 - 1) * 2 - 1 = 17
-
numSlots = 4 * 17 = 68
-
numParallelTasks = min(20, 68) = 20
-
-
-
"dynamodb.output.retry"
:(可选)定义存在 DynamoDB 中的ProvisionedThroughputExceededException
时我们执行的重试次数。默认设置为“10”。 -
"dynamodb.sts.roleArn"
:(可选)用于跨账户访问的 IAM 角色 ARN。 -
"dynamodb.sts.roleSessionName"
:(可选)STS 会话名称。默认设置为 “glue-dynamodb-write-sts-sessession”。
注意
DynamoDB 写入器在 Amazon Glue 版本 1.0 或更高版本中受支持。
注意
Amazon Glue 支持将数据写入其他 Amazon 账户的 DynamoDB 表。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表:
以下代码示例说明如何从 DynamoDB 表中读取以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。
"connectionType": "kafka"
指定与 Kafka 集群或 Amazon Managed Streaming for Apache Kafka 集群的连接。
您可以在 GlueContext
对象下面使用以下方法,利用来自 Kafka 流式传输源的记录:
-
getCatalogSource
-
getSource
-
getSourceWithFormat
-
createDataFrameFromOptions
如果您使用 getCatalogSource
,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Apache Kafka 流读取数据。如果您使用 getSource
、getSourceWithFormat
或 createDataFrameFromOptions
,则您必须明确指定以下参数:
您可以配合使用 connectionOptions
与 getSource
或createDataFrameFromOptions
、options
与 getSourceWithFormat
、或者 additionalOptions
与 getCatalogSource
,以指定这些选项。
"connectionType": "kafka"
可使用以下连接选项:
-
bootstrap.servers
(必需)引导服务器 URL 的列表,例如,作为b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。 -
security.protocol
(必填)用于与代理通信的协议。可能的值为"SSL"
或"PLAINTEXT"
。 -
topicName
(必填)要订阅的以逗号分隔的主题列表。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。 -
"assign"
:(必填)用于指定要使用的TopicPartitions
的 JSON 字符串。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。例如:“{"topicA":[0,1],"topicB":[2,4]}”
-
"subscribePattern"
:(必需)标识要订阅的主题列表的 Java 正则表达式字符串。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。示例:“topic.*”
-
classification
(可选) -
delimiter
(可选) -
"startingOffsets"
:(可选)Kafka 主题中数据读取的起始位置。可能的值为"earliest"
或"latest"
。默认值为"latest"
。 -
"endingOffsets"
:(可选)批处理查询结束时的终点。可能值为"latest"
,或者为每个TopicPartition
指定结束偏移的 JSON 字符串。对于 JSON 字符串,格式为
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
。偏移值-1
表示"latest"
。 -
"pollTimeoutMs"
:(可选)Spark 任务执行程序中,从 Kafka 轮询数据的超时时间(以毫秒为单位)。默认值为512
。 -
"numRetries"
:(可选)无法获取 Kafka 偏移时的重试次数。默认值为3
。 -
"retryIntervalMs"
:(可选)重试获取 Kafka 偏移时的等待时间(以毫秒为单位)。默认值为10
。 -
"maxOffsetsPerTrigger"
:(可选)每个触发间隔处理的最大偏移数的速率限制。指定的总偏移数跨不同卷的topicPartitions
按比例分割。默认值为 null,这意味着使用者读取所有偏移,直到已知的最新偏移。 -
"minPartitions"
:(可选)从 Kafka 读取数据的必需最小分区数。默认值为 null,这意味着 Spark 分区数等于 Kafka 分区数。 -
"includeHeaders"
:(可选)是否包含 Kafka 标头。当选项设置为“true”时,数据输出将包含一个名为“glue_streaming_kafka_headers”的附加列,类型为Array[Struct(key: String, value: String)]
。默认值为“false”。此选项仅适用于 Amazon Glue 版本 3.0 或更高版本。 -
"schema"
:(当 inferSchema 设为 false 时为必填)用于处理有效工作负载的架构。如果分类为avro
,则提供的架构必须采用 Avro 架构格式。如果分类不是avro
,则提供的架构必须采用 DDL 架构格式。以下是一些架构示例。
-
"inferSchema"
:(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到foreachbatch
内的有效工作负载中的架构。 -
"avroSchema"
:(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用schema
参数。 -
"addRecordTimestamp"
:(可选)当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示主题收到相应记录的时间。默认值为‘false’。4.0 或更高 Amazon Glue 版本支持此选项。 -
"emitConsumerLagMetrics"
:(可选)当该选项设置为 'true' 时,对于每个批次,它将向 'true' 发布主题接收到的最早记录与该记录到达之间的时长指标 CloudWatch。Amazon Glue指标名称为 “glue.streaming”。 maxConsumerLagInMs”。默认值为‘false’。4.0 或更高 Amazon Glue 版本支持此选项。
"connectionType": "kinesis"
为 Amazon Kinesis Data Streams 指定连接选项。
您可以使用存储在数据目录表中的信息从 Amazon Kinesis 数据流读取数据,或提供信息直接访问数据流。如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。
如果您使用 getCatalogSource
或 create_data_frame_from_catalog
使用来自 Kinesis 串流源的记录,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Kinesis 串流源读取数据。如果使用 getSource
、getSourceWithFormat
、createDataFrameFromOptions
或 create_data_frame_from_options
,则必须使用此处描述的连接选项指定这些基本参数。
您可以使用 GlueContext
类中指定方法的以下参数为 Kinesis 指定连接选项。
-
Scala
-
connectionOptions
:与getSource
、createDataFrameFromOptions
结合使用 -
additionalOptions
:与getCatalogSource
结合使用 -
options
:与getSourceWithFormat
结合使用
-
-
Python
-
connection_options
:与create_data_frame_from_options
结合使用 -
additional_options
:与create_data_frame_from_catalog
结合使用 -
options
:与getSource
结合使用
-
为 Kinesis 串流数据源使用以下连接选项:
-
streamARN
(必需)Kinesis 数据流的 ARN。 -
classification
(可选) -
delimiter
(可选) -
"startingPosition"
:(可选)Kinesis 数据流中数据读取的起始位置。可能的值为"latest"
、"trim_horizon"
或"earliest"
。默认值为"latest"
。 -
"awsSTSRoleARN"
:(可选)要使用 Amazon Security Token Service(Amazon STS) 代入的角色的 Amazon Resource Name (ARN)。此角色必须拥有针对 Kinesis 数据流进行描述或读取记录操作的权限。在访问其他账户中的数据流时,必须使用此参数。与"awsSTSSessionName"
结合使用。 -
"awsSTSSessionName"
:(可选)要使用 Amazon STS 代入角色的会话的标识符。访问其他账户中的数据流时,必须使用此参数。与"awsSTSRoleARN"
结合使用。 -
"maxFetchTimeInMs"
:(可选)任务执行程序从每个分区的 Kinesis 数据流中获取记录所花费的最长时间,以毫秒(ms)为单位。默认值为1000
。 -
"maxFetchRecordsPerShard"
:(可选)Kinesis 数据流中每个分区要获取的最大记录数。默认值为100000
。 -
"maxRecordPerRead"
:(可选)每项getRecords
操作中要从 Kinesis 数据流获取的最大记录数。默认值为10000
。 -
"addIdleTimeBetweenReads"
:(可选)在两项连续getRecords
操作之间添加时间延迟。默认值为"False"
。此选项仅适用于 Glue 版本 2.0 及更高版本。 -
"idleTimeBetweenReadsInMs"
:(可选)两项连续getRecords
操作之间的最短时间延迟,以毫秒为单位。默认值为1000
。此选项仅适用于 Glue 版本 2.0 及更高版本。 -
"describeShardInterval"
:(可选)两个ListShards
API 调用之间的最短时间间隔,供您的脚本考虑重新分区。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的重新分区策略。默认值为1s
。 -
"numRetries"
:(可选)Kinesis Data Streams API 请求的最大重试次数。默认值为3
。 -
"retryIntervalMs"
:(可选)重试 Kinesis Data Streams API 调用之前的冷却时间(以毫秒为单位)。默认值为1000
。 -
"maxRetryIntervalMs"
:(可选)Kinesis Data Streams API 调用的两次重试之间的最长冷却时间(以毫秒为单位)。默认值为10000
。 -
"avoidEmptyBatches"
:(可选)在批处理开始之前检查 Kinesis 数据流中是否有未读数据,避免创建空白微批处理任务。默认值为"False"
。 -
"schema"
:(当 inferSchema 设为 false 时为必填)用于处理有效工作负载的架构。如果分类为avro
,则提供的架构必须采用 Avro 架构格式。如果分类不是avro
,则提供的架构必须采用 DDL 架构格式。以下是一些架构示例。
-
"inferSchema"
:(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到foreachbatch
内的有效工作负载中的架构。 -
"avroSchema"
:(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用schema
参数。 -
"addRecordTimestamp"
:(可选)当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示数据流收到相应记录的时间。默认值为‘false’。4.0 或更高 Amazon Glue 版本支持此选项。 -
"emitConsumerLagMetrics"
:(可选)当该选项设置为 'true' 时,对于每个批次,它将向 'true' 发布数据流接收到的最早记录与该记录到达之间的时长指标 CloudWatch。Amazon Glue指标名称为 “glue.streaming”。 maxConsumerLagInMs”。默认值为‘false’。4.0 或更高 Amazon Glue 版本支持此选项。
"connectionType": "mongodb"
指定与 MongoDB 的连接。源连接和接收器连接的连接选项不同。
"connectionType": "mongodb" as Source
将 "connectionType": "mongodb"
用作源时可使用以下连接选项:
-
"uri"
:(必需)要从中读取数据的 MongoDB 主机,格式为mongodb://<host>:<port>
。 -
"database"
:(必需)要从中读取数据的 MongoDB 数据库。当在您的任务脚本中调用glue_context.create_dynamic_frame_from_catalog
时,此选项还可以在additional_options
中传递。 -
"collection"
:(必需)要从中读取数据的 MongoDB 集合。当在您的任务脚本中调用glue_context.create_dynamic_frame_from_catalog
时,此选项还可以在additional_options
中传递。 -
"username"
:(必需)MongoDB 用户名。 -
"password"
:(必需)MongoDB 密码。 -
"ssl"
:(可选)如果为true
,则启动 SSL 连接。默认为false
。 -
"ssl.domain_match"
:(可选)如果为true
,且ssl
为true
,则执行域匹配检查。默认为true
。 -
"batchSize"
:(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。 -
"partitioner"
:(可选)从 MongoDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:-
MongoDefaultPartitioner
(默认值) -
MongoSamplePartitioner
(需要 MongoDB 3.2 或更高版本) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
-
-
"partitionerOptions"
:(可选)指定分区器的选项。各个分区器支持的选项如下:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
,partitionSizeMB
-
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
,partitionSizeMB
有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置
。有关示例代码,请参阅 示例:设置连接类型和选项。 -
"connectionType": "mongodb" as Sink
将 "connectionType": "mongodb"
用作连接器时可使用以下连接选项:
-
"uri"
:(必需)要在其中写入数据的 MongoDB 主机,格式为mongodb://<host>:<port>
。 -
"database"
:(必需)要在其中写入数据的 MongoDB 数据库。 -
"collection"
:(必需)要在其中写入数据的 MongoDB 集合。 -
"username"
:(必需)MongoDB 用户名。 -
"password"
:(必需)MongoDB 密码。 -
"ssl"
:(可选)如果为true
,则启动 SSL 连接。默认为false
。 -
"ssl.domain_match"
:(可选)如果为true
,且ssl
为true
,则执行域匹配检查。默认为true
。 -
"extendedBsonTypes"
:(可选)如果为true
,则在 MongoDB 中写入数据时会允许扩展 BSON 类型。默认为true
。 -
"replaceDocument"
:(可选)如果为true
,则在保存包含_id
字段的数据集时会替换整个文档。如果为false
,则只会更新文档中与数据集中的字段匹配的字段。默认为true
。 -
"maxBatchSize"
:(可选)保存数据时的批量操作的最大批次大小。默认值为 512。
有关示例代码,请参阅 示例:设置连接类型和选项。
"connectionType": "orc"
指定与 Amazon S3 中以 Apache Hive 优化的行列式(ORC)
"connectionType": "orc"
可使用以下连接选项:
-
paths
:(必需)要从中读取数据的 Amazon S3 路径的列表。 -
(其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递给 SparkSQL
DataSource
。有关更多信息,请参阅适用于 Spark 的 Amazon Redshift 数据源。
"connectionType": "parquet"
指定与 Amazon S3 中以 Apache Parquet
"connectionType": "parquet"
可使用以下连接选项:
-
paths
:(必需)要从中读取数据的 Amazon S3 路径的列表。 -
(其他选项名称/值对):任何其他选项(包括格式化选项)将直接传递给 SparkSQL
DataSource
。有关更多信息,请参阅 GitHub 网站上的 Amazon Redshift 数据源。
"connectionType": "s3"
指定与 Amazon S3 的连接。
"connectionType": "s3"
可使用以下连接选项:
-
"paths"
:(必需)要从中读取数据的 Amazon S3 路径的列表。 -
"exclusions"
:(可选)包含要排除的 Unix 样式 glob 模式的 JSON 列表的字符串。例如,"[\"**.pdf\"]"
会排除所有 PDF 文件。有关 Amazon Glue 支持的 glob 语法的更多信息,请参阅包含和排除模式。 -
"compressionType"
或 "compression
":(可选)指定数据压缩方式。使用适用于 Amazon S3 源的"compressionType"
以及适用于 Amazon S3 目标的"compression"
。通常,如果数据有标准文件扩展名,则不需要指定。可能的值为"gzip"
和"bzip2"
。 -
"groupFiles"
:(可选)当输入包含超过 50,000 个文件时,默认启用文件分组。当少于 50,000 个文件时,若要启用分组,请将此参数设置为"inPartition"
。当超过 50,000 个文件时,若要禁用分组,请将此参数设置为"none"
。 -
"groupSize"
:(可选)目标组大小(以字节为单位)。默认值根据输入数据大小和群集大小进行计算。当少于 50,000 个输入文件时,"groupFiles"
必须设置为"inPartition"
,此选项才能生效。 -
"recurse"
:(可选)如果设置为 true,则以递归方式读取指定路径下的所有子目录中的文件。 -
"maxBand"
:(可选,高级)此选项控制s3
列表可能保持一致的持续时间(以毫秒为单位)。当使用JobBookmarks
来表明 Amazon S3 最终一致性时,将专门跟踪修改时间戳在最后maxBand
毫秒内的文件。大多数用户不需要设置此选项。默认值为 900000 毫秒或 15 分钟。 -
"maxFilesInBand"
:(可选,高级)此选项指定在最后maxBand
秒内可保存的最大文件数量。如果超过此值,额外的文件将会跳过,且只能在下一次作业运行中处理。大多数用户不需要设置此选项。 -
"isFailFast"
:(可选)此选项用于确定 Amazon Glue ETL 任务是否导致读取器解析异常。如果设置为true
,并且 Spark 任务的四次重试无法正确解析数据,则任务会快速失败。
JDBC connectionType 值
JDBC connectionType 值包括:
-
"connectionType": "sqlserver"
:指定与 Microsoft SQL Server 数据库的连接。 -
"connectionType": "mysql"
:指定与 MySQL 数据库的连接。 -
"connectionType": "oracle"
:指定与 Oracle 数据库的连接。 -
"connectionType": "postgresql"
:指定与 PostgreSQL 数据库的连接。 -
"connectionType": "redshift"
:指定与 Amazon Redshift 数据库的连接。有关更多信息,请参阅Redshift 连接:
下表列出了 Amazon Glue 支持的 JDBC 驱动程序版本。
产品 | Glue 4.0 的 JDBC 驱动程序版本 | Glue 3.0 的 JDBC 驱动程序版本 | Glue 0.9、1.0、2.0 的 JDBC 驱动程序版本 |
---|---|---|---|
Microsoft SQL Server | 9.4.0 | 7.x | 6.x |
MySQL | 8.0.23 | 8.0.23 | 5.1 |
Oracle 数据库 | 21.7 | 21.1 | 11.2 |
PostgreSQL | 42.3.6 | 42.2.18 | 42.1.x |
MongoDB | 4.7.2 | 4.0.0 | 2.0.0 |
Amazon Redshift | redshift-jdbc42-2.1.0.9 | redshift-jdbc41-1.2.12.1017 | redshift-jdbc41-1.2.12.1017 |
如果您已经定义了 JDBC 连接,则可以重复使用其中定义的配置属性,例如:url、用户和密码;不必在代码中将它们指定为连接选项。为此,请使用以下连接属性:
-
"useConnectionProperties"
:设置为 "true" 以表示您要使用连接中的配置。 -
"connectionName"
:输入要从中检索配置的连接名称,必须在与作业相同的区域中定义连接。
将这些连接选项与 JDBC 连接结合使用:
-
"url"
:(必填)数据库的 JDBC URL。 -
"dbtable"
:要从中进行读取的数据库表。对于在数据库中支持架构的 JDBC 数据存储,指定schema.table-name
。如果未提供架构,则使用默认的“public”架构。 -
"user"
:(必需)在连接时使用的用户名。 -
"password"
:(必填)在连接时使用的密码。 -
(可选)以下选项允许您提供自定义 JDBC 驱动程序。如果必须使用 Amazon Glue 本身不支持的驱动程序,请使用这些选项。
ETL 作业可以为数据源和目标使用不同的 JDBC 驱动程序版本,即使源和目标是相同的数据库产品也是如此。这允许您在不同版本的源数据库和目标数据库之间迁移数据。要使用这些选项,您必须首先将 JDBC 驱动程序的 JAR 文件上传到 Amazon S3。
-
"customJdbcDriverS3Path"
:自定义 JDBC 驱动程序的 Amazon S3 路径。 -
"customJdbcDriverClassName"
:JDBC 驱动程序的类名。
-
-
"bulksize"
:(可选)用于配置并行插入以加速批量加载到 JDBC 目标。为写入或插入数据时要使用的并行度指定整数值。此选项有助于提高写入数据库(如 Arch User Repository (AUR))的性能。 -
"sampleQuery"
:(可选)用于采样的自定义 SQL 查询语句。默认情况下,查询示例由单个执行者执行。如果您读取的是大型数据集,则可能需要启用 JDBC 分区才能并行查询表。有关更多信息,请参阅 并行读取 JDBC 表。要将sampleQuery
与 JDBC 分区一同使用,也可将enablePartitioningForSampleQuery
设置为 true。 -
"enablePartitioningForSampleQuery"
:(可选)默认情况下,此选项为 false。如果想将sampleQuery
与分区的 JDBC 表一同使用,则必须使用此设置。如果设置为 true,sampleQuery
必须以“where”或“and”结尾,以便于 Amazon Glue 追加分区条件。请参阅下面的示例。 -
"sampleSize"
:(可选)限制查询示例返回的行数。仅当enablePartitioningForSampleQuery
为 true 时有用。如果未启用分区,则应直接在sampleQuery
中添加“limit x”以限制大小。例 在不进行分区的情况下使用 samplQuery
以下代码示例演示了如何在不进行分区的情况下使用
sampleQuery
。//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> table, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()
例 将 sampleQuery 与 JDBC 分区一起使用
以下代码示例演示了如何将
sampleQuery
与 JDBC 分区一起使用。//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> table, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()
如果是 Amazon Redshift 连接类型,用于 JDBC 的连接选项中包含的所有其他选项中包含的所有其他选项名称/值对(包括格式选项)将直接传递到底层 SparksQL 连接类型 DataSource。有关更多信息,请参阅适用于 Spark 的 Amazon Redshift 数据源
以下代码示例演示了如何使用自定义 JDBC 驱动程序读取和写入 JDBC 数据库。这些示例演示了如何从一个版本的数据库产品读取和写入同一产品的更高版本。
注意
Amazon Glue 作业在一次运行期间仅与一个子网关联。这可能会影响您使用同一作业连接到多个数据来源。此行为不仅限于 JDBC 源。
自定义和 Amazon Web Services Marketplace connectionType 值
这些功能包括:
-
"connectionType": "marketplace.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。 -
"connectionType": "marketplace.spark"
:指定与 Apache Spark 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。 -
"connectionType": "marketplace.jdbc"
:指定与 JDBC 数据存储的连接。连接使用来自 Amazon Web Services Marketplace 的连接器。 -
"connectionType": "custom.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。 -
"connectionType": "custom.spark"
:指定与 Apache Spark 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。 -
"connectionType": "custom.jdbc"
:指定与 JDBC 数据存储的连接。连接使用您上传到 Amazon Glue Studio 的自定义连接器。
适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项
-
className
– 字符串,必需,驱动程序类名称。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。 -
url
– 字符串,必需,用于建立与数据源的连接且带占位符(${}
)的 JDBC URL。占位符${secretKey}
替换为 Amazon Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。 -
secretId
或user/password
– 字符串,必需,用于检索 URL 的凭证。 -
dbTable
或query
– 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定dbTable
或query
,但不能同时指定两者。 -
partitionColumn
– 字符串,可选,用于分区的整数列的名称。此选项仅在包含lowerBound
、upperBound
和numPartitions
时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL DataFrames 和《数据集指南》中的 JDB 转换到其他数据库。 lowerBound
和upperBound
值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。注意
使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:
-
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用分区列的查询结尾附加WHERE
子句,以测试查询。 -
如果您的查询格式为
SELECT col1 FROM table1 WHERE col2=val"
,则通过AND
和使用分区列的表达式扩展WHERE
子句,以测试查询。
-
-
lowerBound
– 整数,可选,用于确定分区步长的最小partitionColumn
值。 -
upperBound
– 整数,可选,用于确定分区步长的最大partitionColumn
值。 -
numPartitions
– 整数,可选,分区数。此值以及lowerBound
(包含)和upperBound
(排除)为用于拆分partitionColumn
而生成的WHERE
子句表达式构成分区步长。重要
请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。
-
filterPredicate
– 字符串,可选,用于筛选源数据的额外条件子句。例如:BillingCity='Mountain View'
使用查询(而不是表名称)时,您应验证查询是否适用于指定的
filterPredicate
。例如:-
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用筛选条件谓词的查询结尾附加WHERE
子句,以测试查询。 -
如果您的查询格式为
"SELECT col1 FROM table1 WHERE col2=val"
,则通过AND
和使用筛选条件谓词的表达式扩展WHERE
子句,以测试查询。
-
-
dataTypeMapping
– 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项"dataTypeMapping":{"FLOAT":"STRING"}
会通过调用驱动程序的ResultSet.getString()
方法,将 JDBC 类型FLOAT
的数据字段映射到 JavaString
类型,并将其用于构建 Amazon Glue 记录。ResultSet
对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。 -
目前受支持的 Amazon Glue 数据类型包括:
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
支持的 JDBC 数据类型为 Java8 java.sql.types
。 默认数据类型映射(从 JDBC 到 Amazon Glue)如下:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
如果将自定义数据类型映射与选项
dataTypeMapping
结合使用,则可以覆盖默认数据类型映射。只有dataTypeMapping
选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 Amazon GlueSTRING
数据类型。 -
以下 Python 代码示例演示了如何使用 Amazon Web Services Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
适用于类型 custom.athena 或 marketplace.athena 的连接选项
-
className
– 字符串,必需,驱动程序类名称。当您使用 AthenaCloudWatch 连接器时,此参数值是类名称(例如)的前缀(例如"com.amazonaws.athena.connectors"
)的前缀。AthenaCloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。 -
tableName
— 字符串,必需,要读取 CloudWatch 的日志流的名称。此代码段使用特别视图名称all_log_streams
,这意味着返回的动态数据框将包含日志组中所有日志流的数据。 -
schemaName
— 字符串,必需,要从中读取数据。 CloudWatch 例如,/aws-glue/jobs/output
。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。
有关此连接器的其他选项,请参阅上的 Amazon Amazon Amazon Athena CloudWatch 连接器自述
以下 Python 代码示例演示了如何从使用 Amazon Web Services Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
适用于类型 custom.spark 或 marketplace.spark 的连接选项
-
className
– 字符串,必需,连接器类名称。 -
secretId
– 字符串,可选,用于检索连接器连接的凭证。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。 -
其他选项取决于数据存储。例如, OpenSearch 配置选项以前缀
es
,正如适用于 Apache Hadoop 的 Elasticsearch文档中所述。Spark 与 Snowflake 的连接使用 sfUser
和sfPassword
等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。
以下 Python 代码示例演示了如何从使用marketplace.spark
连接的 OpenSearch 数据存储读取数据。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<Amazon endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<Amazon endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()