用户定义的函数 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

用户定义的函数

用户定义函数 (UDF) 是扩展点,允许您调用查询中无法以其他方式表达的常用逻辑或自定义逻辑。您可以使用 Python 或 Java 或 Scala 等 JVM 语言在 Studio 笔记本中的段落中实现 UDF。您还可以将包含以 JVM 语言实现的 UDF 的外部 JAR 文件添加到您的 Studio 笔记本中。

在实现注册子类UserDefinedFunction(或您自己的抽象类)的抽象类的 JAR 时,请使用 Apache Maven 中提供的作用域、Gradle 中的compileOnly依赖项声明、SBT 中提供的作用域或您的 UDF 项目构建配置中的等效指令。这允许 UDF 源代码针对 Flink API 进行编译,但是 Flink API 类本身并未包含在编译工件中。请参阅 UDF jar 示例中的这个 pom,它在 Maven 项目中符合这样的先决条件。

要使用控制台将 UDF JAR 文件添加到 Studio 笔记本中,请执行以下步骤:

  1. 将您的 UDF JAR 文件上载到 Amazon S3。

  2. 在中Amazon Web Services Management Console,选择用于创建 Studio 笔记本的自定义创建选项。

  3. 按照 Studio 笔记本的创建工作流程进行操作,直到进入配置步骤。

  4. 用户定义的函数部分,选择添加用户定义的函数

  5. 指定实现 UDF 的 JAR 文件或 ZIP 文件的 Amazon S3 位置。

  6. 选择保存更改

要在使用 CreateApplicationAPI 创建新的 Studio 笔记本时添加 UDF JAR,请在CustomArtifactConfiguration数据类型中指定 JAR 位置。要将 UDF JAR 添加到现有 Studio 笔记本中,请调用 UpdateApplicationAPI 操作并在CustomArtifactsConfigurationUpdate数据类型中指定 JAR 位置。或者,您可以使用将 UDF JAR 文件添加Amazon Web Services Management Console到 Studio 笔记本中。

用户定义的函数的注意事项

  • Managed Service for Apache Flink Studio 使用 Apache Zeppelin 的术语,其中笔记本是可以包含多个音符的齐柏林飞艇实例。然后,每个注释可以包含多个段落。通过 Managed Service for Apache Flink Studio,解释器流程可以在笔记本中的所有笔记中共享。因此,如果您在一个注释中使用 Function 执行显式createTemporarySystem函数注册,则可以在同一笔记本的另一个注释中按原样引用相同的函数。

    但是,“部署为应用程序” 操作仅适用于单个笔记,而不是笔记本中的所有笔记。执行部署为应用程序时,仅使用活动注释的内容来生成应用程序。在其他 笔记本 中执行的任何显式函数注册都不是生成的应用程序依赖关系的一部分。此外,在 “部署为应用程序” 选项期间,通过将 JAR 的主类名转换为小写字符串来进行隐式函数注册。

    例如,如果TextAnalyticsUDF是 UDF JAR 的主类,则隐式注册将生成函数名称textanalyticsudf。因此,如果 Studio 注释 1 中的显式函数注册如下所示,那么myNewFuncNameForClass由于共享解释器,该笔记本中的所有其他注释(比如注释 2)都可以按名称引用该函数:

    stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())

    但是,在注释 2 中作为应用程序部署操作期间,此显式注册将不包含在依赖项中,因此已部署的应用程序将无法按预期运行。由于采用了隐式注册,因此默认情况下,对该函数的所有引用都应该是 with textanalyticsudf 和 not myNewFuncNameForClass

    如果需要注册自定义函数名,那么注释 2 本身应该包含另一段来执行另一次显式注册,如下所示:

    %flink(parallelism=l) import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF # re-register the JAR for UDF with custom name stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
    %flink. ssql(type=update, parallelism=1) INSERT INTO table2 SELECT myNewFuncNameForClass(column_name) FROM table1 ;
  • 如果您的 UDF JAR 包含 Flink 开发工具包,请配置您的 Java 项目,以便 UDF 源代码可以针对 Flink 开发工具包进行编译,但是 Flink SDK 类本身并未包含在编译工件(例如 JAR)中。

    您可以在 Apache Maven 中使用provided作用域,在 Gradle 中使用compileOnly依赖关系声明,在 SBT 中使用provided作用域,或者在他们的 UDF 项目构建配置中使用等效指令。您可以从 UDF jar 示例中引用这个 pom,它在 maven 项目中遵循了这样的先决条件。有关完整的 step-by-step 教程,请参阅此使用适用于 Apache Flink、Amazon Translate 和 Amazon Comprehend 的亚马逊托管服务的 SQL 函数翻译、编辑和分析流数据