使用 Lambda 创建和部署 UDF - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Lambda 创建和部署 UDF

要创建自定义 UDF,可以通过扩展 UserDefinedFunctionHandler 类来创建新的 Java 类。开发工具包中的 UserDefinedFunctionHandler.java 的源代码在 GitHub 上的 awslabs/aws-athena-query-federation/athena-federation-sdk 存储库中提供,其中还提供有示例 UDF 实施,您可以对其进行检查和修改,以创建自定义 UDF。

本节中的步骤演示从命令行使用 Apache Maven 编写和构建自定义 UDF Jar 文件和部署。

执行以下步骤以使用 Maven 为 Athena 创建自定义 UDF

克隆开发工具包并准备开发环境

在开始之前,请确保使用 sudo yum install git -y 在您的系统上安装 git。

安装 Amazon 查询联合开发工具包
  • 在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息,请参阅使用 Amazon Athena 联合查询

    git clone https://github.com/awslabs/aws-athena-query-federation.git
此程序的安装先决条件

如果您正在使用已安装 Apache Maven、Amazon CLI 和 Amazon Serverless Application Model 构建工具的开发计算机,则可以跳过此步骤。

  1. 从克隆时创建的 aws-athena-query-federation 目录的根目录下,运行准备开发环境的 prepare_dev_env.sh 脚本。

  2. 更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。

    source ~/.profile
    重要

    如果您跳过此步骤,稍后将收到关于 Amazon CLI 或 Amazon SAM 构建工具无法发布 Lambda 函数的错误。

创建您的 Maven 项目

运行以下命令以创建您的 Maven 项目。将 groupId 替换为组织的唯一 ID,并将 my-athena-udf 替换为您的应用程序的名称。有关详细信息,请参阅 Apache Maven 文档中的如何创建我的第一个 Maven 项目?

mvn -B archetype:generate \ -DarchetypeGroupId=org.apache.maven.archetypes \ -DgroupId=groupId \ -DartifactId=my-athena-udfs

将依赖项和插件添加到您的 Maven 项目中

将以下配置添加到 Maven 项目 pom.xml 文件中。要查看示例,请参阅 GitHub 中的 pom.xml 文件。

<properties> <aws-athena-federation-sdk.version>2022.47.1</aws-athena-federation-sdk.version> </properties> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-athena-federation-sdk</artifactId> <version>${aws-athena-federation-sdk.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.1</version> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

为 UDF 编写 Java 代码

通过扩展 UserDefinedFunctionHandler.java 来创建一个新类。在类中编写您的 UDF。

在以下示例中,已在 MyUserDefinedFunctions 类中创建了两个用于 UDF、compress()decompress() 的 Java 方法。

*package *com.mycompany.athena.udfs; public class MyUserDefinedFunctions extends UserDefinedFunctionHandler { private static final String SOURCE_TYPE = "MyCompany"; public MyUserDefinedFunctions() { super(SOURCE_TYPE); } /** * Compresses a valid UTF-8 String using the zlib compression library. * Encodes bytes with Base64 encoding scheme. * * @param input the String to be compressed * @return the compressed String */ public String compress(String input) { byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8); // create compressor Deflater compressor = new Deflater(); compressor.setInput(inputBytes); compressor.finish(); // compress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); while (!compressor.finished()) { int bytes = compressor.deflate(buffer); byteArrayOutputStream.write(buffer, 0, bytes); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return encoded string byte[] compressedBytes = byteArrayOutputStream.toByteArray(); return Base64.getEncoder().encodeToString(compressedBytes); } /** * Decompresses a valid String that has been compressed using the zlib compression library. * Decodes bytes with Base64 decoding scheme. * * @param input the String to be decompressed * @return the decompressed String */ public String decompress(String input) { byte[] inputBytes = Base64.getDecoder().decode((input)); // create decompressor Inflater decompressor = new Inflater(); decompressor.setInput(inputBytes, 0, inputBytes.length); // decompress bytes to output stream byte[] buffer = new byte[4096]; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length); try { while (!decompressor.finished()) { int bytes = decompressor.inflate(buffer); if (bytes == 0 && decompressor.needsInput()) { throw new DataFormatException("Input is truncated"); } byteArrayOutputStream.write(buffer, 0, bytes); } } catch (DataFormatException e) { throw new RuntimeException("Failed to decompress string", e); } try { byteArrayOutputStream.close(); } catch (IOException e) { throw new RuntimeException("Failed to close ByteArrayOutputStream", e); } // return decoded string byte[] decompressedBytes = byteArrayOutputStream.toByteArray(); return new String(decompressedBytes, StandardCharsets.UTF_8); } }

构建 JAR 文件

运行 mvn clean install 以构建您的项目。成功构建后,将在名为 artifactId-version.jar 的项目的 target 文件夹中创建一个 JAR 文件,其中 artifactId 是您在 Maven 项目中提供的名称,例如 my-athena-udfs

将 JAR 部署到 Amazon Lambda

有两个选项可以将代码部署到 Lambda:

  • 使用 Amazon Serverless Application Repository 部署(推荐)

  • 从 JAR 文件创建 Lambda 函数

选项 1:部署到 Amazon Serverless Application Repository

将 JAR 文件部署到 Amazon Serverless Application Repository 时,您将创建一个代表应用程序体系结构的 Amazon SAM 模板 YAML 文件。然后,您可以指定此 YAML 文件和一个 Amazon S3 存储桶,在其中上载应用程序的构件并使其对 Amazon Serverless Application Repository 可用。下面的过程使用 publish.sh 脚本,该脚本位于您之前克隆的 Athena Query Federation SDK 的 athena-query-federation/tools 目录中。

有关更多信息和要求,请参阅《Amazon Serverless Application Repository 开发人员指南》中的 发布应用程序、《Amazon Serverless Application Model 开发人员指南》中的 Amazon SAM 模板概念使用 Amazon SAM CLI 发布无服务器应用程序

以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例,请参阅 GitHub 中的 athena-udf.yaml

Transform: 'AWS::Serverless-2016-10-31' Metadata: 'AWS::ServerlessRepo::Application': Name: MyApplicationName Description: 'The description I write for my application' Author: 'Author Name' Labels: - athena-federation SemanticVersion: 1.0.0 Parameters: LambdaFunctionName: Description: 'The name of the Lambda function that will contain your UDFs.' Type: String LambdaTimeout: Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)' Default: 900 Type: Number LambdaMemory: Description: 'Lambda memory in MB (min 128 - 3008 max).' Default: 3008 Type: Number Resources: ConnectorConfig: Type: 'AWS::Serverless::Function' Properties: FunctionName: !Ref LambdaFunctionName Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler" CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar" Description: "My description of the UDFs that this Lambda function enables." Runtime: java8 Timeout: !Ref LambdaTimeout MemorySize: !Ref LambdaMemory

publish.sh 脚本复制到保存 YAML 文件的项目目录中,然后运行以下命令:

./publish.sh MyS3Location MyYamlFile

例如,如果您的存储桶位置为 s3://amzn-s3-demo-bucket/mysarapps/athenaudf 并且您的 YAML 文件已另存为 my-athena-udfs.yaml

./publish.sh amzn-s3-demo-bucket/mysarapps/athenaudf my-athena-udfs
创建 Lambda 函数
  1. 在以下位置打开 Lambda 控制台:https://console.aws.amazon.com/lambda/,选择 Create function(创建函数),然后选择 Browse serverless app repository(浏览无服务器应用程序存储库)

  2. 选择 Private applications (私有应用程序),在列表中找到您的应用程序,或使用关键词搜索它,然后选择它。

  3. 查看并提供应用程序详细信息,然后选择 Deploy (部署)

    您现在可以使用在 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。

选项 2:直接创建 Lambda 函数

您也可以直接使用控制台或 Amazon CLI 创建 Lambda 函数。以下示例演示如何使用 create-function Lambda CLI 命令。

aws lambda create-function \ --function-name MyLambdaFunctionName \ --runtime java8 \ --role arn:aws:iam::1234567890123:role/my_lambda_role \ --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \ --timeout 900 \ --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar