在 Amazon Glue 中添加流式处理 ETL 作业 - Amazon连接词
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Glue 中添加流式处理 ETL 作业

可以创建连续运行的流式处理提取、转换和加载 (ETL) 作业、使用流式处理源(例如 Amazon Kinesis Data Streams、Apache Kafka 和 Apache Kafka(亚马逊 MSK)等 Managed Streaming for Apache Kafka 源(Amazon MSK)中的数据。这些作业会清理并转换数据,然后将结果加载到 Amazon S3 数据湖或 JDBC 数据存储中。

默认情况下,Amazon Glue 在 100 秒的时段内处理和写出数据。这可以实现数据的高效处理,并允许对在预计时间之后到达的数据执行聚合。可以修改此时段大小以提高及时性或聚合精度。Amazon Glue流式处理作业使用检查点而非作业书签来跟踪已读取的数据。

注意

Amazon Glue当 ETL 作业正在运行时,按小时计费。

创建流式处理 ETL 作业涉及以下步骤:

  1. 对于 Apache Kafka 流式处理源,请创建Amazon Glue连接到 Kafka 源或亚马逊 MSK 集群。

  2. 手动为流式处理源创建数据目录表。

  3. 为流式处理数据源创建 ETL 作业。定义特定于流式处理的作业属性,并提供您自己的脚本或(可选)修改生成的脚本。

有关更多信息,请参阅Amazon Glue 中的流式处理 ETL

为 Amazon Kinesis 数据流创建流式 ETL 作业时,您无需创建Amazon Glue连接。但是,如果有一个连接附加到Amazon Glue以 Kinesis 数据流作为源的流 ETL 作业,则需要一个虚拟私有云 (VPC) 终端节点到 Kinesis。有关更多信息,请参阅 。创建接口终端节点中的Amazon VPC 用户指南。在另一个账户中指定 Amazon Kinesis Data Streams 时,您必须设置角色和策略以允许跨账户访问。有关更多信息,请参阅 。示例:从不同账户的 Kinesis 流中读取

创建Amazon GlueApache Kafka 数据流的连接

要从 Apache Kafka 流中进行读取,您必须创建 Amazon Glue 连接。

为 Kafka 源创建 Amazon Glue 连接(控制台)

  1. 打开Amazon Glue控制台https://console.aws.amazon.com/glue/

  2. 在导航窗格的 Data catalog (数据目录) 下,选择 Connections (连接)

  3. 选择 Add connection (添加连接),然后在 Set up your connection’s properties (设置连接的属性) 页面上,输入连接名称。

  4. 对于 Connection type (连接类型),选择 Kafka

  5. 适用于卡夫卡引导服务器 URL中,输入您的 Amazon MSK 集群或 Apache Kafka 集群的引导代理程序的主机和端口号。仅使用传输层安全性 (TLS) 终结点建立到 Kafka 群集的初始连接。不支持纯文本终端节点。

    以下是 Amazon MSK 群集的主机名和端口号对的示例列表。

    myserver1.kafka.us-east-1.amazonaws.com:9094,myserver2.kafka.us-east-1.amazonaws.com:9094, myserver3.kafka.us-east-1.amazonaws.com:9094

    有关获取引导程序代理信息的更多信息,请参阅获取 Amazon MSK 集群的引导代理中的Amazon Managed Streaming for Apache Kafka 开发人员指南

  6. 如果您希望与 Kafka 数据源建立安全连接,请选择需要 SSL 连接,以及卡夫卡私人 CA 证书位置中,输入自定义 SSL 证书的有效 Amazon S3 路径。

    对于与自我管理的 Kafka 的 SSL 连接,自定义证书是强制性的。它是可选的亚马逊 MSK。

    有关为 Kafka 指定自定义证书的更多信息,请参阅Amazon Glue 连接 SSL 属性

  7. (可选)输入描述,然后选择下一步

  8. 对于 Amazon MSK 集群,请指定其 Virtual Private Cloud (VPC)、子网和安全组。VPC 信息对于自我管理的卡夫卡是可选的。

  9. 选择下一步以查看所有连接属性,然后选择Finish

有关 Amazon Glue 连接的更多信息,请参阅 Amazon Glue 连接

为流式处理源创建数据目录表

创建流式处理 ETL 作业之前,您必须手动创建数据目录表来指定源数据流属性(包括数据架构)。此表用作流式处理 ETL 作业的数据源。

如果您不知道源数据流中数据的模式,则可以在不使用模式的情况下创建表。然后,当您创建流式 ETL 作业时,您可以打开Amazon Glue模式检测函数。Amazon Glue从流数据确定架构。

可使用 Amazon Glue 控制台、Amazon Command Line Interface (Amazon CLI) 或 Amazon Glue API 创建表。有关使用 Amazon Glue 控制台手动创建表的信息,请参阅在 Amazon Glue Data Catalog 中定义表

注意

无法使用 Amazon Lake Formation 控制台创建表;必须使用 Amazon Glue 控制台。

在创建表时,请设置以下流式处理 ETL 属性(控制台)。

源的类型

KinesisKafka

对于同一账户中的 Kinesis 源:
区域

这些区域有:AmazonAmazon Kinesis Data Streams 服务所在的区域。“区域” 和 “Kinesis” 流名称一起转换为 “流 ARN”。

示例:https://kinesis.us-east-1.amazonaws.com

Kinesis Stream 名

流名称,如创建流中的Amazon Kinesis Data Streams 开发人员指南

有关其他帐户中的 Kinesis 源,请参阅这个示例来设置角色和策略,以允许跨账户访问。配置以下设置:
流 ARN

在其中注册使用者的 Kinesis 数据流的 ARN。有关更多信息,请参阅 。Amazon 资源名称 (ARN) 和Amazon服务命名空间中的Amazon一般参考

代入角色 ARN

要担任角色的 Amazon 资源名称 (ARN)。

会话名称(可选)

所担任角色会话的标识符。

当不同承担者担任相同角色或由于不同原因而使用角色会话名称唯一标识会话。在跨账户方案中,角色会话名称对于可见,并且可以由拥有该角色的帐户记录。该角色会话名称也用于所担任角色委托人的 ARN 中。这意味着,使用临时安全证书的后续跨账户 API 请求将将角色会话名称公开给Amazon CloudTrail日志。

对于 Kafka 源:
主题名称

Kafka 中指定的主题名称。

Connection

一个引用 Kafka 源的 Amazon Glue 连接,如创建Amazon GlueApache Kafka 数据流的连接中所述。

要为 Amazon Kinesis Data Streams 式 ETL 属性(Amazon GlueAPI 或Amazon CLI)

  • 要为同一帐户中的 Kinesis 源设置流式 ETL 属性,请指定streamNameendpointUrl中的StorageDescriptor的结构CreateTableAPI 操作或create_tableCLI 命令。

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamName": "sample-stream", "endpointUrl": "https://kinesis.us-east-1.amazonaws.com" } ... }

    或者,指定streamARN

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-east-1:123456789:stream/sample-stream" } ... }
  • 要为另一个帐户中的 Kinesis 源设置流式 ETL 属性,请指定streamARNawsSTSRoleARNawsSTSSessionName(可选)中的StorageDescriptor中的CreateTableAPI 操作或create_tableCLI 命令。

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-east-1:123456789:stream/sample-stream", "awsSTSRoleARN": "arn:aws:iam::123456789:role/sample-assume-role-arn", "awsSTSSessionName": "optional-session" } ... }

另外,请考虑以下有关 Avro 格式的流源或可以应用 Grok 模式的日志数据的信息。

Avro 流媒体源的注释和限制

以下注释和限制适用于 Avro 格式的流式传输源:

  • 启用模式检测后,Avro 架构必须包含在有效负载中。关闭时,有效负载应仅包含数据。

  • 某些 Avro 数据类型在动态帧中不受支持。在定义模式时,您无法指定这些数据类型定义架构页面中的Amazon Glue控制台。在模式检测期间,Avro 架构中不受支持的类型将转换为支持的类型,如下所示:

    • EnumType => StringType

    • FixedType => BinaryType

    • UnionType => StructType

  • 如果使用定义架构页面中,模式的隐含根元素类型为record。如果你想要一个除record,例如,array或者map,则不能使用定义架构页. 相反,您必须跳过该页并将架构指定为表属性或在 ETL 脚本中。

    • 要在表属性中指定架构,请完成创建表向导,编辑表详细信息,并在表属性。使用密钥执行avroSchema,然后为该值输入架构 JSON 对象,如以下屏幕截图所示。

      
                                    在表属性标题中,有两列文本字段。左侧列标题为密钥,右侧列标题为值。第一行中的键/值对是分类/avro。第二行中的键/值对是 Avroschema/ {"类型”: "数组”, "项目”: "字符串"}。
    • 要在 ETL 脚本中指定架构,请修改datasource0赋值语句并添加avroSchema键到additional_options参数,如以下 Python 和 Scala 示例所示。

      Python
      SCHEMA_STRING = ‘{"type":"array","items":"string"}’ datasource0 = glueContext.create_data_frame.from_catalog(database = "database", table_name = "table_name", transformation_ctx = "datasource0", additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema": SCHEMA_STRING})
      Scala
      val SCHEMA_STRING = """{"type":"array","items":"string"}""" val datasource0 = glueContext.getCatalogSource(database = "database", tableName = "table_name", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions(s"""{"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema":"$SCHEMA_STRING"}""")).getDataFrame()

将 Grok 模式应用于流源

您可以为日志数据源创建流式 ETL 作业,并使用 Grok 模式将日志转换为结构化数据。然后,ETL 作业将数据作为结构化数据源进行处理。在为流源创建 “数据目录” 表时,可以指定要应用的 Grok 模式。

有关 Grok 模式和自定义模式字符串值的信息,请参阅编写 Grok 自定义分类器

将 Grok 模式添加到数据目录表(控制台)

  • 使用创建表向导,并使用为流式处理源创建数据目录表。将数据格式指定为 Grok,填写Grok 模式字段中添加自定义模式,并可选择在自定义模式(可选)

    
                            *

    Enter在每个自定义模式之后。

要将 Grok 模式添加到数据目录表 (Amazon GlueAPI 或Amazon CLI)

  • 添加GrokPattern参数,并且可以选择CustomPatterns参数添加到CreateTableAPI 操作或create_tableCLI 命令。

    "Parameters": { ... "grokPattern": "string", "grokCustomPatterns": "string", ... },

    ExpressgrokCustomPatterns作为字符串,并使用 “\ n” 作为模式之间的分隔符。

    下面是指定这些参数的示例。

    "parameters": { ... "grokPattern": "%{USERNAME:username} %{DIGIT:digit:int}", "grokCustomPatterns": "digit \d", ... }

定义流式处理 ETL 作业的作业属性

当您定义流式 ETL 作业时,在Amazon Glue控制台中,提供以下特定于流的属性。有关其他作业属性的说明,请参阅定义 Spark Job 业的作业属性。有关如何使用 Amazon Glue 控制台添加作业的更多信息,请参阅在 Amazon Glue 控制台上处理作业

IAM 角色

指定Amazon Identity and Access Management(IAM) 角色,用于对运行作业、访问流式处理源和访问目标数据存储所用的资源进行授权。

要访问 Amazon Kinesis Data Streams,请附上AmazonKinesisFullAccess Amazon托管策略添加到角色,或附加类似的 IAM 策略来允许更精细的访问。有关示例策略,请参阅使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问

有关在 Amazon Glue 中运行作业的权限的更多信息,请参阅管理的访问权限AmazonGlue 资源

类型

选择 Spark streaming (Spark 流式处理)

Amazon Glue version

这些区域有:Amazon Glue版本决定了可用于作业的 Apache Spark 和 Python 或 Scala 版本。为 Glue 版 1.0 或 Glue 版 2.0 选择一个选项,该选项指定可供作业使用的 Python 版本或 Scala 版本。Amazon Glue支持 Python 3 的 2.0 版本是流式传输 ETL 作业的默认设置。

作业超时

(可选)输入持续时间(以分钟为单位)。默认值为空,这意味着作业可能无限期运行。

数据源

指定您在为流式处理源创建数据目录表中创建的表。

数据目标

请执行下列操作之一:

  • 选择 Create tables in your data target (在数据目标中创建表) 并指定以下数据目标属性。

    数据存储

    选择 Amazon S3 或 JDBC。

    格式

    选择任意格式。所有项都支持流式处理。

  • 选择使用数据目录中的表并更新您的数据目标,然后选择 JDBC 数据存储的表。

输出架构定义

请执行下列操作之一:

  • 选择自动检测每条记录的模式以启动模式检测。Amazon Glue从流数据确定架构。

  • 选择指定所有记录的输出方案以使用 “应用映射” 转换来定义输出方案。

Script

(可选)提供您自己的脚本或修改生成的脚本以执行 Apache Spark Structured Streaming 引擎支持的操作。有关可用操作的信息,请参阅流式处理 DataFrame/数据集的操作

流式处理 ETL 注释和限制

请记住以下注释和限制:

  • 使用模式检测时,无法执行流数据的连接。

  • 您的 ETL 脚本可以使用Amazon Glue的内置转换和 Apache Spark Streaming 的原生转换。有关更多信息,请参阅 。对流式处理 DataFrame/ 数据集的操作在阿帕奇火花网站上或内置转换

  • Amazon Glue流式处理 ETL 作业使用检查点来跟踪已读取的数据。因此,停止并重新启动的作业将从流中停止的位置开始。如果要重新处理数据,您可以删除脚本中引用的检查点文件夹。

  • 不支持 Job 书签。

  • 您无法更改 Amazon Kinesis 数据流的分片数,如果Amazon Glue流作业正在运行并使用该流中的数据。先停止作业,修改流分片,然后重新启动作业。

  • 对于 Kinesis Data Streams 的增强扇出功能,您不能将作业注册为使用者。