在 Amazon Glue 中添加串流 ETL 任务 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

在 Amazon Glue 中添加串流 ETL 任务

您可以创建连续运行的串流提取、转换和负载(ETL)任务,使用来自 Amazon Kinesis Data Streams、Apache Kafka 和 Amazon Managed Streaming for Apache Kafka(Amazon MSK)等串流源的数据。这些任务会清理并转换数据,然后将结果加载到 Amazon S3 数据湖或 JDBC 数据存储中。

默认情况下,Amazon Glue 在 100 秒的时段内处理和写出数据。这可以实现数据的高效处理,并允许对晚于预计时间到达的数据执行聚合。您可以修改此窗口时段的大小以提高及时性或聚合精度。Amazon Glue 串流任务使用检查点而非任务书签来跟踪已读取的数据。

注意

当串流 ETL 任务正在运行时,Amazon Glue 会按小时计费。

创建串流 ETL 任务涉及以下步骤:

  1. 对于 Apache Kafka 串流源,请创建与 Kafka 源或 Amazon MSK 集群的 Amazon Glue 连接。

  2. 手动为串流源创建数据目录表。

  3. 为串流数据源创建 ETL 任务。定义特定于串流的任务属性,并提供您自己的脚本或(可选)修改生成的脚本。

有关更多信息,请参阅 Amazon Glue 中的流式处理 ETL

为 Amazon Kinesis Data Streams 创建串流 ETL 任务时,您无需创建 Amazon Glue 连接。但是,如果有一个连接附加到以 Kinesis Data Streams 作为源的 Amazon Glue 串流 ETL 任务,则需要提供到 Kinesis 的 Virtual Private Cloud(VPC)终端节点。有关更多信息,请参阅 Amazon VPC 用户指南中的创建接口端点 在另一个账户中指定 Amazon Kinesis Data Streams 串流时,您必须设置角色和策略从而允许跨账户访问。有关更多信息,请参阅示例:从不同账户的 Kinesis 串流中读取

Amazon Glue 流式传输 ETL 任务可以自动检测压缩数据,以透明方式解压流式传输数据,对输入源执行常见转换,并加载到输出存储。

如果是以下输入格式,则 Amazon Glue 支持自动解压以下压缩类型:

压缩类型 Avro 文件 Avro 基准 JSON CSV Grok
BZIP2
GZIP
SNAPPY 是(原始 Snappy) 是(framed Snappy) 是(framed Snappy) 是(framed Snappy) 是(framed Snappy)
XZ
ZSTD
DEFLATE

为 Apache Kafka Data Stream 创建 Amazon Glue 连接

要从 Apache Kafka 流中进行读取,您必须创建 Amazon Glue 连接。

为 Kafka 源创建 Amazon Glue 连接(控制台)

  1. 打开 Amazon Glue 控制台,地址:https://console.aws.amazon.com/glue/

  2. 在导航窗格的 Data catalog (数据目录) 下,选择 Connections (连接)

  3. 选择 Add connection (添加连接),然后在 Set up your connection’s properties (设置连接的属性) 页面上,输入连接名称。

    注意

    有关指定连接属性的更多信息,请参阅 Amazon Glue连接属性。

  4. 对于 Connection type (连接类型),选择 Kafka

  5. 对于 Kafka bootstrap servers URLs (Kafka 引导服务器 URL),输入您 Amazon MSK 集群或 Apache Kafka 集群引导代理的主机和端口编号。仅使用传输层安全性(TLS)端点建立到 Kafka 群集的初始连接。不支持 Plaintext 端点。

    以下是 Amazon MSK 集群的主机名和端口编号对的示例列表。

    myserver1.kafka.us-east-1.amazonaws.com:9094,myserver2.kafka.us-east-1.amazonaws.com:9094, myserver3.kafka.us-east-1.amazonaws.com:9094

    有关引导代理信息的更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的获取 Amazon MSK 集群的引导代理

  6. 如果您希望与 Kafka 数据源建立安全连接,请选择 Require SSL connection (需要 SSL 连接),并在 Kafka private CA certificate location (Kafka 私有 CA 证书位置) 中,输入自定义 SSL 证书的有效 Amazon S3 路径。

    对于与自我托管式 Kafka 的 SSL 连接,自定义证书是强制性的。对于 Amazon MSK 则是可选的。

    有关为 Kafka 指定自定义证书的更多信息,请参阅 Amazon Glue 连接 SSL 属性

  7. (可选)输入描述,然后选择 Next (下一步)

  8. 对于 Amazon MSK 集群,请指定其 Virtual Private Cloud(VPC)、子网和安全组。对于自行托管式 Kafka,VPC 信息是可选的。

  9. 选择 Next (下一步) 以查看所有连接属性,然后选择 Finish (结束)

有关 Amazon Glue 连接的更多信息,请参阅 Amazon Glue 连接

为串流源创建数据目录表

您可以为流式传输源手动创建数据目录表,以指定源数据流属性(包括数据 Schema)。此表用作串流 ETL 任务的数据源。

如果您不知道源数据流中数据的架构,则可以在不使用架构的情况下创建表。然后,当您创建串流 ETL 任务时,您可以打开 Amazon Glue 架构检测函数。Amazon Glue 会通过流数据来确定架构。

可使用 Amazon Glue 控制台、Amazon Command Line Interface (Amazon CLI) 或 Amazon Glue API 创建表。有关使用 Amazon Glue 控制台手动创建表的信息,请参阅 在 Amazon Glue Data Catalog 中定义表

注意

无法使用 Amazon Lake Formation 控制台创建表;必须使用 Amazon Glue 控制台。

另外,请考虑以下有关 Avro 格式的串流源或可以应用 Grok 模式的日志数据的信息。

Kinesis 数据源

在创建表时,请设置以下串流 ETL 属性(控制台)。

源的类型

Kinesis

对于同一账户中的 Kinesis 源:
区域

存在 Amazon Kinesis Data Streams 服务的 Amazon 区域。“区域”和 Kinesis 流名称会一起转换为“流 ARN”。

示例:https://kinesis.us-east-1.amazonaws.com

Kinesis 流名称

流名称如《Amazon Kinesis Data Streams 开发人员指南》https://docs.amazonaws.cn/streams/latest/dev/kinesis-using-sdk-java-create-stream.html中的创建流所述。

有关其他账户中的 Kinesis 源,请参阅此示例来设置角色和策略以允许进行跨账户访问。配置以下设置:
流 ARN

使用者用于注册的 Kinesis Data Streams ARN。有关更多信息,请参阅 Amazon 一般参考中的 Amazon Resource Name(ARN)和 Amazon 服务命名空间

所担任角色的 ARN

担任角色的 Amazon Resource Name(ARN)。

会话名称(可选)

所担任角色会话的标识符。

在根据不同规则或因为不同原因担任相同角色时,使用角色会话名称对会话进行唯一标识。在跨账户方案中,角色会话名称对于拥有此角色的账户可见,并且可以由拥有该角色的账户记录。所担任角色规则的 ARN 中也使用角色会话名称。这意味着使用临时安全证书的后续跨账户 API 请求会在其 Amazon CloudTrail 日志中将角色会话名称公开给外部账户。

为 Amazon Kinesis Data Streams 设置串流 ETL 属性(Amazon Glue API 或 Amazon CLI)

  • 要为同一账户中的 Kinesis 源设置串流 ETL 属性,请在 CreateTable API 操作或 create_table CLI 命令的 StorageDescriptor 结构中指定 streamNameendpointUrl 参数。

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamName": "sample-stream", "endpointUrl": "https://kinesis.us-east-1.amazonaws.com" } ... }

    或者,指定 streamARN

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-east-1:123456789:stream/sample-stream" } ... }
  • 要为其他账户中的 Kinesis 源设置串流 ETL 属性,请在 CreateTable API 操作或 create_table CLI 命令的 StorageDescriptor 结构中指定 streamARNawsSTSRoleARNawsSTSSessionName(可选)参数。

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-east-1:123456789:stream/sample-stream", "awsSTSRoleARN": "arn:aws:iam::123456789:role/sample-assume-role-arn", "awsSTSSessionName": "optional-session" } ... }

Kafka 数据源

在创建表时,请设置以下串流 ETL 属性(控制台)。

源的类型

Kafka

对于 Kafka 源:
主题名称

Kafka 中指定的主题名称。

连接

一个引用 Kafka 源的 Amazon Glue 连接,如 为 Apache Kafka Data Stream 创建 Amazon Glue 连接 中所述。

Amazon Glue 架构注册表源

要将 Amazon Glue 架构注册表用于串流任务,请按照位于 使用案例:Amazon Glue Data Catalog 的说明创建或更新架构注册表。

目前,Amazon Glue 串流仅支持架构推理推断设置为 false 的 Glue Schema Registry Avro 格式。

Avro 串流源的注释和限制

以下注释和限制适用于 Avro 格式的串流源:

  • 启用架构检测后,Avro 架构必须包含在负载中。关闭时,负载应仅包含数据。

  • 某些 Avro 数据类型在动态帧中不受支持。在使用 Amazon Glue 控制台的创建表向导中的 Define a schema (定义架构) 页面时,您无法指定这些数据类型。在架构检测期间,Avro 架构中不受支持的类型将转换为受支持的类型,如下所示:

    • EnumType => StringType

    • FixedType => BinaryType

    • UnionType => StructType

  • 如果使用控制台中的 Define a schema (定义架构) 页面,则架构的隐含根元素类型为 record。如果你想要一个除 record 以外的根元素类型,例如 array 或者 map,则不能使用 Define a schema (定义架构) 页面来指定架构。相反,您必须跳过该页并将架构指定为表属性或在 ETL 脚本中指定。

    • 要在表属性中指定架构,请完成创建表向导,编辑表详细信息,并在 Table properties (表属性) 下添加新的键值对。使用密钥 avroSchema,然后为值输入架构 JSON 对象,如以下屏幕截图所示。

      
                                在 Table properties (表属性) 标题中,有两列文本字段。左侧列标题为 Key (密钥),右侧列标题为 Value (值)。第一行中的键/值对是 classification/avro。第二行中的键/值对是 Avroschema/{"type":"array","items":"string"}。
    • 要在 ETL 脚本中指定架构,请修改 datasource0 任务语句并将 avroSchema 密钥添加到 additional_options 参数,如以下 Python 和 Scala 示例所示。

      Python
      SCHEMA_STRING = ‘{"type":"array","items":"string"}’ datasource0 = glueContext.create_data_frame.from_catalog(database = "database", table_name = "table_name", transformation_ctx = "datasource0", additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema": SCHEMA_STRING})
      Scala
      val SCHEMA_STRING = """{"type":"array","items":"string"}""" val datasource0 = glueContext.getCatalogSource(database = "database", tableName = "table_name", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions(s"""{"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema":"$SCHEMA_STRING"}""")).getDataFrame()

将 Grok 模式应用于串流源

您可以为日志数据源创建串流 ETL 任务,并使用 Grok 模式将日志转换为结构化数据。然后,ETL 任务会将数据作为结构化数据源进行处理。在为串流源创建数据目录表时,可以指定要应用的 Grok 模式。

有关 Grok 模式和自定义模式字符串值的信息,请参阅 编写 Grok 自定义分类器

将 Grok 模式添加到数据目录表(控制台)

  • 使用创建表向导,并使用在 为串流源创建数据目录表 中指定的参数创建表。将数据格式指定为 Grok,填写 Grok pattern (Grok 模式) 字段,并可选择在 Custom patterns (optional) (自定义模式(可选)) 下添加自定义模式。

    
                        *

    在每个自定义模式后按 Enter

将 Grok 模式添加到数据目录表(Amazon Glue API 或 Amazon CLI)

  • 添加 GrokPattern 参数,并且可以选择将 CustomPatterns 参数添加到 CreateTable API 操作或 create_table CLI 命令。

    "Parameters": { ... "grokPattern": "string", "grokCustomPatterns": "string", ... },

    grokCustomPatterns 表达为字符串,并使用“\n”作为模式之间的分隔符。

    以下是指定这些参数的示例。

    "parameters": { ... "grokPattern": "%{USERNAME:username} %{DIGIT:digit:int}", "grokCustomPatterns": "digit \d", ... }

定义串流 ETL 任务的任务属性

在 Amazon Glue 控制台上定义串流 ETL 任务时,请提供以下特定于流的属性。有关其他任务属性的说明,请参阅 定义 Spark 任务的任务属性。有关如何使用 Amazon Glue 控制台添加任务的更多信息,请参阅 在 Amazon Glue 控制台上处理作业

IAM 角色

指定用于对运行任务、访问串流源和访问目标数据存储所用的资源进行授权的 Amazon Identity and Access Management(IAM)角色。

要访问 Amazon Kinesis Data Streams,请将 AmazonKinesisFullAccess Amazon 托管式策略附加到该角色,或附加类似的 IAM 策略来允许更精细的访问。有关示例策略,请参阅使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问

有关在 Amazon Glue 中运行任务的权限的更多信息,请参阅 管理 Amazon Glue 资源的访问权限

类型

选择 Spark streaming (Spark 串流)

Amazon Glue 版本

Amazon Glue 版本确定可用于任务的 Apache Spark、Python 或 Scala 版本。为 Glue 1.0 版或 Glue 2.0 版选择一个选项,该选项指定可供任务使用的 Python 或 Scala 版本。Amazon Glue支持 Python 3 的 2.0 版是串流 ETL 任务的默认设置。

作业超时

(可选)输入持续时间(以分钟为单位)。默认值为空,这意味着任务可能无限期运行。

数据源

指定您在 为串流源创建数据目录表 中创建的表。

数据目标

请执行下列操作之一:

  • 选择 Create tables in your data target (在数据目标中创建表) 并指定以下数据目标属性。

    数据存储

    选择 Amazon S3 或 JDBC。

    格式

    选择任意格式。所有项都支持流式处理。

  • 选择 Use tables in the data catalog and update your data target (使用数据目录中的表并更新数据目标),然后选择用于 JDBC 数据存储的表。

输出架构定义

请执行下列操作之一:

  • 选择 Automatically detect schema of each record (自动检测每条记录的架构) 以启动架构检测。Amazon Glue 可以通过串流数据确定架构。

  • 选择 Specify output schema for all records (指定所有记录的输出方案) 以使用 Apply Mapping(应用映射)转换来定义输出架构。

Script

(可选)提供您自己的脚本或修改生成的脚本以执行 Apache Spark Structured Streaming 引擎支持的操作。有关可用操作的信息,请参阅串流 DataFrame/数据集的操作

串流 ETL 注释和限制

请记住以下注释和限制:

  • 自动解压 Amazon Glue 流式传输 ETL 任务仅适用于受支持的压缩类型。另请注意以下几点:

    • Framed Snappy 是指适用于 Snappy 的帧格式

    • Deflate 在 Glue 版本 3.0 而不是 Glue 版本 2.0 中受支持。

  • 使用架构检测时,无法执行串流数据联接。

  • 您的 ETL 脚本可以使用 Amazon Glue 的内置转换和 Apache Spark Structured Streaming 的原生转换。有关更多信息,请参阅 Apache Spark 网站或 内置转换 上的串流 DataFrame/数据集的操作

  • Amazon Glue 串流 ETL 任务使用检查点来跟踪已读取的数据。因此,停止并重新启动的任务将从流中停止的位置开始。如果要重新处理数据,您可以删除脚本中引用的检查点文件夹。

  • 不支持任务书签。

  • 由于 Kinesis Data Streams 的增强扇出功能,您无法将任务注册为使用者。

  • 如果您使用在 Amazon Glue 架构注册表中创建的数据目录表,则当新的架构版本可用时,要反映该新架构,您需要执行以下操作:

    1. 停止与该表关联的任务。

    2. 更新数据目录表的架构。

    3. 重新启动与该表关联的任务。