利用用户定义的函数 (UDF) - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

利用用户定义的函数 (UDF)

该模式用于演示如何利用 Kinesis Data Analytics-Studio Zeppelin 笔记本中的 UDF 来处理 Kinesis 流中的数据。适用于 Apache Flink Studio 的托管服务使用 Apache Flink 提供高级分析功能,其中包括恰好一次处理语义、事件时间窗口、通过用户定义的函数和客户集成实现的可扩展性、命令式语言支持、应用程序持久状态、水平扩展、多数据源支持、可扩展的集成等。这些对确保数据流处理的准确性、完整性、一致性和可靠性至关重要,也是适用于 SQL 的 Amazon Kinesis Data Analytics 所不具备的功能。

在此示例应用程序中,我们将演示如何利用 KDA-Studio Zeppelin 笔记本中的 UDF 来处理 Kinesis 流中的数据。通过使用适用于 Kinesis Data Analytics 的 Studio 笔记本,您可以实时以交互式方式查询数据流,并使用标准 SQL、Python 和 Scala 轻松构建和运行流处理应用程序。只需在 Amazon Web Services Management Console 中进行几次单击操作,即可启动无服务器笔记本来查询数据流,并在几秒钟内获得结果。有关更多信息,请参阅将 使用适用于 Apache Flink 的 Kinesis Data Analytics Studio 笔记本

在 KDA-SQL 应用程序中用于对数据进行预处理/后处理的 Lambda 函数:

用户定义的函数,用于使用 KDA-Studio Zeppelin 笔记本对数据进行预处理/后处理

用户定义的函数(UDF)

引用用户定义的函数来转换数据流可将常见的业务逻辑重新用于运算符。这可以在适用于 Apache Flink Studio 的托管服务笔记本内完成,也可以作为外部引用的应用程序 jar 文件完成。使用用户定义的函数可以简化您可能对流数据执行的转换或数据扩充。

在你的笔记本中,你需要引用一个简单的 Java 应用程序 jar,它具有匿名化个人电话号码的功能。你也可以编写 Python 或 Scala UDF,以便在笔记本中使用。我们选择了一个 Java 应用程序 jar 来突出将应用程序 jar 导入 Pyflink 笔记本的功能。

环境设置

为遵循本指南以及与您的流数据进行交互,您需要使用 Amazon CloudFormation 脚本启动以下资源:

  • 源和目标 Kinesis Data Streams

  • Glue 数据库

  • IAM 角色

  • 适用于 Apache Flink Studio 的托管服务应用程序

  • Lambda 函数 (启动适用于 Apache Flink Studio 的托管服务应用程序)

  • 执行上述 Lambda 函数的 Lambda 角色

  • 用于调用 Lambda 函数的自定义资源

这里下载 Amazon CloudFormation 模板。

创建 Amazon CloudFormation 堆栈。
  1. 转至 Amazon Web Services Management Console 然后在服务列表下选择 CloudFormation

  2. CloudFormation 页面,选择 堆栈,然后选择以新资源创建堆栈(标准)

  3. 创建堆栈页面上,选择上传模板文件,然后选择您之前下载的 kda-flink-udf.yml。上传文件,然后选择 下一步

  4. 为模板指定一个名称 (比如 kinesis-UDF),以便记忆。如果您想更改名称,请更新输入参数,例如输入流。选择 Next(下一步)。

  5. 配置堆栈选项页面上,根据需要添加标签,然后选择下一步

  6. 查看页面上,选中允许创建 IAM 资源的复选框,然后选择提交

Amazon CloudFormation 堆栈可能需要 10 到 15 分钟才能启动,具体取决于您要启动的区域。看到整个堆栈处于 CREATE_COMPLETE 状态后,继续操作。

使用适用于 Apache Flink Studio 的托管服务笔记本

通过使用适用于 Kinesis Data Analytics 的 Studio 笔记本,您可以实时以交互式方式查询数据流,并使用标准 SQL、Python 和 Scala 轻松构建和运行流处理应用程序。只需在 Amazon Web Services Management Console 中进行几次单击操作,即可启动无服务器笔记本来查询数据流,并在几秒钟内获得结果。

笔记本是一个基于 Web 的开发环境。通过使用笔记本,您可以获得简单的交互式开发体验以及 Apache Flink 提供的高级数据流处理功能。Studio 笔记本使用由 Apache Zeppelin 提供支持的笔记本,并使用 Apache Flink 作为流处理引擎。Studio 笔记本将这些技术无缝结合,使所有技能组开发人员都可以对数据流进行高级分析。

Apache Zeppelin 为您的 Studio 笔记本提供了一整套分析工具,包括:

  • 数据可视化

  • 将数据导出到文件

  • 控制输出格式以便分析

使用笔记本
  1. 转至 Amazon Web Services Management Console 然后在服务列表下选择 Amazon Kinesis

  2. 在左侧导航页面上,选择 Analytics 应用程序,然后选择 Studio 笔记本

  3. 验证 KinesisDataAnalyticsStudio 笔记本是否正在运行。

  4. 选择笔记本,然后选择在 Apache Zeppelin 中打开

  5. 下载 数据创建器 Zeppelin 笔记本文件,您需要使用该文件读取数据并将数据加载到 Kinesis 流中。

  6. 导入 Data Producer Zeppelin 笔记本。请务必修改笔记本代码中的输入 STREAM_NAMEREGION。输入流名称可以在Amazon CloudFormation堆栈输出中找到。

  7. 选择运行此段落按钮,将示例数据插入输入的 Kinesis 数据流,执行数据创建器笔记本。

  8. 示例数据加载过程中,下载 MaskPhoneNumber 交互式笔记本,它将读取输入数据,匿名化输入流中的电话号码,并将匿名化数据存储到输出流中。

  9. 导入 MaskPhoneNumber-interactive Zeppelin 笔记本。

  10. 执行笔记本中的每个段落。

    1. 在第 1 段中,通过导入用户定义函数来匿名化电话号码。

      %flink(parallelism=1) import com.mycompany.app.MaskPhoneNumber stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
    2. 在下一段中,通过创建一个内存表来读取输入流数据。确保流名称和 Amazon 区域正确无误。

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews; CREATE TABLE customer_reviews ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phone VARCHAR ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleInputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json');
    3. 检查数据是否已加载到内存表中。

      %flink.ssql(type=update) select * from customer_reviews
    4. 调用用户定义的函数对电话号码进行匿名化处理。

      %flink.ssql(type=update) select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    5. 电话号码屏蔽后,创建一个屏蔽号码的视图。

      %flink.ssql(type=update) DROP VIEW IF EXISTS sentiments_view; CREATE VIEW sentiments_view AS select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    6. 验证数据。

      %flink.ssql(type=update) select * from sentiments_view
    7. 为输出的 Kinesis 流创建内存表。确保流名称和 Amazon 区域正确无误。

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews_stream_table; CREATE TABLE customer_reviews_stream_table ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phoneNumber varchar ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleOutputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json');
    8. 在目标 Kinesis 流中插入更新的记录。

      %flink.ssql(type=update) INSERT INTO customer_reviews_stream_table SELECT customer_id, product, review, phoneNumber FROM sentiments_view
    9. 查看和验证来自目标 Kinesis 流的数据。

      %flink.ssql(type=update) select * from customer_reviews_stream_table

将笔记本发布为应用程序

现在,您已经以交互方式对笔记本代码进行了测试,接下来您需要把代码部署为具有持久状态的流应用程序。您需要先修改应用程序配置,以便在 Amazon S3 中为您的代码指定一个位置。

  1. 在 Amazon Web Services Management Console,选择您的笔记本,然后在部署为应用程序配置 (可选) 中,选择编辑

  2. Amazon S3 中代码目标项下,选择通过Amazon CloudFormation 脚本创建的 Amazon S3 存储桶。此过程可能耗时数分钟。

  3. 您将无法按原样发布笔记。如果尝试按原样发布,您将遇到不支持 Select 语句的报错。要避免出现此问题,请下载 MaskPhoneNumber 流式处理Zeppelin 笔记本

  4. 导入 MaskPhoneNumber-streaming Zeppelin 笔记本。

  5. 打开笔记,然后选择 KinesisDataAnalyticsStudio 操作

  6. 选择构建 maskPhoneNumber 流式处理并导出到 S3。确保重命名应用程序名称,请勿使用任何特殊字符。

  7. 选择构建并导出。设置流应用程序将花费数分钟。

  8. 构建完成后,选择使用 Amazon 控制台部署

  9. 在下一页,查看设置并确保选择正确的 IAM 角色。接下来,选择创建流应用程序

  10. 几分钟后,您将看到一条消息,提示流应用程序已成功创建。

部署具有持久状态和限制的应用程序相关更多信息,请参阅部署为具有持久状态的应用程序

清除

或者,您现在也可以卸载 Amazon CloudFormation 堆栈。该操作将删除您之前设置的所有服务。