Amazon Glue Visual Job API (Preview) - Amazon Glue Studio

Amazon Glue Visual Job API (Preview)

The Visual Job API is currently in preview for Amazon Glue and is subject to change.

Amazon Glue provides an API that allows customers to use the Amazon Glue API to create data integration jobs from a JSON object representing a DAG. Customers can then work with these jobs in the visual editor in Amazon Glue Studio.

API design and CRUD APIs

The create job and update job APIs now support an additional optional parameter, codeGenConfigurationNodes. Providing a non-empty JSON structure for this field causes a DAG to be registered for the created job in Amazon Glue Studio and the associated code to be generated. A null value or empty string for this field at job creation is ignored.

Updates to the codeGenConfigurationNodes field are made through the update job Amazon Glue API, in the same way as at job creation. The entire field should be specified in the update job call, with the DAG changed as desired. A null value is ignored and no DAG update is performed. An empty structure or string causes codeGenConfigurationNodes to be set to empty, deleting any previous DAG. The get job API returns the DAG if one exists. The delete job API also deletes any associated DAG.
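The create/update semantics above can be sketched in plain Python. This is purely illustrative; apply_dag_update is a hypothetical helper, not part of any AWS SDK:

```python
def apply_dag_update(current_dag, new_value):
    """Return the DAG a job would hold after an updateJob call.

    Mirrors the documented rules:
    - a null value is ignored, so the existing DAG is kept;
    - an empty structure or string clears the DAG, deleting the previous one;
    - any non-empty structure replaces the whole field.
    """
    if new_value is None:
        return current_dag   # null: no DAG update is performed
    if new_value == {} or new_value == "":
        return None          # empty: previous DAG is deleted
    return new_value         # non-empty: the entire field replaces the DAG


existing = {"node-1": {"S3CatalogSource": {"Database": "db", "Table": "t", "Name": "src"}}}
assert apply_dag_update(existing, None) == existing  # null ignored
assert apply_dag_update(existing, {}) is None        # DAG cleared
```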

Getting started

Follow the SDK setup instructions below.

To create a job, use the createJob function. The CreateJobRequest input has an additional field, "codeGenConfigurationNodes", in which you specify the DAG object in JSON.

Keep the following in mind:

  • The "codeGenConfigurationNodes" field is a map of nodeId to node.

  • Each node starts with a key that identifies its node type.

  • Because a node can be only one type, only one key can be specified.

  • The Inputs field contains the parent nodes of the current node.
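These rules can be checked mechanically before submitting a job. The following is a minimal sketch; validate_dag is a hypothetical helper, not part of any AWS SDK:

```python
import json

def validate_dag(nodes):
    """Check the DAG invariants: each node has exactly one type key,
    and every entry in a node's "Inputs" list names another node in the map."""
    for node_id, node in nodes.items():
        if len(node) != 1:  # a node can be only one type
            raise ValueError(f"{node_id}: exactly one node-type key expected")
        (body,) = node.values()
        for parent in body.get("Inputs", []):
            if parent not in nodes:
                raise ValueError(f"{node_id}: unknown parent node {parent}")

dag = json.loads("""{
  "node-1": {"S3CatalogSource": {"Database": "database", "Name": "S3 bucket", "Table": "table1"}},
  "node-2": {"ApplyMapping": {"Mapping": [], "Inputs": ["node-1"], "Name": "ApplyMapping"}}
}""")
validate_dag(dag)  # raises ValueError on a malformed DAG
```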

The following is a JSON representation of a createJob input.

{
  "Name": "myjob1",
  "Role": "arn:aws:iam::253723508848:role/myrole",
  "Description": "",
  "GlueVersion": "2.0",
  "Command": {
    "Name": "glueetl",
    "ScriptLocation": "s3://myscripts/myjob1.py",
    "PythonVersion": "3"
  },
  "MaxRetries": 3,
  "Timeout": 2880,
  "ExecutionProperty": { "MaxConcurrentRuns": 1 },
  "NotificationProperty": {},
  "DefaultArguments": {
    "--class": "GlueApp",
    "--job-language": "python",
    "--job-bookmark-option": "job-bookmark-enable",
    "--TempDir": "s3://assets/temporary/",
    "--enable-metrics": "true",
    "--enable-continuous-cloudwatch-log": "true",
    "--enable-spark-ui": "true",
    "--spark-event-logs-path": "s3://assets/sparkHistoryLogs/",
    "--encryption-type": "sse-s3",
    "--enable-glue-datacatalog": "true"
  },
  "Tags": {},
  "DeveloperMode": false,
  "WorkerType": "G.1X",
  "NumberOfWorkers": 10,
  "CodeGenConfigurationNodes": {
    "node-1": {
      "S3CatalogSource": {
        "Database": "database",
        "Name": "S3 bucket",
        "Table": "table1"
      }
    },
    "node-2": {
      "ApplyMapping": {
        "Mapping": [
          { "FromPath": ["col0"], "ToKey": "col0", "ToType": "string", "FromType": "string", "Dropped": false },
          { "FromPath": ["col1"], "ToKey": "col1", "ToType": "string", "FromType": "string", "Dropped": false },
          { "FromPath": ["col2"], "ToKey": "col2", "ToType": "string", "FromType": "string", "Dropped": false },
          { "FromPath": ["col3"], "ToKey": "col3", "ToType": "string", "FromType": "string", "Dropped": false }
        ],
        "Inputs": ["node-1"],
        "Name": "ApplyMapping"
      }
    },
    "node-3": {
      "S3CatalogTarget": {
        "Path": "s3://mypath/",
        "UpdateCatalogOptions": "none",
        "Inputs": ["node-1", "node-2"],
        "SchemaChangePolicy": { "EnableUpdateCatalog": false },
        "Name": "S3 bucket",
        "Format": "json",
        "PartitionKeys": [],
        "Compression": "none"
      }
    }
  }
}

Here is a more complex example:

{
  "Name": "myjob2",
  "Role": "arn:aws:iam::253723508848:role/myrole",
  "Description": "",
  "GlueVersion": "2.0",
  "Command": {
    "Name": "glueetl",
    "ScriptLocation": "s3://myscripts/myjob1.py",
    "PythonVersion": "3"
  },
  "MaxRetries": 3,
  "Timeout": 2880,
  "ExecutionProperty": { "MaxConcurrentRuns": 1 },
  "CodeGenConfigurationNodes": {
    "node-3": {
      "S3DirectTarget": {
        "Path": "s3://mypath/",
        "UpdateCatalogOptions": "none",
        "Inputs": ["node-1624994219677"],
        "SchemaChangePolicy": { "EnableUpdateCatalog": false },
        "Name": "S3 bucket",
        "Format": "json",
        "PartitionKeys": [],
        "Compression": "none"
      }
    },
    "node-1624994205115": {
      "CatalogSource": {
        "Name": "AWS Glue Data Catalog",
        "Database": "database2",
        "Table": "table2"
      }
    },
    "node-1624994219677": {
      "Join": {
        "Name": "Join",
        "Inputs": ["node-1624994205115", "node-2"],
        "JoinType": "equijoin",
        "Columns": [
          { "From": "node-1624994205115", "Keys": ["firstname"] },
          { "From": "node-2", "Keys": ["col0"] }
        ],
        "ColumnConditions": ["="]
      }
    },
    "node-2": {
      "S3CatalogSource": {
        "Database": "database",
        "Name": "S3 bucket",
        "Table": "table1"
      }
    }
  }
}

Updating a job uses the same input format, because updateJob also has the "codeGenConfigurationNodes" field. The get job command likewise returns the "codeGenConfigurationNodes" field in the same format.

API design and CRUD APIs

Because the "codeGenConfigurationNodes" parameter is added to existing APIs, any limits on those APIs are inherited. In addition, the size of codeGenConfigurationNodes and of certain nodes is limited. See the appendix for the full list of limits. These limits apply per field.

The general shape is:

{ "Nodeid-1": {...}, "Nodeid-2": {...} }

Things to know:

  • The key should uniquely identify the node

  • The body contains the node specification

  • Each node has a strongly typed model

  • The appendix provides an exhaustive list of node models

SDK setup

To access the required files, go to the GitHub repository as described below.

CLI

Go to the GitHub repository to access the service-2.json file and download it. If you are on Mac or Linux, place this file in the ~/.aws/models/glue/2017-03-31 folder. If .aws does not exist, you must first configure the Amazon CLI; Amazon CLI installation instructions can be found here. If any of the other folders do not exist, create them manually. The CLI with this custom model is then used in the same way as the CLI is normally used.

Java SDK

For the older Java client, a JAR named AwsGlueJavaClient-1.12.x.jar is available in the GitHub repository.

For the newer Amazon SDK for Java 2.x, a JAR named AwsJavaSdk-Glue-2.0.jar is available in the GitHub repository.

Add the JAR to your classpath in whichever way you prefer. Once the JAR is on the classpath, usage is the same as with the existing Amazon Glue SDK.

Appendix: Visual job examples and model definitions

This appendix provides examples and model definitions for sources, data targets, and transforms.

Examples

S3CSVSource from a Glue catalog table:

{ "Database": "database", "Table": "table1", "Name": "S3 bucket", "IsCatalog": true }

CatalogSource for RDS:

{ "Database": "database", "Table": "rdsSource", "Name": "MyRdsSource", "IsCatalog": true }

Data targets

S3CatalogTarget

{ "Inputs": [ "node-1625147321253" ], "Database": "dbl", "Table": "s3Table", "Name": "s3 bucket", "Format": "json", "PartitionKeys": [ "col1" ], "UpdateCatalogOptions": "schemaAndPartitions", "SchemaChangePolicy": { "EnableUpdateCatalog": true, "UpdateBehavior": "UPDATE_IN_DATABASE" } }

S3DirectTarget

{ "Path": "s3://mypath/", "UpdateCatalogOptions": "none", "Inputs": [ "node-2" ], "SchemaChangePolicy": { "EnableUpdateCatalog": false }, "Name": "S3 bucket", "Format": "json", "PartitionKeys": [], "Classification": "DataSink", "Compression": "none" }

Transforms

RenameField

{ "Inputs": [ "node-1" ], "Name": "MyRenameField", "SourcePath": "col3", "TargetPath": "name" }

Filter

{ "Name": "Filter", "Inputs": [ "node-2" ], "LogicalOperator": "AND", "Filters": [ { "Operation": "ISNULL", "Negated": false, "Values": [ { "Type": "COLUMNEXTRACTED", "Value": "col1" } ] }, { "Operation": "REGEX", "Negated": false, "Values": [ { "Type": "CONSTANT", "Value": ".*" }, { "Type": "COLUMNEXTRACTED", "Value": "col2" } ] } ] }

Model definitions

AthenaConnector

{ "Name": 100 character String Required, "ConnectionName": 256 character String Required, "ConnectorName": 256 character String, "ConnectionType": 256 character String Required, "ConnectionTable": 256 character String Required, "SchemaName": 256 character String Required }

JDBCConnector

JDBCDataType is an enum. For the full list of possible values, see JDBCDataType.

{ "Name": 100 character String Required, "ConnectionName": 256 character String Required, "ConnectorName": 256 character String, "ConnectionType": 256 character String Required, "AdditionalOptions": JDBCConnectorOptions, "ConnectionTable": 256 character String, "Query": 256 character String } JDBCConnectorOptions: { "FilterPredicate": 256 character String, "PartitionColumn": 256 character String, "LowerBound": Non-Negative Long, "UpperBound": Non-Negative Long, "NumPartitions": Non-Negative Long, "JobBookmarkKeys": List of Strings up to 100, "JobBookmarkKeysSortOrder": ASC or DESC, "DataTypeMapping": Map<JDBCDataType, JDBCDataType> }
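The PartitionColumn/LowerBound/UpperBound/NumPartitions options drive parallel JDBC reads by splitting a numeric column into ranges. The following sketch illustrates the arithmetic only; the actual split is performed by the Spark JDBC reader, and partition_ranges is a hypothetical helper:

```python
def partition_ranges(lower, upper, num_partitions):
    """Split [lower, upper) into num_partitions contiguous read ranges,
    as a JDBC reader would when given LowerBound/UpperBound/NumPartitions."""
    step = (upper - lower) // num_partitions
    bounds = [lower + i * step for i in range(num_partitions)] + [upper]
    return list(zip(bounds[:-1], bounds[1:]))

print(partition_ranges(0, 100, 4))  # [(0, 25), (25, 50), (50, 75), (75, 100)]
```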

SparkConnectorSource

{ "Name": 100 character String Required, "ConnectionName": 256 character String Required, "ConnectorName": 256 character String, "ConnectionType": 256 character String Required, "AdditionalOptions": Map<256 character String, Object> }

CatalogSource:

{ "Name": 100 character String Required, "Database": 256 character String Required, "Table": 256 character String Required }

CatalogKinesisSource – a type of CatalogSource

{ "Name": 100 character String Required, "Database": 256 character String Required, "WindowSize": Positive Integer, "DetectSchema": Boolean, "StreamingOptions": KinesisStreamingSourceOptions, "Table": 256 character String Required }

KinesisStreamingSourceOptions

{ "EndpointUrl":256 character String, "StreamName":256 character String, "Classification":256 character String, "Delimiter":256 character String, "StartingPosition": LATEST or TRIM_HORIZON or EARLIEST, "MaxFetchTimeInMs": Non-negative Long, "MaxFetchRecordsPerShard": Non-negative Long, "MaxRecordsPerRead": Non-negative Long, "AddIdleTimeBetweenReads": Boolean, "IdleTimeBetweenReadsInMs": Non-negative Long, "DescribeShardInterval": Non-negative Long, "NumRetries": Positive Integer, "RetryIntervalInMs": Non-negative Long, "MaxRetryIntervalMs": Non-negative Long, "AvoidEmptyBatches": Boolean, "StreamARN": 256 character String, "AwsSTSRoleARN": 256 character String, "AwsSTSSessionName": 256 character String }

DirectKinesisSource

{ "Name":100 character String Required, "WindowSize": Positive Integer, "DetectSchema": Boolean, "StreamingOptions": KinesisStreamingSourceOptions }

CatalogKafkaSource

{ "Name":100 character String Required, "Database": 256 character String Required, "WindowSize": Positive Integer, "DetectSchema": Boolean, "StreamingOptions": KafkaStreamingSourceOptions, "Table": 256 character String Required } KafkaStreamingSourceOptions: { "BootstrapServers":256 character String, "SecurityProtocol":256 character String, "ConnectionName":256 character String, "TopicName":256 character String, "Assign":256 character String, "SubscribePattern":256 character String, "Classification":256 character String, "Delimiter":256 character String, "StartingOffsets":256 character String, "EndingOffsets":256 character String, "PollTimeoutInMs": Non-negative Long, "NumRetries": Positive Integer, "RetryIntervalMs": Non-negative Long, "MaxOffsetsPerTrigger": Non-negative Long, "MinPartitions": Non-negative Integer }

DirectKafkaSource

{ "Name":100 character String Required, "WindowSize": Positive Integer, "DetectSchema": Boolean, "StreamingOptions": KafkaStreamingSourceOptions }

RedshiftSource – a type of CatalogSource:

{ "Name":100 character String Required, "Database": 256 character String Required, "Table": 256 character String Required, "RedshiftTmpDir":256 character String, "TmpDirIAMRole":256 character String }

S3CatalogSource

{ "Name":100 character String Required, "Database": 256 character String Required, "Table": 256 character String Required, "S3SourceAdditionalOptions": { //Only one can be specified, or neither "BoundedSize":Nullable Long, "BoundedFiles":Nullable Long } }
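The inline comment says BoundedSize and BoundedFiles are mutually exclusive: you may specify one of them or neither. A hypothetical pre-flight check (not part of any AWS SDK) that enforces this:

```python
def check_s3_additional_options(options):
    """Reject an S3SourceAdditionalOptions dict that sets both bounds."""
    if options.get("BoundedSize") is not None and options.get("BoundedFiles") is not None:
        raise ValueError("Specify at most one of BoundedSize and BoundedFiles")

check_s3_additional_options({"BoundedSize": 1_000_000})  # fine: one bound
check_s3_additional_options({})                          # fine: neither
```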

S3CSVSource

{ "Name":100 character String Required, "Paths": List of Strings. Up to 100 256 character Strings Required, "CompressionType":gzip or bzip2, "Exclusions": List of Strings. Up to 100 256 character Strings, "GroupFiles":256 character String, "GroupSize":256 character String, "Recurse":Boolean, "MaxBand":Integer, "MaxFilesInBand": Non negative Integer, "S3SourceAdditionalOptions": { //Only one can be specified, or neither "BoundedSize":Nullable Long, "BoundedFiles":Nullable Long }, "Separator":256 character String, "Escaper":256 character String, "QuoteChar":256 character String, "Multiline":Boolean, "WithHeader":Boolean, "WriteHeader":Boolean, "SkipFirst":Boolean, "UseArrowColumnVectors":Boolean, "UseSimdCsvParser":Boolean }

S3JSONSource

{ "Name":100 character String Required, "Paths": List of Strings. Up to 100 256 character Strings Required, "CompressionType":gzip or bzip2, "Exclusions": List of Strings. Up to 100 256 character Strings, "GroupFiles":256 character String, "GroupSize":256 character String, "Recurse":Boolean, "MaxBand": Non negative Integer, "MaxFilesInBand": Non negative Integer, "S3SourceAdditionalOptions": { //Only one can be specified, or neither "BoundedSize":Nullable Long, "BoundedFiles":Nullable Long }, "JsonPath":256 character String, "Multiline":Boolean }

S3ParquetSource

{ "Name":100 character String Required, "Paths": List of Strings. Up to 100 256 character Strings Required, "CompressionType":gzip or bzip2, "Exclusions": List of Strings. Up to 100 256 character Strings, "GroupFiles":256 character String, "GroupSize":256 character String, "Recurse":Boolean, "MaxBand": Non negative Integer, "MaxFilesInBand": Non negative Integer, "S3SourceAdditionalOptions": { //Only one can be specified, or neither "BoundedSize":Nullable Long, "BoundedFiles":Nullable Long } }

Targets

JDBCConnectorTarget

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "ConnectionName":256 character String Required, "ConnectionTable":256 character String, "ConnectorName":256 character String, "ConnectionType":256 character String Required, "ConnectionTypeSuffix":256 character String, "AdditionalOptions":Map<256 character String,Object> }

SparkConnectorTarget

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "ConnectionName":256 character String Required, "ConnectionTable":256 character String, "ConnectorName":256 character String, "ConnectionType":256 character String Required, "ConnectionTypeSuffix": 256 character String, "AdditionalOptions":Map<256 character String,Object> }

CatalogTarget

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Database":256 character String Required, "Table":256 character String Required }

RedshiftTarget

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Database": 256 character String Required, "Table": 256 character String Required, "RedshiftTmpDir":256 character String, "TmpDirIAMRole":256 character String }

S3CatalogTarget

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "SchemaChangePolicy": SchemaChangePolicy, "PartitionKeys": List of Strings. Up to 100 256 character Strings, "Database":256 character String Required, "Table":256 character String Required } SchemaChangePolicy: { "EnableUpdateCatalog": Boolean, "UpdateBehavior": "LOG" | "UPDATE_IN_DATABASE" }

S3DirectTarget

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "PartitionKeys": List of Strings. Up to 100 256 character Strings, "Path":256 character String Required, "Compression": gzip or bzip2, "Format":json, csv, avro, orc, or parquet Required, "SchemaChangePolicy": DirectSchemaChangePolicy } DirectSchemaChangePolicy: { "EnableUpdateCatalog": Boolean, "UpdateBehavior": "LOG" | "UPDATE_IN_DATABASE", "Database":256 character String, "Table":256 character String }

Transforms

ApplyMapping

For the possible values of ApplyMappingType, see the end of this document.

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Mapping":List of up to 250 Mapping Required } Mapping: { "ToKey":256 character String Required, "FromPath": List of Strings. One 256 character String Required, "FromType":ApplyMappingType Required, "ToType": ApplyMappingType Required, "Dropped":Boolean, "Children": List of up to 250 Mapping }
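Mapping entries are verbose to write by hand; a small hypothetical helper (not part of any AWS SDK) can build them per the model above, with FromPath as a list and the types drawn from ApplyMappingType:

```python
def mapping(column, from_type="string", to_type="string", dropped=False):
    """Build one Mapping entry for an ApplyMapping node."""
    return {
        "FromPath": [column],   # FromPath is a list of path components
        "ToKey": column,
        "FromType": from_type,  # an ApplyMappingType name, e.g. "string"
        "ToType": to_type,
        "Dropped": dropped,
    }

node = {"ApplyMapping": {
    "Name": "ApplyMapping",
    "Inputs": ["node-1"],
    "Mapping": [mapping("col0"), mapping("col1", to_type="long")],
}}
```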

SelectFields

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Paths": List of Strings. Up to 100 256 character Strings Required }

DropFields

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Paths": List of Strings. Up to 100 256 character Strings Required }

RenameField

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "SourcePath":List of Strings. Up to 100 256 character Strings Required, "TargetPath":256 character String Required }

Spigot

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Path":256 character String Required, "Topk":Integer from 0 to 100, "Prob":Double from 0 to 1.0 }

Join

{ "Name": 100 character String Required, "Inputs": List of Strings. Two 256 character Strings Required, "JoinType": equijoin, left, right, outer, leftsemi, or leftanti Required, "Columns": List[Column] Required } Column: { "From": 256 character String Required, "Keys": List[String] Required }
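A Join node per the model above takes exactly two parents and one Column entry per side naming the join keys. A sketch of the construction (node IDs "node-a"/"node-b" are placeholders):

```python
join_node = {"Join": {
    "Name": "Join",
    "Inputs": ["node-a", "node-b"],  # exactly two parent nodes
    "JoinType": "equijoin",          # equijoin, left, right, outer, leftsemi, or leftanti
    "Columns": [
        {"From": "node-a", "Keys": ["firstname"]},  # key column on the left side
        {"From": "node-b", "Keys": ["col0"]},       # key column on the right side
    ],
}}
assert len(join_node["Join"]["Inputs"]) == 2
```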

SplitFields

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Paths":List of Strings. Up to 100 256 character Strings Required }

SelectFromCollection

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "Index": Non Negative Integer Required }

FillMissingValues

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "ImputedPath":256 character String Required, "FilledPath":256 character String }

Filter

{ "Name":100 character String Required, "Inputs": List of Strings. One 256 character String Required, "LogicalOperator":String Required, "Filters":List[FilterInstance] Required } FilterInstance: { "Operation": "EQ" | "LT" | "GT" | "LTE" | "GTE" | "REGEX" | "ISNULL" Required, "Negated":Boolean, "Values":List[FilterValue] Required } FilterValue: { "Type": "COLUMNEXTRACTED" | "CONSTANT" Required, "Value": Object Required }

CustomCode

{ "Name":100 character String Required, "Inputs": List of Strings. One to fifty 256 character String Required, "Code":Up to 51,200 character string or 50 KB Required, "ClassName":256 character String Required }

SparkSQL

{ "Name":100 character String Required, "Inputs": List of Strings. One to fifty 256 character String Required, "SqlQuery": Up to 51,200 character string or 50 KB Required, "SqlAliases":List of Alias. Up to 256 Aliases Required } Alias: { "From":256 character String Required, "Alias":256 character String Required }

DropNullFields

{ "Name":100 character String Required, "Inputs": List of Strings. One to fifty 256 character Strings Required, "Paths":List of Strings. Up to 100 256 character Strings Required, "NullCheckBoxList": NullCheckBoxList, "NullTextList": List of NullValueField. Up to 50 NullValueFields. } NullCheckBoxList: { "IsEmpty": Boolean, "IsNullString": Boolean, "IsNegOne": Boolean } NullValueField: { "Value": 256 character String, "DataType": DataType } DataType: { "Id": 256 character String, "Label": 256 character String }

Union

{ "Name":100 character String Required, "Inputs":List of Strings. Two 256 character String Required, "Sources":List of Strings. Two 256 character String, "UnionType": ALL or DISTINCT Required }

Enums

JDBCDataType

ARRAY,BIGINT,BINARY,BIT,BLOB,BOOLEAN,CHAR,CLOB,DATALINK,DATE,DECIMAL,DISTINCT,DOUBLE,FLOAT,INTEGER,JAVA_OBJECT,LONGNVARCHAR,LONGVARBINARY,LONGVARCHAR,NCHAR,NCLOB,NULL,NUMERIC,NVARCHAR,OTHER,REAL,REF,REF_CURSOR,ROWID,SMALLINT,SQLXML,STRUCT,TIME,TIME_WITH_TIMEZONE,TIMESTAMP,TIMESTAMP_WITH_TIMEZONE,TINYINT,VARBINARY,VARCHAR

ApplyMappingType

bigint,binary,boolean,char,date,decimal,double,float,int,interval,long,smallint,string,timestamp,tinyint,varchar