使用用户定义函数进行查询
Amazon Athena 中的用户定义函数 (UDF) 使您可以创建自定义函数来处理记录或记录组。UDF 接受参数,执行工作,然后返回结果。
要在 Athena 中使用 UDF,请在 SQL 查询中的 SELECT
语句之前写入 USING EXTERNAL FUNCTION
子句。SELECT
语句引用 UDF 并定义在查询运行时传递给 UDF 的变量。SQL 查询在调用 UDF 时使用 Java 运行时调用 Lambda 函数。UDF 在 Lambda 函数中定义为 Java 部署包中的方法。可以在同一个 Java 部署包中为某个 Lambda 函数定义多个 UDF。您还可以在 USING EXTERNAL FUNCTION
子句中指定 Lambda 函数的名称。
部署 Athena UDF 的 Lambda 函数有两个选项。您可以直接使用 Lambda 部署函数,也可以使用 Amazon Serverless Application Repository 来部署。要查找 UDF 的现有 Lambda 函数,您可以搜索公共 Amazon Serverless Application Repository 或私有存储库,然后部署到 Lambda。您还可以创建或修改 Java 源代码,将其打包到 JAR 文件中,然后使用 Lambda 或 Amazon Serverless Application Repository 来部署它。有关能够帮助您开始使用的示例 Java 源代码和软件包,请参阅 使用 Lambda 创建和部署 UDF。有关 Lambda 的更多信息,请参阅《Amazon Lambda 开发人员指南》。有关 Amazon Serverless Application Repository 的更多信息,请参阅《Amazon Serverless Application Repository 开发人员指南》。
有关使用 UDF 与 Athena 一起翻译和分析文本的示例,请参阅 Amazon Machine Learning 博客文章《将 SQL 函数与 Amazon Athena、Amazon Translate 和 Amazon Comprehend 结合起来翻译和分析文本
有关在 Amazon Athena 中使用 UDF 扩展地理空间查询的示例,请参阅 Amazon 大数据博客中的使用 UDF 和 Amazon Lambda 在 Amazon Athena 中扩展地理空间查询
注意事项和限制
-
内置 Athena 函数 – Athena 中的内置函数旨在实现高性能。我们建议尽可能使用内置函数而不是 UDF。有关内置函数的更多信息,请参阅 Amazon Athena 中的函数。
-
仅标量 UDF – Athena 仅支持标量 UDF,它将一次处理一行并返回单个列值。每次调用 Lambda 时,Athena 都会将行批量(可能并行)传递给 UDF。在设计 UDF 和查询时,请注意此处理可能对网络流量产生的潜在影响。
-
UDF 处理函数使用缩写格式– 对于 UDF 函数使用缩写格式(而非完整格式)(例如,使用
package.Class
而非package.Class::method
)。 -
UDF 方法必须为小写– UDF 方法必须为小写字母;不允许使用骆驼大小写。
-
UDF 方法需要参数 - UDF 方法必须至少有一个输入参数。尝试调用未定义输入参数的 UDF 会导致运行时异常。UDF 旨在对数据记录执行函数,但是没有参数的 UDF 不接受任何数据,因此会出现异常。
-
Java 运行时支持 – 目前,Athena UDF 支持用于 Lambda 的 Java 8 和 Java 11 运行时。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的使用 Jav 构建 Lambda 函数。
-
IAM 权限 – 要在 Athena 中创建并运行 UDF 查询语句,则除 Athena 函数之外,还必须允许运行查询的 IAM 委托人执行操作。有关更多信息,请参阅 允许 Amazon Athena 用户定义函数(UDF)的 IAM 权限策略示例。
-
Lambda 配额 – Lambda 配额适用于 UDF。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的 Lambda 配额。
-
行级别筛选 - UDF 不支持 Lake Formation 行级别筛选。
-
视图 – 您不能将视图与 UDF 同时使用。
-
已知问题 – 有关已知问题的最新列表,请参阅 GitHub 的 awslabs/aws-athena-query-federation 部分中的 Limitations and issues
(限制和问题)。
UDF 查询语法
USING EXTERNAL FUNCTION
子句指定可由查询中的后续 SELECT
语句引用的 UDF 或多个 UDF。您需要 UDF 的方法名称和托管 UDF 的 Lambda 函数的名称。您可以使用 Lambda ARN 代替 Lambda 函数名称。在跨账户场景中,需要提供 Lambda ARN。
摘要
USING EXTERNAL FUNCTION UDF_name
(variable1
data_type
[, variable2
data_type
][,...])
RETURNS data_type
LAMBDA 'lambda_function_name_or_ARN
'
[, EXTERNAL FUNCTION UDF_name2
(variable1
data_type
[, variable2
data_type
][,...])
RETURNS data_type
LAMBDA 'lambda_function_name_or_ARN
'[,...]]
SELECT [...] UDF_name
(expression
) [, UDF_name2
(expression
)] [...]
参数
- USING EXTERNAL FUNCTION
UDF_name
(variable1
data_type
[,variable2
data_type
][,...]) -
UDF_name
指定 UDF 的名称,该名称必须与引用的 Lambda 函数中的 Java 方法对应。每个variable data_type
指定一个命名变量,且其相应的数据类型应可为 UDF 接受作为输入。data_type
必须是下表中列出的受支持 Athena 数据类型之一,并映射到相应的 Java 数据类型。Athena 数据类型 Java 数据类型 TIMESTAMP
java.time.LocalDateTime (UTC)
DATE
java.time.LocalDate (UTC)
TINYINT
java.lang.Byte
SMALLINT
java.lang.Short
REAL
java.lang.Float
DOUBLE
java.lang.Double
DECIMAL(请参阅
RETURNS
注释)java.math.BigDecimal
BIGINT
java.lang.Long
INTEGER
java.lang.Int
VARCHAR
java.lang.String
VARBINARY
byte[]
BOOLEAN
java.lang.Boolean
ARRAY
java.util.List
ROW
java.util.Map<String, Object>
- RETURNS
data_type
-
data_type
指定 UDF 作为输出返回的 SQL 数据类型。上表中列出的 Athena 数据类型均受支持。对于DECIMAL
数据类型,请使用语法RETURNS DECIMAL(
,其中precision
,scale
)precision
(精度)和scale
(小数位数)是整数。 - LAMBDA '
lambda_function
' -
lambda_function
指定运行 UDF 时要调用的 Lambda 函数的名称。 - SELECT [...]
UDF_name
(expression
) [...] -
SELECT
查询会将值传递给 UDF 并返回结果。UDF_name
指定要使用的 UDF,后跟一个表达式
,该表达式将进行估算以传递值。传递和返回的值必须与USING EXTERNAL FUNCTION
子句中为 UDF 指定的相应数据类型匹配。
示例
如需获取 GitHub 上基于AthenaUDFHandler.java
使用 Lambda 创建和部署 UDF
要创建自定义 UDF,可以通过扩展 UserDefinedFunctionHandler
类来创建新的 Java 类。开发工具包中的 UserDefinedFunctionHandler.java
本节中的步骤演示从命令行使用 Apache Maven
使用 Maven 为 Athena 创建自定义 UDF 的步骤
克隆开发工具包并准备开发环境
在开始之前,请确保使用 sudo
yum install git -y
在您的系统上安装 git。
安装 Amazon 查询联合开发工具包
-
在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息,请参阅使用 Amazon Athena 联合查询。
git clone https://github.com/awslabs/aws-athena-query-federation.git
此程序的安装先决条件
如果您正在使用已安装 Apache Maven、Amazon CLI 和 Amazon Serverless Application Model 构建工具的开发计算机,则可以跳过此步骤。
-
从克隆时创建的
aws-athena-query-federation
目录的根目录下,运行准备开发环境的 prepare_dev_env.sh脚本。 -
更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。
source ~/.profile
重要
如果您跳过此步骤,稍后将收到关于 Amazon CLI 或 Amazon SAM 构建工具无法发布 Lambda 函数的错误。
创建您的 Maven 项目
运行以下命令以创建您的 Maven 项目。将 groupId
替换为组织的唯一 ID,并将 my-athena-udf
替换为您的应用程序的名称。有关详细信息,请参阅 Apache Maven 文档中的如何创建我的第一个 Maven 项目?
mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=
groupId
\ -DartifactId=my-athena-udfs
将依赖项和插件添加到您的 Maven 项目中
将以下配置添加到 Maven 项目 pom.xml
文件中。要查看示例,请参阅 GitHub 中的 pom.xml
<properties> <aws-athena-federation-sdk.version>2021.6.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
为 UDF 编写 Java 代码
通过扩展 UserDefinedFunctionHandler.java
在以下示例中,已在 MyUserDefinedFunctions
类中创建了两个用于 UDF、compress()
和 decompress()
的 Java 方法。
*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }
构建 JAR 文件
运行 mvn clean install
以构建您的项目。成功构建后,将在名为
的项目的 artifactId
-version
.jartarget
文件夹中创建一个 JAR 文件,其中 artifactId
是您在 Maven 项目中提供的名称,例如 my-athena-udfs
。
将 JAR 部署到 Amazon Lambda
有两个选项可以将代码部署到 Lambda:
-
使用 Amazon Serverless Application Repository 部署(推荐)
-
从 JAR 文件创建 Lambda 函数
选项 1:部署到 Amazon Serverless Application Repository
将 JAR 文件部署到 Amazon Serverless Application Repository 时,您将创建一个代表应用程序体系结构的 Amazon SAM 模板 YAML 文件。然后,您可以指定此 YAML 文件和一个 Amazon S3 存储桶,在其中上载应用程序的构件并使其对 Amazon Serverless Application Repository 可用。下面的过程使用 publish.shathena-query-federation/tools
目录中。
有关更多信息和要求,请参阅《Amazon Serverless Application Repository 开发人员指南》中的 发布应用程序、《Amazon Serverless Application Model 开发人员指南》中的 Amazon SAM 模板概念和 使用 Amazon SAM CLI 发布无服务器应用程序。
以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例,请参阅 GitHub 中的 athena-udf.yaml
Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name:
MyApplicationName
Description: 'The description I write for my application
' Author: 'Author Name
' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.
' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler
" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar
" Description: "My description of the UDFs that this Lambda function enables.
" Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory
将 publish.sh
脚本复制到保存 YAML 文件的项目目录中,然后运行以下命令:
./publish.sh
MyS3Location
MyYamlFile
例如,如果您的存储桶位置为 s3://mybucket/mysarapps/athenaudf
并且您的 YAML 文件已另存为 my-athena-udfs.yaml
:
./publish.sh mybucket/mysarapps/athenaudf my-athena-udfs
创建 Lambda 函数
-
在以下位置打开 Lambda 控制台:https://console.aws.amazon.com/lambda/
,选择 Create function(创建函数),然后选择 Browse serverless app repository(浏览无服务器应用程序存储库) -
选择 Private applications (私有应用程序),在列表中找到您的应用程序,或使用关键词搜索它,然后选择它。
-
查看并提供应用程序详细信息,然后选择 Deploy (部署)。
您现在可以使用在 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。
选项 2:直接创建 Lambda 函数
您也可以直接使用控制台或 Amazon CLI 创建 Lambda 函数。以下示例演示如何使用 create-function
Lambda CLI 命令。
aws lambda create-function \ --function-name
MyLambdaFunctionName
\ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role
\ --handlercom.mycompany.athena.udfs.MyUserDefinedFunctions
\ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar