使用用户定义函数进行查询 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用用户定义函数进行查询

Amazon Athena 中的用户定义函数 (UDF) 使您可以创建自定义函数来处理记录或记录组。UDF 接受参数,执行工作,然后返回结果。

要在 Athena 中使用 UDF,请在 SQL 查询中的 SELECT 语句之前写入 USING EXTERNAL FUNCTION 子句。SELECT 语句引用 UDF 并定义在查询运行时传递给 UDF 的变量。SQL 查询在调用 UDF 时使用 Java 运行时调用 Lambda 函数。UDF 在 Lambda 函数中定义为 Java 部署包中的方法。可以在同一个 Java 部署包中为某个 Lambda 函数定义多个 UDF。您还可以在 USING EXTERNAL FUNCTION 子句中指定 Lambda 函数的名称。

部署 Athena UDF 的 Lambda 函数有两个选项。您可以直接使用 Lambda 部署函数,也可以使用 Amazon Serverless Application Repository 来部署。要查找 UDF 的现有 Lambda 函数,您可以搜索公共 Amazon Serverless Application Repository 或私有存储库,然后部署到 Lambda。您还可以创建或修改 Java 源代码,将其打包到 JAR 文件中,然后使用 Lambda 或 Amazon Serverless Application Repository 来部署它。有关能够帮助您开始使用的示例 Java 源代码和软件包,请参阅 使用 Lambda 创建和部署 UDF。有关 Lambda 的更多信息,请参阅《Amazon Lambda 开发人员指南》。有关 Amazon Serverless Application Repository 的更多信息,请参阅《Amazon Serverless Application Repository 开发人员指南》。

有关使用 UDF 与 Athena 一起翻译和分析文本的示例,请参阅 Amazon Machine Learning 博客文章《将 SQL 函数与 Amazon Athena、Amazon Translate 和 Amazon Comprehend 结合起来翻译和分析文本》。

有关在 Amazon Athena 中使用 UDF 扩展地理空间查询的示例,请参阅 Amazon 大数据博客中的使用 UDF 和 Amazon Lambda 在 Amazon Athena 中扩展地理空间查询

注意事项和限制

  • 内置 Athena 函数 – Athena 中的内置函数旨在实现高性能。我们建议尽可能使用内置函数而不是 UDF。有关内置函数的更多信息,请参阅 Amazon Athena 中的函数

  • 仅标量 UDF – Athena 仅支持标量 UDF,它将一次处理一行并返回单个列值。每次调用 Lambda 时,Athena 都会将行批量(可能并行)传递给 UDF。在设计 UDF 和查询时,请注意此处理可能对网络流量产生的潜在影响。

  • UDF 处理函数使用缩写格式– 对于 UDF 函数使用缩写格式(而非完整格式)(例如,使用 package.Class 而非 package.Class::method)。

  • UDF 方法必须为小写– UDF 方法必须为小写字母;不允许使用骆驼大小写。

  • UDF 方法需要参数 - UDF 方法必须至少有一个输入参数。尝试调用未定义输入参数的 UDF 会导致运行时异常。UDF 旨在对数据记录执行函数,但是没有参数的 UDF 不接受任何数据,因此会出现异常。

  • Java 运行时支持 – 目前,Athena UDF 支持用于 Lambda 的 Java 8 和 Java 11 运行时。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的使用 Jav 构建 Lambda 函数

  • IAM 权限 – 要在 Athena 中创建并运行 UDF 查询语句,则除 Athena 函数之外,还必须允许运行查询的 IAM 委托人执行操作。有关更多信息,请参阅 允许 Amazon Athena 用户定义函数(UDF)的 IAM 权限策略示例

  • Lambda 配额 – Lambda 配额适用于 UDF。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的 Lambda 配额

  • 行级别筛选 - UDF 不支持 Lake Formation 行级别筛选。

  • 视图 – 您不能将视图与 UDF 同时使用。

  • 已知问题 – 有关已知问题的最新列表,请参阅 GitHub 的 awslabs/aws-athena-query-federation 部分中的 Limitations and issues(限制和问题)。

UDF 查询语法

USING EXTERNAL FUNCTION 子句指定可由查询中的后续 SELECT 语句引用的 UDF 或多个 UDF。您需要 UDF 的方法名称和托管 UDF 的 Lambda 函数的名称。您可以使用 Lambda ARN 代替 Lambda 函数名称。在跨账户场景中,需要提供 Lambda ARN。

摘要

USING EXTERNAL FUNCTION UDF_name(variable1 data_type[, variable2 data_type][,...]) RETURNS data_type LAMBDA 'lambda_function_name_or_ARN' [, EXTERNAL FUNCTION UDF_name2(variable1 data_type[, variable2 data_type][,...]) RETURNS data_type LAMBDA 'lambda_function_name_or_ARN'[,...]] SELECT [...] UDF_name(expression) [, UDF_name2(expression)] [...]

参数

USING EXTERNAL FUNCTION UDF_name(variable1data_type[, variable2data_type][,...])

UDF_name 指定 UDF 的名称,该名称必须与引用的 Lambda 函数中的 Java 方法对应。每个 variable data_type 指定一个命名变量,且其相应的数据类型应可为 UDF 接受作为输入。data_type 必须是下表中列出的受支持 Athena 数据类型之一,并映射到相应的 Java 数据类型。

Athena 数据类型 Java 数据类型

TIMESTAMP

java.time.LocalDateTime (UTC)

DATE

java.time.LocalDate (UTC)

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

REAL

java.lang.Float

DOUBLE

java.lang.Double

DECIMAL(请参阅 RETURNS 注释)

java.math.BigDecimal

BIGINT

java.lang.Long

INTEGER

java.lang.Int

VARCHAR

java.lang.String

VARBINARY

byte[]

BOOLEAN

java.lang.Boolean

ARRAY

java.util.List

ROW

java.util.Map<String, Object>

RETURNS data_type

data_type 指定 UDF 作为输出返回的 SQL 数据类型。上表中列出的 Athena 数据类型均受支持。对于 DECIMAL 数据类型,请使用语法 RETURNS DECIMAL(precision, scale),其中 precision(精度)和 scale(小数位数)是整数。

LAMBDA 'lambda_function'

lambda_function 指定运行 UDF 时要调用的 Lambda 函数的名称。

SELECT [...]UDF_name(expression) [...]

SELECT 查询会将值传递给 UDF 并返回结果。UDF_name 指定要使用的 UDF,后跟一个表达式,该表达式将进行估算以传递值。传递和返回的值必须与 USING EXTERNAL FUNCTION 子句中为 UDF 指定的相应数据类型匹配。

示例

如需获取 GitHub 上基于AthenaUDFHandler.java 代码的示例查询,请参阅 GitHub 的 Amazon Athena UDF connector(Amazon Athena UDF 连接器)页面。

使用 Lambda 创建和部署 UDF

要创建自定义 UDF,可以通过扩展 UserDefinedFunctionHandler 类来创建新的 Java 类。开发工具包中的 UserDefinedFunctionHandler.java 的源代码在 GitHub 上的 awslabs/aws-athena-query-federation/athena-federation-sdk 存储库中提供,其中还提供有示例 UDF 实施,您可以对其进行检查和修改,以创建自定义 UDF。

本节中的步骤演示从命令行使用 Apache Maven 编写和构建自定义 UDF Jar 文件和部署。

克隆开发工具包并准备开发环境

在开始之前,请确保使用 sudo yum install git -y 在您的系统上安装 git。

安装 Amazon 查询联合开发工具包
  • 在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息,请参阅使用 Amazon Athena 联合查询

    git clone https://github.com/awslabs/aws-athena-query-federation.git
此程序的安装先决条件

如果您正在使用已安装 Apache Maven、Amazon CLI 和 Amazon Serverless Application Model 构建工具的开发计算机,则可以跳过此步骤。

  1. 从克隆时创建的 aws-athena-query-federation 目录的根目录下,运行准备开发环境的 prepare_dev_env.sh 脚本。

  2. 更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。

    source ~/.profile
    重要

    如果您跳过此步骤,稍后将收到关于 Amazon CLI 或 Amazon SAM 构建工具无法发布 Lambda 函数的错误。

创建您的 Maven 项目

运行以下命令以创建您的 Maven 项目。将 groupId 替换为组织的唯一 ID,并将 my-athena-udf 替换为您的应用程序的名称。有关详细信息,请参阅 Apache Maven 文档中的如何创建我的第一个 Maven 项目?

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

将依赖项和插件添加到您的 Maven 项目中

将以下配置添加到 Maven 项目 pom.xml 文件中。要查看示例,请参阅 GitHub 中的 pom.xml 文件。

<properties> <aws-athena-federation-sdk.version>2021.6.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

为 UDF 编写 Java 代码

通过扩展 UserDefinedFunctionHandler.java 来创建一个新类。在类中编写您的 UDF。

在以下示例中,已在 MyUserDefinedFunctions 类中创建了两个用于 UDF、compress()decompress() 的 Java 方法。

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

构建 JAR 文件

运行 mvn clean install 以构建您的项目。成功构建后,将在名为 artifactId-version.jar 的项目的 target 文件夹中创建一个 JAR 文件,其中 artifactId 是您在 Maven 项目中提供的名称,例如 my-athena-udfs

将 JAR 部署到 Amazon Lambda

有两个选项可以将代码部署到 Lambda:

  • 使用 Amazon Serverless Application Repository 部署(推荐)

  • 从 JAR 文件创建 Lambda 函数

选项 1:部署到 Amazon Serverless Application Repository

将 JAR 文件部署到 Amazon Serverless Application Repository 时,您将创建一个代表应用程序体系结构的 Amazon SAM 模板 YAML 文件。然后,您可以指定此 YAML 文件和一个 Amazon S3 存储桶,在其中上载应用程序的构件并使其对 Amazon Serverless Application Repository 可用。下面的过程使用 publish.sh 脚本,该脚本位于您之前克隆的 Athena Query Federation SDK 的 athena-query-federation/tools 目录中。

有关更多信息和要求,请参阅《Amazon Serverless Application Repository 开发人员指南》中的 发布应用程序、《Amazon Serverless Application Model 开发人员指南》中的 Amazon SAM 模板概念使用 Amazon SAM CLI 发布无服务器应用程序

以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例,请参阅 GitHub 中的 athena-udf.yaml

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

publish.sh 脚本复制到保存 YAML 文件的项目目录中,然后运行以下命令:

./publish.sh MyS3Location MyYamlFile

例如,如果您的存储桶位置为 s3://mybucket/mysarapps/athenaudf 并且您的 YAML 文件已另存为 my-athena-udfs.yaml

./publish.sh mybucket/mysarapps/athenaudf my-athena-udfs
创建 Lambda 函数
  1. 在以下位置打开 Lambda 控制台:https://console.aws.amazon.com/lambda/,选择 Create function(创建函数),然后选择 Browse serverless app repository(浏览无服务器应用程序存储库)

  2. 选择 Private applications (私有应用程序),在列表中找到您的应用程序,或使用关键词搜索它,然后选择它。

  3. 查看并提供应用程序详细信息,然后选择 Deploy (部署)

    您现在可以使用在 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。

选项 2:直接创建 Lambda 函数

您也可以直接使用控制台或 Amazon CLI 创建 Lambda 函数。以下示例演示如何使用 create-function Lambda CLI 命令。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar