使用用户定义的函数进行查询(预览) - Amazon Athena
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

使用用户定义的函数进行查询(预览)

Amazon Athena 中的用户定义函数 (UDF) 使您可以创建自定义函数来处理记录或记录组。UDF 接受参数,执行工作,然后返回结果。

要在 Athena 中使用 UDF,请在 SQL 查询中的 SELECT 语句之前写入 USING FUNCTION 子句。SELECT 语句引用 UDF 并定义在查询运行时传递给 UDF 的变量。SQL 查询在调用 UDF 时使用 Java 运行时调用 Lambda 函数。UDF 在 Lambda 函数中定义为 Java 部署包中的方法。可以在同一个 Java 部署包中为某个 Lambda 函数定义多个 UDF。您还可以在 USING FUNCTION 子句中指定 Lambda 函数的名称。

部署 Athena UDF 的 Lambda 函数有两个选项。您可以直接使用 Lambda 部署函数,也可以使用 AWS Serverless Application Repository 来部署。要查找 UDF 的现有 Lambda 函数,您可以搜索公共 AWS Serverless Application Repository 或私有存储库,然后部署到 Lambda。您还可以创建或修改 Java 源代码,将其打包到 JAR 文件中,然后使用 Lambda 或 AWS Serverless Application Repository 来部署它。我们提供 Java 源代码和软件包示例,帮助您开始使用。有关 Lambda 的更多信息,请参阅 AWS Lambda Developer Guide。有关 AWS Serverless Application Repository 的更多信息,请参阅 AWS Serverless Application Repository 开发人员指南

注意事项和限制

  • 可用区域 – Athena UDF 功能目前在 美国东部(弗吉尼亚北部)、亚太地区(孟买)、欧洲(爱尔兰) 和 美国西部(俄勒冈) 区域中提供预览版。

  • AmazonAthenaPreviewFunctionality 工作组 – To use this feature in preview, you must create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup. For more information, see 管理工作组.

  • 内置 Athena 函数 – Athena 中的内置 Presto 函数设计为实现高性能。我们建议尽可能使用内置函数而不是 UDF。有关内置函数的更多信息,请参阅 Amazon Athena 中的 Presto 函数

  • 仅标量 UDF – Athena 仅支持标量 UDF,它一次处理一行并返回单个列值。Athena 每次调用 Lambda 时都会将行批量(可能并行)传递给 UDF。在设计 UDF 和查询时,请注意此处理设计可能对网络流量产生的潜在影响。

  • 仅 Java 运行时 – 目前,Athena UDF 仅支持 Lambda 的 Java 8 运行时。

  • IAM 权限 – 要在包含 UDF 查询语句的 Athena 中运行查询以及创建 UDF 语句,则除 Athena 函数之外,还必须允许运行查询的 IAM 委托人执行操作。有关更多信息,请参阅 允许 Amazon Athena User Defined Functions (UDF) 的示例 IAM 权限策略

  • Lambda 配额 – Lambda 配额适用于 UDF。有关更多信息,请参阅 AWS Lambda Developer Guide 中的 AWS Lambda 配额

  • 已知问题 – 有关已知问题的最新列表,请参阅 Athena Federated Query (Preview) 中的限制和问题

UDF 查询语法

USING FUNCTION 子句指定可由查询中的后续 SELECT 语句引用的 UDF 或多个 UDF。您需要 UDF 的方法名称和托管 UDF 的 Lambda 函数的名称。

摘要

USING FUNCTION UDF_name(variable1 data_type[, variable2 data_type][,...]) RETURNS data_type TYPE LAMBDA_INVOKE WITH (lambda_name = 'my_lambda_function')[, FUNCTION][, ...] SELECT [...] UDF_name(expression) [...]

参数

USING FUNCTION UDF_name(variable1 data_type[, variable2 data_type][,...])

UDF_name 指定 UDF 的名称,该名称必须与引用的 Lambda 函数中的 Java 方法对应。每个 variable data_type 指定一个具有相应数据类型的命名变量,UDF 可以接受该变量作为输入。将 data_type 指定为下表中列出的受支持 Athena 数据类型之一。数据类型必须映射到相应的 Java 数据类型。

Athena 数据类型 Java 数据类型

TIMESTAMP

java.time.LocalDateTime (UTC)

DATE

java.time.LocalDate (UTC)

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

REAL

java.lang.Float

DOUBLE

java.lang.Double

DECIMAL

java.math.BigDecimal

BIGINT

java.lang.Long

INTEGER

java.lang.Int

VARCHAR

java.lang.String

VARBINARY

byte[]

BOOLEAN

java.lang.Boolean

ARRAY

java.util.List

ROW

java.util.Map<String, Object>

RETURNS data_type TYPE

data_type 指定 UDF 作为输出返回的 SQL 数据类型。支持上表中列出的 Athena 数据类型。

LAMBDA_INVOKE WITH (lambda_name = 'my_lambda_function')

my_lambda_function 指定运行 UDF 时要调用的 Lambda 函数的名称。

SELECT [...] UDF_name(expression) [...]

将值传递给 UDF 并返回结果的 SELECT 查询。UDF_name 指定要使用的 UDF,后跟进行计算以传递值的 expression。传递和返回的值必须与 USING FUNCTION 子句中为 UDF 指定的相应数据类型匹配。

示例

以下示例演示使用 UDF 的查询。Athena 查询示例基于 GitHub 中的 AthenaUDFHandler.java 代码。

例 – 压缩和解压缩字符串

Athena SQL

以下示例演示如何使用名为 MyAthenaUDFLambda 的 Lambda 函数中定义的 compress UDF。

USING FUNCTION compress(col1 VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'MyAthenaUDFLambda') SELECT compress('StringToBeCompressed');

查询结果返回 ewLLinKzEsPyXdKdc7PLShKLS5OTQEAUrEH9w==

以下示例演示如何使用在同一 Lambda 函数中定义的 decompress UDF。

USING FUNCTION decompress(col1 VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE WITH (lambda_name = 'MyAthenaUDFLambda') SELECT decompress('ewLLinKzEsPyXdKdc7PLShKLS5OTQEAUrEH9w==');

查询结果返回 StringToBeCompressed

使用 Lambda 创建和部署 UDF

要创建自定义 UDF,可以通过扩展 UserDefinedFunctionHandler 类来创建新的 Java 类。开发工具包中的 UserDefinedFunctionHandler.java 的源代码在 GitHub 上的 awslabs/aws-athena-query-federation/athena-federation-sdk 存储库中提供,其中还提供有示例 UDF 实施,您可以对其进行检查和修改,以创建自定义 UDF。

本节中的步骤演示从命令行使用 Apache Maven 编写和构建自定义 UDF Jar 文件和部署。

克隆软件开发工具包并准备开发环境

在开始之前,请确保使用 sudo yum install git -y 在您的系统上安装 git。

安装 AWS 查询联合软件开发工具包的步骤

  • 在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息,请参阅使用Amazon Athena Federated Query (Preview)

    git clone https://github.com/awslabs/aws-athena-query-federation.git

此程序的安装先决条件

如果您正在使用已安装 Apache Maven、AWS CLI 和 AWS 无服务器应用程序模型 构建工具的开发计算机,则可以跳过此步骤。

  1. 从克隆时创建的 aws-athena-query-federation 目录的根目录下,运行准备开发环境的 prepare_dev_env.sh 脚本。

  2. 更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。

    source ~/.profile
    重要

    如果您跳过此步骤,稍后将收到关于 AWS CLI 或 AWS SAM 构建工具无法发布 Lambda 函数的错误。

创建您的 Maven 项目

运行以下命令以创建您的 Maven 项目。将 groupId 替换为组织的唯一 ID,并将 my-athena-udf 替换为您的应用程序的名称。有关详细信息,请参阅 Apache Maven 文档中的如何创建我的第一个 Maven 项目?

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

将依赖项和插件添加到您的 Maven 项目中

将以下配置添加到 Maven 项目 pom.xml 文件中。要查看示例,请参阅 GitHub 中的 pom.xml 文件。

<properties> <aws-athena-federation-sdk.version>2019.48.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

为 UDF 编写 Java 代码

通过扩展 UserDefinedFunctionHandler.java 来创建一个新类。在类中编写您的 UDF。

在以下示例中,已在 MyUserDefinedFunctions 类中创建了两个用于 UDF、compress()decompress() 的 Java 方法。

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

构建 JAR 文件

运行 mvn clean install 以构建您的项目。成功构建后,将在名为 artifactId-version.jar 的项目的 target 文件夹中创建一个 JAR 文件,其中 artifactId 是您在 Maven 项目中提供的名称,例如 my-athena-udfs

将 JAR 部署到 AWS Lambda

有两个选项可以将代码部署到 Lambda:

  • 使用 AWS Serverless Application Repository 部署(推荐)

  • 从 JAR 文件创建 Lambda 函数

选项 1:部署到 AWS Serverless Application Repository

将 JAR 文件部署到 AWS Serverless Application Repository 时,您将创建一个代表应用程序体系结构的 AWS SAM 模板 YAML 文件。然后,您可以指定此 YAML 文件和一个 Amazon S3 存储桶,在其中上传应用程序的构件并使其对 AWS Serverless Application Repository 可用。以下过程使用位于之前克隆的 Athena 查询联合软件开发工具包的 athena-query-federation/tools 目录中的 publish.sh 脚本。

有关详细信息和要求,请参阅 AWS Serverless Application Repository 开发人员指南 中的发布应用程序AWS 无服务器应用程序模型 开发人员指南 中的 AWS SAM 模板概念以及使用 AWS SAM CLI 发布无服务器应用程序

以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例,请参阅 GitHub 中的 athena-udf.yaml

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

publish.sh 脚本复制到保存 YAML 文件的项目目录中,然后运行以下命令:

./publish.sh MyS3Location MyYamlFile

例如,如果您的存储桶位置为 s3://mybucket/mysarapps/athenaudf 并且您的 YAML 文件已另存为 my-athena-udfs.yaml

./publish.sh mybucket/mysarapps/athenaudf my-athena-udfs

创建 Lambda 函数

  1. https://console.amazonaws.cn/lambda/ 下打开 Lambda 控制台,选择 Create function (创建函数),然后选择 Browse serverless app repository (浏览无服务器应用程序存储库)

  2. 选择 Private applications (私有应用程序),在列表中找到您的应用程序,或使用关键词搜索它,然后选择它。

  3. 查看并提供应用程序详细信息,然后选择 Deploy (部署)

    您现在可以使用在 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。

选项 2:直接创建 Lambda 函数

您也可以直接使用控制台或 AWS CLI 创建 Lambda 函数。以下示例演示如何使用 Lambdacreate-function CLI 命令。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar