在 AWS Glue 中添加流式处理 ETL 作业 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 AWS Glue 中添加流式处理 ETL 作业

您可以创建持续运行的流式传输提取、转换和加载 (ETL) 作业,使用来自 Amazon Kinesis Data Streams、Apache Kafka 和 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 等流式传输源的数据。作业清理和转换数据,然后将结果加载到 Amazon S3 数据湖或 JDBC 数据存储中。

默认情况下,AWS Glue 在 100 秒的时段内处理和写出数据。这样,可以高效地处理数据,并允许对在预计时间之后到达的数据执行聚合。您可以修改此窗口大小以提高时间线或聚合准确性。AWS Glue 流式处理作业使用检查点而不是作业书签来跟踪已读取的数据。

注意

AWS Glue 在运行时按小时对流式处理 ETL 作业计费。

创建流式处理 ETL 作业涉及以下步骤:

  1. 对于 Apache Kafka 流式传输源,请创建到 Kafka 源或 AWS Glue 集群的 Amazon MSK 连接。

  2. 手动为流式处理源创建 Data Catalog 表。

  3. 为流式处理数据源创建 ETL 作业。定义特定于流式处理的作业属性,并提供您自己的脚本或(可选)修改生成的脚本。

为 Amazon Kinesis Data Streams 创建流式处理 ETL 作业时,您不必创建 AWS Glue 连接。但是,如果存在附加到 AWS Glue 流式处理 ETL 作业的连接,而该作业使用 Kinesis Data Streams 作为源,则需要指向 Kinesis 的 Virtual Private Cloud (VPC) 终端节点。有关更多信息,请参阅 中的创建接口终端节点。Amazon VPC 用户指南

为 Apache Kafka 数据流创建 AWS Glue 连接

要从 Apache Kafka 流中进行读取,您必须创建 AWS Glue 连接。

为 Kafka 源创建 AWS Glue 连接(控制台)

  1. 通过以下网址打开 AWS Glue 控制台:https://console.amazonaws.cn/glue/

  2. 在导航窗格的 Data catalog (数据目录) 下,选择 Connections (连接)

  3. 选择 Add connection (添加连接),然后在 Set up your connection’s properties (设置连接的属性) 页面上,输入连接名称。

  4. 对于 Connection type (连接类型),选择 Kafka

  5. 对于 Kafka bootstrap servers (Kafka 引导服务器)URLs,输入 集群或 Apache Kafka 集群的引导代理的主机和端口号。Amazon MSK仅使用传输层安全性 (TLS) 终端节点建立与 Kafka 集群的初始连接。不支持明文终端节点。

    以下是 Amazon MSK 集群的主机名和端口号对的示例列表。

    myserver1.kafka.us-east-1.amazonaws.com:9094,myserver2.kafka.us-east-1.amazonaws.com:9094, myserver3.kafka.us-east-1.amazonaws.com:9094

    有关获取引导代理信息的更多信息,请参阅 https://docs.amazonaws.cn/msk/latest/developerguide/msk-get-bootstrap-brokers.html 中的获取 Amazon MSK 集群的引导代理Amazon Managed Streaming for Apache Kafka 开发人员指南。

  6. 如果您希望安全连接 Kafka 数据源,请选择 Require SSL connection (需要 SSL 连接),对于 Kafka private CA certificate location (Kafka 私有 CA 证书位置),输入自定义 SSL 证书的有效 Amazon S3 路径。

    对于与自行管理 Kafka 的 SSL 连接,自定义证书是必需的。对于 Amazon MSK 是可选的。

    有关为 Kafka 指定自定义证书的更多信息,请参阅AWS Glue 连接 SSL 属性

  7. (可选)输入描述,然后选择 Next (下一步)

  8. 对于 Amazon MSK 集群,请指定其 Virtual Private Cloud (VPC)、子网和安全组。对于自行管理的 Kafka,VPC 信息是可选的。

  9. 选择 Next (下一步) 可查看所有连接属性,然后选择 Finish (完成)

有关 AWS Glue 连接的更多信息,请参阅 AWS Glue 连接

为流式处理源创建 Data Catalog 表

在创建流式处理 ETL 作业之前,您必须手动创建一个指定源数据流属性(包括数据架构)的 Data Catalog 表。此表用作流式处理 ETL 作业的数据源。

如果您不知道源数据流中的数据的架构,可以创建不带架构的表。然后,当您创建流式处理 ETL 作业时,您可以启用 AWS Glue 架构检测函数。AWS Glue 从流式处理数据中确定架构。

可使用 AWS Glue 控制台、AWS Command Line Interface (AWS CLI) 或 AWS Glue API 创建表。有关使用 AWS Glue 控制台手动创建表的信息,请参阅在 AWS Glue 数据目录 中定义表

注意

无法使用 AWS Lake Formation 控制台创建表;必须使用 AWS Glue 控制台。

创建表时,请设置以下流式处理 ETL 属性。

源的类型

KinesisKafka

对于 Kinesis 源:
流名称

Amazon Kinesis Data Streams 开发人员指南 中的创建流中所述的流名称。

Kinesis 源 URL

Amazon Kinesis Data Streams 服务的完全限定 URL。

示例:https://kinesis.us-east-1.amazonaws.com

对于 Kafka 源:
主题名称

Kafka 中指定的主题名称。

Connection

一个引用 Kafka 源的 AWS Glue 连接,如为 Apache Kafka 数据流创建 AWS Glue 连接中所述。

此外,对于 Avro 格式的流式传输源或可将 Grok 模式应用于的日志数据,请考虑以下信息。

Avro 流式传输源的注意事项和限制

以下说明和限制适用于 Avro 格式的流式传输源:

  • 启用架构检测后,Avro 架构必须包含在负载中。禁用后,负载应仅包含数据。

  • 一些 Avro 数据类型在动态帧中不受支持。使用 控制台的创建表向导中的 Define a schema (定义架构)AWS Glue 页面定义架构时,无法指定这些数据类型。在架构检测期间,Avro 架构中不支持的类型将转换为支持的类型,如下所示:

    • EnumType => StringType

    • FixedType => BinaryType

    • UnionType => StructType

  • 如果您使用控制台中的 Define a schema (定义架构) 页面定义表架构,则该架构的隐式根元素类型为 record。 如果您需要 record 之外的根元素类型(例如 arraymap),则无法使用 Define a schema (定义架构) 页面指定架构。相反,您必须跳过该页面,并将架构指定为表属性或在 ETL 脚本中指定。

    • 要在表属性中指定架构,请完成创建表向导,编辑表详细信息,并在 Table properties (表属性) 下添加新的键/值对。使用键 avroSchema,然后输入值的架构 JSON 对象,如以下屏幕截图中所示。

      
                                    在 Table properties (表属性) 标题下,有两个文本字段列。左列标题为 Key (键),右列标题为 Value (值)。第一行中的键/值对是分类/效用。第二行中的键/值对是 avroSchema/{"type":"array","items":"string"}。
    • 要在 ETL 脚本中指定架构,请修改 datasource0 分配语句并将 avroSchema 键添加到 additional_options 参数,如以下 Python 和 Scala 示例所示。

      Python
      SCHEMA_STRING = ‘{"type":"array","items":"string"}’ datasource0 = glueContext.create_data_frame.from_catalog(database = "database", table_name = "table_name", transformation_ctx = "datasource0", additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema": SCHEMA_STRING})
      Scala
      val SCHEMA_STRING = """{"type":"array","items":"string"}""" val datasource0 = glueContext.getCatalogSource(database = "database", tableName = "table_name", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions(s"""{"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema":"$SCHEMA_STRING"}""")).getDataFrame()

将 Grok 模式应用于流式传输源

您可以为日志数据源创建流式处理 ETL 作业,并使用 Grok 模式将日志转换为结构化数据。然后,ETL 作业将数据作为结构化数据源进行处理。您可以指定在为流式传输源创建 Data Catalog 表时要应用的 Grok 模式。

有关 Grok 模式和自定义模式字符串值的信息,请参阅编写 Grok 自定义分类器

将 Grok 模式添加到 Data Catalog 表(控制台)

  • 使用创建表向导,并使用为流式处理源创建 Data Catalog 表中指定的参数创建表。将数据格式指定为 Grok,填写 Grok pattern (Grok 模式) 字段,并选择性地在 Custom patterns (optional) (自定义模式(可选)) 下添加自定义模式。

    
                            *

    在每个自定义模式之后按 Enter

将 Grok 模式添加到 Data Catalog 表(AWS Glue API 或 AWS CLI)

  • GrokPattern 参数和(可选)CustomPatterns 参数添加到 CreateTable API 操作或 create_table CLI 命令。

    "Parameters": { ... "grokPattern": "string", "grokCustomPatterns": "string", ... },

    grokCustomPatterns 表示为字符串,并使用“\\n”作为模式之间的分隔符。

    以下是指定这些参数的示例。

    "parameters": { ... "grokPattern": "%{USERNAME:username} %{DIGIT:digit:int}", "grokCustomPatterns": "digit \d", ... }

定义流式处理 ETL 作业的作业属性

在 AWS Glue 控制台上定义流式处理 ETL 作业时,请提供以下特定于流的属性。有关其他作业属性的说明,请参阅定义作业属性。有关如何使用 AWS Glue 控制台添加作业的更多信息,请参阅在 AWS Glue 控制台上处理作业

IAM 角色

指定用于对运行作业、访问流式处理源和访问目标数据存储所用的资源进行授权的 AWS Identity and Access Management (IAM) 角色。

要访问 Amazon Kinesis Data Streams,请将 AmazonKinesisFullAccess AWS 托管策略附加到该角色,或附加类似的 IAM 策略来允许更精细的访问。有关示例策略,请参阅使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问

有关在 AWS Glue 中运行作业的权限的更多信息,请参阅管理 AWS Glue 资源的访问权限

类型

选择 Spark streaming (Spark 流式处理)

AWS Glue version

Glue 版本决定了可用于作业的 Apache Spark 的版本以及 Python 或 Scala 的版本。为 Glue 版本 1.0 或 Glue 版本 2.0 选择一个选项,该选项指定可供作业使用的 Python 或 Scala 版本。AWS Glue具有 Python 3 支持的版本 2.0 是流式处理 ETL 作业的默认版本。

作业超时

(可选)输入持续时间(以分钟为单位)。如果将此字段留空,作业将连续运行。

数据源

指定您在为流式处理源创建 Data Catalog 表中创建的表。

数据目标

请执行下列操作之一:

  • 选择 Create tables in your data target (在数据目标中创建表) 并指定以下数据目标属性。

    数据存储

    选择 Amazon S3 或 JDBC。

    格式

    选择任意格式。所有项都支持流式处理。

  • 选择 Use tables in the data catalog and update your data target (在数据目录中使用表并更新数据目标),然后为 JDBC 数据存储选择表。

输出架构定义

请执行下列操作之一:

  • 选择 Automatically detect schema of each record (自动检测每条记录的架构) 以启用架构检测。AWS Glue 根据流数据确定架构。

  • 选择 Specify output schema for all records 以使用应用映射转换来定义输出架构。

Script

(可选)提供您自己的脚本或修改生成的脚本以执行 Apache Spark Structured Streaming 引擎支持的操作。有关可用操作的信息,请参阅流式处理 DataFrames/数据集的操作

流式处理 ETL 注释和限制

请记住以下注释和限制:

  • 使用架构检测时,您无法执行流数据的联接。

  • 您的 ETL 脚本可以使用 AWS Glue 的内置转换以及 Apache Spark Structured Streaming 的原生转换。有关更多信息,请参阅 Apache Spark 网站上的流式处理DataFrames/数据集的操作内置转换

  • AWS Glue 流式处理 ETL 作业使用检查点来跟踪已读取的数据。因此,停止并重新启动的作业将从流中停止的位置开始。如果要重新处理数据,您可以删除脚本中引用的检查点文件夹。

  • 不支持作业书签。

  • 如果 Amazon Kinesis 流式处理作业正在运行并且正在使用来自 AWS Glue 数据流的数据,则无法更改该流的分片数。先停止作业,修改流分片,然后重新启动作业。

  • Kinesis 流必须位于与 AWS Glue 作业相同的账户中。

  • 您不能将作业注册为 Kinesis Data Streams 的增强型扇出功能使用者。