使用户定义的函数进行查询 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用户定义的函数进行查询

Amazon Athena 中的用户定义函数 (UDF) 使您可以创建自定义函数来处理记录或记录组。UDF 接受参数,执行工作,然后返回结果。

要在 Athena 中使用 UDF,请在 SQL 查询中的 SELECT 语句之前写入 USING EXTERNAL FUNCTION 子句。SELECT 语句引用 UDF 并定义在查询运行时传递给 UDF 的变量。SQL 查询在调用 UDF 时使用 Java 运行时调用 Lambda 函数。UDF 在 Lambda 函数中定义为 Java 部署包中的方法。可以在同一个 Java 部署包中为 Lambda 函数定义多个 UDF。您还可以在USING EXTERNAL FUNCTION子句。

部署 Athena UDF 的 Lambda 函数有两个选项。您可以直接使用 Lambda 部署函数,也可以使用Amazon Serverless Application Repository。要查找 UDF 的现有 Lambda 函数,您可以搜索公共Amazon Serverless Application Repository或您的私有存储库,然后部署到 Lambda。您还可以创建或修改 Java 源代码,将其打包到 JAR 文件中,然后使用 Lambda 或Amazon Serverless Application Repository。例如 Java 源代码和软件包,帮助您开始使用,请参阅使用 Lambda 创建和部署 UDF。有关 Lambda 的更多信息,请参阅Amazon Lambda开发人员指南。有关 Amazon Serverless Application Repository 的更多信息,请参阅 Amazon Serverless Application Repository 开发人员指南

有关使用 UDF 与 Athena 一起翻译和分析文本的示例,请参阅AmazonMachine Learning 博客文章使用 Amazon Athena、亚马逊 Translate 和 Amazon Comprehend 的 SQL 函数翻译和分析文本,或观看video

注意事项和限制

  • 可用区— Athena UDF 功能在支持 Athena 引擎版本 2 的区域中可用。有关列表Amazon支持 Athena 引擎版本 2 的地区,请参阅Athena 引擎版本 2

  • 内置 Athena 函数— Athena 中的内置 Presto 函数设计为实现高性能。我们建议尽可能使用内置函数而不是 UDF。有关内置函数的更多信息,请参阅 Amazon Athena 中的 Presto 函数

  • 仅标量 UDF— Athena 仅支持标量 UDF,它一次处理一行并返回单个列值。Athena 每次调用 Lambda 时都会将行批量(可能并行)传递给 UDF。在设计 UDF 和查询时,请注意此处理可能对网络流量产生的潜在影响。

  • Java 运行时支持-目前,Athena UDF 支持 Lambda 的 Java 8 和 Java 11 运行时。有关更多信息,请参阅 。使用 Java 构建 Lambda 函数中的Amazon Lambda开发人员指南

  • IAM 权限— 要在 Athena 中运行和创建 UDF 查询语句,则除 Athena 函数之外,还必须允许运行查询的 IAM 委托人执行操作。有关更多信息,请参阅 允许 Amazon Athena 用户定义函数 (UDF) 的 IAM 权限策略示例

  • lambda 配额— Lambda 配额适用于 UDF。有关更多信息,请参阅 。lambda 配额中的Amazon Lambda开发人员指南

  • 已知问题— 有关已知问题的最新列表,请参阅限制和问题在 GitHub 的 awslab/aws-雅典-查询-联合部分中。

Videos

观看以下视频,了解有关在 Athena 中使用 UDF 的更多信息。

视频: Amazon Athena 中的用户定义函数 (UDF) 介绍

以下视频展示了如何使用 Amazon Athena 中的 UDF 来编辑敏感信息。

注意

本视频中的语法是预发行的,但概念是相同的。使用 Athena 引擎版本 2 而不使用AmazonAthenaPreviewFunctionality工作组。

视频: 使用 Amazon Athena 中的 SQL 查询 Translate、分析和编辑文本字段

以下视频展示了如何在 Amazon Athena 中使用 UDF 与其他Amazon服务来翻译和分析文本。

注意

本视频中的语法是预发行的,但概念是相同的。有关正确的语法,请参阅相关博客文章使用 Amazon Athena、亚马逊 Translate 和 Amazon Comprehend 的 SQL 函数 Amazon Translate、编辑和分析文本上的AmazonMachine Learning 博客

UDF 查询语法

USING EXTERNAL FUNCTION 子句指定可由查询中的后续 SELECT 语句引用的 UDF 或多个 UDF。您需要 UDF 的方法名称和托管 UDF 的 Lambda 函数的名称。

Synopsis

USING EXTERNAL FUNCTION UDF_name(variable1 data_type[, variable2 data_type][,...]) RETURNS data_type LAMBDA 'lambda_function' SELECT [...] UDF_name(expression) [...]

Parameters

使用外部函数UDF 名称(变量 1 data_type[,变量 2 data_type] [,...])

UDF 名称指定 UDF 的名称,该名称必须与引用的 Lambda 函数中的 Java 方法对应。EACH可变数据类型指定 UDF 接受作为输入的命名变量及其相应的数据类型。这些区域有:data_type必须是下表中列出的受支持 Athena 数据类型之一,并映射到相应的 Java 数据类型。

Athena 数据类型 Java 数据类型

TIMESTAMP

java.time.LocalDateTime (UTC)

DATE

java.time.LocalDate (UTC)

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

REAL

java.lang.Float

DOUBLE

java.lang.Double

DECIMAL

java.math.BigDecimal

BIGINT

java.lang.Long

INTEGER

java.lang.Int

VARCHAR

java.lang.String

VARBINARY

byte[]

BOOLEAN

java.lang.Boolean

ARRAY

java.util.List

ROW

java.util.Map<String, Object>

RETURNS data_type

data_type指定 UDF 作为输出返回的 SQL 数据类型。支持上表中列出的 Athena 数据类型。

LAMBDAlambda 函数'

lambda 函数指定运行 UDF 时要调用的 Lambda 函数的名称。

选择 [...]UDF 名称(expression) [...]

这些区域有:SELECT查询,该查询将值传递给 UDF 并返回结果。UDF 名称指定要使用的 UDF,后跟expression,被评估为传递值。传递和返回的值必须与 USING EXTERNAL FUNCTION 子句中为 UDF 指定的相应数据类型匹配。

Examples

例如,基于AthenaUDFHandler.java代码,请参阅 GitHubAmazon Athena UDF 连接器页.

使用 Lambda 创建和部署 UDF

要创建自定义 UDF,可以通过扩展 UserDefinedFunctionHandler 类来创建新的 Java 类。开发工具包中的 UserDefinedFunctionHandler.java 的源代码在 GitHub 上的 awslabs/aws-athena-query-federation/athena-federation-sdk 存储库中提供,其中还提供有示例 UDF 实施,您可以对其进行检查和修改,以创建自定义 UDF。

本节中的步骤演示从命令行使用 Apache Maven 编写和构建自定义 UDF Jar 文件和部署。

克隆软件开发工具包并准备开发环境

在开始之前,请确保使用 sudo yum install git -y 在您的系统上安装 git。

安装 Amazon 查询联合软件开发工具包的步骤

  • 在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息,请参阅使用 Amazon Athena 联合查询

    git clone https://github.com/awslabs/aws-athena-query-federation.git

此程序的安装先决条件

如果您正在使用已有 Apache Maven 的开发计算机,则Amazon CLI,以及Amazon Serverless Application Model构建工具,您可以跳过此步骤。

  1. 从克隆时创建的 aws-athena-query-federation 目录的根目录下,运行准备开发环境的 prepare_dev_env.sh 脚本。

  2. 更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。

    source ~/.profile
    重要

    如果您跳过此步骤,稍后将收到关于Amazon CLI或者Amazon SAM构建工具无法发布您的 Lambda 函数。

创建您的 Maven 项目

运行以下命令以创建您的 Maven 项目。将 groupId 替换为组织的唯一 ID,并将 my-athena-udf 替换为您的应用程序的名称。有关详细信息,请参阅 Apache Maven 文档中的如何创建我的第一个 Maven 项目?

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

将依赖项和插件添加到您的 Maven 项目中

将以下配置添加到 Maven 项目 pom.xml 文件中。要查看示例,请参阅 GitHub 中的 pom.xml 文件。

<properties> <aws-athena-federation-sdk.version>2021.6.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

为 UDF 编写 Java 代码

通过扩展 UserDefinedFunctionHandler.java 来创建一个新类。在类中编写您的 UDF。

在以下示例中,已在 MyUserDefinedFunctions 类中创建了两个用于 UDF、compress()decompress() 的 Java 方法。

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

构建 JAR 文件

运行 mvn clean install 以构建您的项目。成功构建后,将在名为 artifactId-version.jar 的项目的 target 文件夹中创建一个 JAR 文件,其中 artifactId 是您在 Maven 项目中提供的名称,例如 my-athena-udfs

将 JAR 部署到 Amazon Lambda

有两个选项可以将代码部署到 Lambda:

  • 使用 Amazon Serverless Application Repository 部署(推荐)

  • 从 JAR 文件创建 Lambda 函数

选项 1:部署到 Amazon Serverless Application Repository

将 JAR 文件部署到 Amazon Serverless Application Repository 时,您将创建一个代表应用程序体系结构的 Amazon SAM 模板 YAML 文件。然后,您可以指定此 YAML 文件和一个 Amazon S3 存储桶,在其中上传应用程序的构件并使其对可用Amazon Serverless Application Repository。下面的过程使用publish.sh脚本位于athena-query-federation/tools目录中您之前克隆的 Athena 查询联合 SDK。

有关更多信息和要求,请参阅发布应用程序中的Amazon Serverless Application Repository开发人员指南Amazon SAM模板概念中的Amazon Serverless Application Model开发人员指南, 和发布无服务器应用程序使用Amazon SAMCLI

以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例,请参阅 GitHub 中的 athena-udf.yaml

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

publish.sh 脚本复制到保存 YAML 文件的项目目录中,然后运行以下命令:

./publish.sh MyS3Location MyYamlFile

例如,如果您的存储桶位置为 s3://mybucket/mysarapps/athenaudf 并且您的 YAML 文件已另存为 my-athena-udfs.yaml

./publish.sh mybucket/mysarapps/athenaudf my-athena-udfs

创建 Lambda 函数

  1. 打开 Lambda 控制台https://console.aws.amazon.com/lambda/中,选择创建函数,然后选择浏览无服务器应用程序库

  2. 选择 Private applications (私有应用程序),在列表中找到您的应用程序,或使用关键词搜索它,然后选择它。

  3. 查看并提供应用程序详细信息,然后选择 Deploy (部署)

    您现在可以使用 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。

选项 2:直接创建 Lambda 函数

您也可以直接使用控制台或创建 Lambda 函数Amazon CLI。以下示例演示如何使用 Lambdacreate-functionCLI 命令。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar