

# 使用 Lambda 创建和部署 UDF


要创建自定义 UDF，可以通过扩展 `UserDefinedFunctionHandler` 类来创建新的 Java 类。开发工具包中的 [UserDefinedFunctionHandler.java](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/handlers/UserDefinedFunctionHandler.java) 的源代码在 GitHub 上的 awslabs/aws-athena-query-federation/athena-federation-sdk [存储库](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-federation-sdk)中提供，其中还提供有[示例 UDF 实施](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-udfs)，您可以对其进行检查和修改，以创建自定义 UDF。

本节中的步骤演示从命令行使用 [Apache Maven](https://maven.apache.org/index.html) 编写和构建自定义 UDF Jar 文件和部署。

执行以下步骤以使用 Maven 为 Athena 创建自定义 UDF

1. [克隆开发工具包并准备开发环境](#udf-create-install-sdk-prep-environment)

1. [创建您的 Maven 项目](#create-maven-project)

1. [将依赖项和插件添加到您的 Maven 项目中](#udf-add-maven-dependencies)

1. [为 UDF 编写 Java 代码](#udf-write-java)

1. [构建 JAR 文件](#udf-create-package-jar)

1. [将 JAR 部署到 Amazon Lambda](#udf-create-deploy)

## 克隆开发工具包并准备开发环境


在开始之前，请确保使用 `sudo yum install git -y` 在您的系统上安装 git。

**安装 Amazon 查询联合开发工具包**
+ 在命令行输入以下内容以克隆软件开发工具包存储库。此存储库包括软件开发工具包、示例和一套数据源连接器。有关数据源连接器的更多信息，请参阅[使用 Amazon Athena 联合查询](federated-queries.md)。

  ```
  git clone https://github.com/awslabs/aws-athena-query-federation.git
  ```

**此程序的安装先决条件**

如果您正在使用已安装 Apache Maven、Amazon CLI 和 Amazon Serverless Application Model 构建工具的开发计算机，则可以跳过此步骤。

1. 从克隆时创建的 `aws-athena-query-federation` 目录的根目录下，运行准备开发环境的 [prepare\$1dev\$1env.sh](https://github.com/awslabs/aws-athena-query-federation/blob/master/tools/prepare_dev_env.sh) 脚本。

1. 更新您的 shell 以获取在安装过程中创建的新变量或重新启动终端会话。

   ```
   source ~/.profile
   ```
**重要**  
如果您跳过此步骤，稍后将收到关于 Amazon CLI 或 Amazon SAM 构建工具无法发布 Lambda 函数的错误。

## 创建您的 Maven 项目


运行以下命令以创建您的 Maven 项目。将 *groupId* 替换为组织的唯一 ID，并将 *my-athena-udf* 替换为您的应用程序的名称。有关详细信息，请参阅 Apache Maven 文档中的[如何创建我的第一个 Maven 项目？](https://maven.apache.org/guides/getting-started/index.html#How_do_I_make_my_first_Maven_project)。

```
mvn -B archetype:generate \
-DarchetypeGroupId=org.apache.maven.archetypes \
-DgroupId=groupId \
-DartifactId=my-athena-udfs
```

## 将依赖项和插件添加到您的 Maven 项目中


将以下配置添加到 Maven 项目 `pom.xml` 文件中。要查看示例，请参阅 GitHub 中的 [pom.xml](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-udfs/pom.xml) 文件。

```
<properties>
    <aws-athena-federation-sdk.version>2022.47.1</aws-athena-federation-sdk.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-athena-federation-sdk</artifactId>
        <version>${aws-athena-federation-sdk.version}</version>
    </dependency>
</dependencies>
    
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

## 为 UDF 编写 Java 代码


通过扩展 [UserDefinedFunctionHandler.java](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/handlers/UserDefinedFunctionHandler.java) 来创建一个新类。在类中编写您的 UDF。

在以下示例中，已在 `MyUserDefinedFunctions` 类中创建了两个用于 UDF、`compress()` 和 `decompress()` 的 Java 方法。

```
*package *com.mycompany.athena.udfs;

public class MyUserDefinedFunctions
        extends UserDefinedFunctionHandler
{
    private static final String SOURCE_TYPE = "MyCompany";

    public MyUserDefinedFunctions()
    {
        super(SOURCE_TYPE);
    }

    /**
     * Compresses a valid UTF-8 String using the zlib compression library.
     * Encodes bytes with Base64 encoding scheme.
     *
     * @param input the String to be compressed
     * @return the compressed String
     */
    public String compress(String input)
    {
        byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);

        // create compressor
        Deflater compressor = new Deflater();
        compressor.setInput(inputBytes);
        compressor.finish();

        // compress bytes to output stream
        byte[] buffer = new byte[4096];
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length);
        while (!compressor.finished()) {
            int bytes = compressor.deflate(buffer);
            byteArrayOutputStream.write(buffer, 0, bytes);
        }

        try {
            byteArrayOutputStream.close();
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to close ByteArrayOutputStream", e);
        }

        // return encoded string
        byte[] compressedBytes = byteArrayOutputStream.toByteArray();
        return Base64.getEncoder().encodeToString(compressedBytes);
    }

    /**
     * Decompresses a valid String that has been compressed using the zlib compression library.
     * Decodes bytes with Base64 decoding scheme.
     *
     * @param input the String to be decompressed
     * @return the decompressed String
     */
    public String decompress(String input)
    {
        byte[] inputBytes = Base64.getDecoder().decode((input));

        // create decompressor
        Inflater decompressor = new Inflater();
        decompressor.setInput(inputBytes, 0, inputBytes.length);

        // decompress bytes to output stream
        byte[] buffer = new byte[4096];
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputBytes.length);
        try {
            while (!decompressor.finished()) {
                int bytes = decompressor.inflate(buffer);
                if (bytes == 0 && decompressor.needsInput()) {
                    throw new DataFormatException("Input is truncated");
                }
                byteArrayOutputStream.write(buffer, 0, bytes);
            }
        }
        catch (DataFormatException e) {
            throw new RuntimeException("Failed to decompress string", e);
        }

        try {
            byteArrayOutputStream.close();
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to close ByteArrayOutputStream", e);
        }

        // return decoded string
        byte[] decompressedBytes = byteArrayOutputStream.toByteArray();
        return new String(decompressedBytes, StandardCharsets.UTF_8);
    }
}
```

## 构建 JAR 文件


运行 `mvn clean install` 以构建您的项目。成功构建后，将在名为 `artifactId-version.jar` 的项目的 `target` 文件夹中创建一个 JAR 文件，其中 *artifactId* 是您在 Maven 项目中提供的名称，例如 `my-athena-udfs`。

## 将 JAR 部署到 Amazon Lambda


有两个选项可以将代码部署到 Lambda：
+ 使用 Amazon Serverless Application Repository 部署（推荐）
+ 从 JAR 文件创建 Lambda 函数

### 选项 1：部署到 Amazon Serverless Application Repository


将 JAR 文件部署到 Amazon Serverless Application Repository 时，您将创建一个代表应用程序体系结构的 Amazon SAM 模板 YAML 文件。然后，您可以指定此 YAML 文件和一个 Amazon S3 存储桶，在其中上载应用程序的构件并使其对 Amazon Serverless Application Repository 可用。下面的过程使用 [publish.sh](https://github.com/awslabs/aws-athena-query-federation/blob/master/tools/publish.sh) 脚本，该脚本位于您之前克隆的 Athena Query Federation SDK 的 `athena-query-federation/tools` 目录中。

有关更多信息和要求，请参阅《**Amazon Serverless Application Repository 开发人员指南》中的 [发布应用程序](https://docs.amazonaws.cn/serverlessrepo/latest/devguide/serverlessrepo-publishing-applications.html)、《**Amazon Serverless Application Model 开发人员指南》中的 [Amazon SAM 模板概念](https://docs.amazonaws.cn/serverless-application-model/latest/developerguide/serverless-sam-template-basics.html)和 [使用 Amazon SAM CLI 发布无服务器应用程序](https://docs.amazonaws.cn/serverless-application-model/latest/developerguide/serverless-sam-template-publishing-applications.html)。

以下示例演示了 YAML 文件中的参数。将类似的参数添加到 YAML 文件并将其保存在项目目录中。有关完整示例，请参阅 GitHub 中的 [athena-udf.yaml](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-udfs/athena-udfs.yaml)。

```
Transform: 'AWS::Serverless-2016-10-31'
Metadata:
  'AWS::ServerlessRepo::Application':
    Name: MyApplicationName
    Description: 'The description I write for my application'
    Author: 'Author Name'
    Labels:
      - athena-federation
    SemanticVersion: 1.0.0
Parameters:
  LambdaFunctionName:
    Description: 'The name of the Lambda function that will contain your UDFs.'
    Type: String
  LambdaTimeout:
    Description: 'Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)'
    Default: 900
    Type: Number
  LambdaMemory:
    Description: 'Lambda memory in MB (min 128 - 3008 max).'
    Default: 3008
    Type: Number
Resources:
  ConnectorConfig:
    Type: 'AWS::Serverless::Function'
    Properties:
      FunctionName: !Ref LambdaFunctionName
      Handler: "full.path.to.your.handler. For example, com.amazonaws.athena.connectors.udfs.MyUDFHandler"
      CodeUri: "Relative path to your JAR file. For example, ./target/athena-udfs-1.0.jar"
      Description: "My description of the UDFs that this Lambda function enables."
      Runtime: java8
      Timeout: !Ref LambdaTimeout
      MemorySize: !Ref LambdaMemory
```

将 `publish.sh` 脚本复制到保存 YAML 文件的项目目录中，然后运行以下命令：

```
./publish.sh MyS3Location MyYamlFile
```

例如，如果您的存储桶位置为 `s3://amzn-s3-demo-bucket/mysarapps/athenaudf` 并且您的 YAML 文件已另存为 `my-athena-udfs.yaml`：

```
./publish.sh amzn-s3-demo-bucket/mysarapps/athenaudf my-athena-udfs
```

**创建 Lambda 函数**

1. 在以下位置打开 Lambda 控制台：[https://console.aws.amazon.com/lambda/](https://console.amazonaws.cn/lambda/)，选择 **Create function**（创建函数），然后选择 **Browse serverless app repository**（浏览无服务器应用程序存储库）

1. 选择 **Private applications (私有应用程序)**，在列表中找到您的应用程序，或使用关键词搜索它，然后选择它。

1. 查看并提供应用程序详细信息，然后选择 **Deploy (部署)**。

   您现在可以使用在 Lambda 函数 JAR 文件中定义的方法名称作为 Athena 中的 UDF。

### 选项 2：直接创建 Lambda 函数


您也可以直接使用控制台或 Amazon CLI 创建 Lambda 函数。以下示例演示如何使用 `create-function` Lambda CLI 命令。

```
aws lambda create-function \
 --function-name MyLambdaFunctionName \
 --runtime java8 \
 --role arn:aws:iam::1234567890123:role/my_lambda_role \
 --handler com.mycompany.athena.udfs.MyUserDefinedFunctions \
 --timeout 900 \
 --zip-file fileb://./target/my-athena-udfs-1.0-SNAPSHOT.jar
```