创建外部函数 - Amazon Redshift
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建外部函数

根据 的 创建标量用户定义的函数 (AWS LambdaAmazon RedshiftUDF)。有关Lambda用户定义的 函数的更多信息,请参阅创建标量 Lambda UDF

Syntax

CREATE [ OR REPLACE ] EXTERNAL FUNCTION external_fn_name ( [data_type] [, ...] ) RETURNS data_type { VOLATILE | STABLE | IMMUTABLE } LAMBDA 'lambda_fn_name' IAM_ROLE 'iam-role-arn' RETRY_TIMEOUT milliseconds;

Parameters

OR REPLACE

一个子句,指定如果某个函数与已存在的此函数具有相同的名称和输入参数数据类型或签名,则替换现有函数。您只能将某个函数替换为定义一组相同数据类型的新函数。您必须是超级用户才能替换函数。

如果您定义的函数与现有函数具有相同的名称,但签名不同,则创建新的函数。换而言之,函数名称将会重载。有关更多信息,请参阅重载函数名称

external_fn_name

外部函数的名称。如果您指定架构名称(例如 myschema.myfunction),将使用指定的架构创建 函数。否则,将在当前 schema 中创建该函数。有关有效名称的更多信息,请参阅名称和标识符

我们建议您为所有 UDF 名称添加前缀 f_。 Amazon Redshift 保留 UDF 名称的f_前缀。通过使用 f_ 前缀,您可以帮助确保 UDF 名称Amazon Redshift现在或将来不会与任何内置 SQL 函数名称冲突。有关更多信息,请参阅命名 UDFs

data_type

输入参数的数据类型。有关更多信息,请参阅数据类型

RETURNS data_type

函数返回的值的数据类型。RETURNS 数据类型可以是任何标准的 Amazon Redshift 数据类型。有关更多信息,请参阅Python UDF 数据类型

VOLATILE | STABLE | IMMUTABLE

一个子句,用于向查询优化程序通知函数的不稳定性。

要获得最佳优化,请使用对其有效的最严格稳定性类别来标记函数。但是,如果类别过于严格,优化程序可能会错误地跳过某些调用,从而导致不正确的结果集。按照严格性顺序,从最不严格的开始,稳定性类别如下所示:

  • VOLATILE

    对于相同的参数,函数会对连续的调用返回不同的结果,甚至对于单个语句中的行也是如此。查询优化程序无法对不稳定函数的行为做出任何假设。使用不稳定函数的查询必须重新计算每个输入行的函数。

  • STABLE

    对于相同的参数,可保证函数对在单个语句内处理的所有行返回相同的结果。在不同的语句中调用时,函数可能会返回不同的结果。此类别使优化程序能够将单个语句内对该函数的多个调用优化为对该语句的单个调用。

  • IMMUTABLE

    对于相同的参数,函数始终返回相同的结果。当查询使用常量参数调用 IMMUTABLE 函数时,优化程序会预先计算函数。

LAMBDA 'lambda_fn_name'

Amazon Redshift 调用的 函数的名称。

有关创建 AWS Lambda 函数的步骤,请参阅 中的使用 控制台Lambda创建 AWS Lambda Developer Guide 函数。

有关 Lambda 函数所需权限的信息,请参阅 AWS Lambda 中的AWS Lambda Developer Guide权限

IAM_ROLE 'iam-role-arn'

您的集群用于身份验证和授权的 (IAM) 角色的 Amazon 资源名称 AWS Identity and Access Management (ARN)。CREATE EXTERNAL FUNCTION 命令有权通过此 IAM 角色调用 Lambda 函数。如果您的集群有一个现有 IAM 角色,该角色具有调用附加Lambda函数的权限,您可以替换角色的 ARN。有关更多信息,请参阅配置 Lambda UDFs 的授权参数

以下显示 IAM_ROLE 参数的语法。

IAM_ROLE 'arn:aws:iam::aws-account-id:role/role-name'
RETRY_TIMEOUT 毫秒

用于重试退避Amazon Redshift延迟的总时间(以毫秒为单位)。

Amazon Redshift 执行退避并在重试之间等待一定时间,而不是立即重试任何失败的查询。然后, Amazon Redshift 重试请求以重新运行失败的查询,直到所有延迟的总和等于或超过您指定的 RETRY_TIMEOUT 值。默认值为 20,000 毫秒。

在调用 Lambda 函数时, Amazon Redshift 会重试接收错误(如 TooManyRequestsExceptionEC2ThrottledException和 )的查询ServiceException

您可以将 RETRY_TIMEOUT 参数设置为 0 毫秒,以防止 Lambda UDF 的任何重试。

Examples

以下是使用标量Lambda用户定义的函数 (UDFs的示例。

使用 Node.js Lambda 函数的标量 Lambda UDF 示例

以下示例创建一个名为 的外部函数exfunc_sum,该函数将两个整数作为输入参数。此函数将总和作为整数输出返回。要调用的Lambda函数的名称为 lambda_sum。用于此Lambda函数的语言是 Node.js 12.x。确保指定 IAM 角色。该示例使用 'arn:aws:iam::123456789012:user/johndoe' 作为 IAM 角色。我们建议使用 STABLE 选项。

CREATE EXTERNAL FUNCTION exfunc_sum(INT,INT) RETURNS INT STABLE LAMBDA 'lambda_sum' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test';

Lambda 函数接收请求负载并对每行进行迭代。将添加单个行中的所有值以计算该行的总和,该行保存在响应数组中。结果数组中的行数与请求负载中接收的行数类似。

JSON 响应负载必须在“results”字段中具有结果数据,以便被外部函数识别。发送到 Lambda 函数的请求中的参数字段包含数据负载。在批处理请求的情况下,数据负载中可以有多个行。以下Lambda函数循环访问请求数据负载中的所有行。它还单独循环访问单个行中的所有值。

exports.handler = async (event) => { // The 'arguments' field in the request sent to the Lambda function contains the data payload. var t1 = event['arguments']; // 'len(t1)' represents the number of rows in the request payload. // The number of results in the response payload should be the same as the number of rows received. const resp = new Array(t1.length); // Iterating over all the rows in the request payload. for (const [i, x] of t1.entries()) { var sum = 0; // Iterating over all the values in a single row. for (const y of x) { sum = sum + y; } resp[i] = sum; } // The 'results' field should contain the results of the lambda call. const response = { results: resp }; return JSON.stringify(response); };

以下示例使用文本值调用外部函数 。

select exfunc_sum(1,2); exfunc_sum ------------ 3 (1 row)

以下示例创建一个名为 t_sum 的表,其中包含整数数据类型的两个列 c1 和 c2并插入两行数据。然后,通过传递此表的列名称来调用外部函数。两个表行作为单个Lambda调用在请求负载的批处理请求中发送。

CREATE TABLE t_sum(c1 int, c2 int); INSERT INTO t_sum VALUES (4,5), (6,7); SELECT exfunc_sum(c1,c2) FROM t_sum; exfunc_sum --------------- 9 13 (2 rows)

使用 RETRY_TIMEOUT 属性的标量 Lambda UDF 示例

在以下部分中,您可以找到如何在 Lambda UDFs 中使用 RETRY_TIMEOUT 属性的示例。

AWS Lambda 函数具有并发限制,您可以为每个函数设置这些限制。有关并发限制的更多信息,请参阅 中的管理 Lambda 函数的并发AWS Lambda Developer Guide,以及 AWS 计算博客上的管理 AWS Lambda 函数并发文章。

当 Lambda UDF 提供的请求数超过并发限制时,新请求会收到 TooManyRequestsException 错误。Lambda UDF 在此错误上重试,直到发送到函数的请求之间的所有延迟总和Lambda等于或超过您设置的 RETRY_TIMEOUT 值。默认 RETRY_TIMEOUT 值为 20000 毫秒。

以下示例使用名为 的Lambda函数exfunc_sleep_3。此函数接收请求负载,迭代每行,并将输入转换为大写。然后,它将休眠 3 秒并返回结果。用于此Lambda函数的语言是 Python 3.8。

结果数组中的行数与请求负载中接收的行数类似。JSON 响应负载必须在 字段中具有结果数据results,才能被外部函数识别。发送到 arguments 函数的请求中的 Lambda 字段包含数据负载。对于批处理请求,数据负载中可能会显示多行。

此函数的并发限制在预留并发中专门设置为 1,以说明 RETRY_TIMEOUT 属性的使用。当 属性设置为 1 时, Lambda 函数一次只能处理一个请求。

import json import time def lambda_handler(event, context): t1 = event['arguments'] # 'len(t1)' represents the number of rows in the request payload. # The number of results in the response payload should be the same as the number of rows received. resp = [None]*len(t1) # Iterating over all rows in the request payload. for i, x in enumerate(t1): # Iterating over all the values in a single row. for j, y in enumerate(x): resp[i] = y.upper() time.sleep(3) ret = dict() ret['results'] = resp ret_json = json.dumps(ret) return ret_json

接下来,两个附加示例说明了 RETRY_TIMEOUT 属性。它们分别调用单个 Lambda UDF。在调用 Lambda UDF 时,每个示例运行相同的 SQL 查询以同时从两个并发数据库会话中调用 Lambda UDF。当调用 Lambda UDF 的第一个查询由 UDF 提供服务时,第二个查询将接收TooManyRequestsException错误。出现此结果的原因是您专门将 UDF 中的预留并发设置为 1。有关如何为 Lambda 函数设置预留并发的信息,请参阅配置预留并发

下面的第一个示例将 Lambda UDF 的 RETRY_TIMEOUT 属性设置为 0 毫秒。如果Lambda请求从 Lambda 函数收到任何异常,则 Amazon Redshift 不会进行任何重试。出现此结果的原因是 RETRY_TIMEOUT 属性设置为 0。

CREATE OR REPLACE EXTERNAL FUNCTION exfunc_upper(varchar) RETURNS varchar STABLE LAMBDA 'exfunc_sleep_3' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test' RETRY_TIMEOUT 0;

将 RETRY_TIMEOUT 设置为 0 时,您可以从单独的数据库会话运行以下两个查询来查看不同的结果。

第一个使用 Lambda UDF 的 SQL 查询成功运行。

select exfunc_upper('Varchar'); exfunc_upper -------------- VARCHAR (1 row)

第二个查询同时从单独的数据库会话运行,它会收到错误TooManyRequestsException

select exfunc_upper('Varchar'); ERROR: Rate Exceeded.; Exception: TooManyRequestsException; ShouldRetry: 1 DETAIL: ----------------------------------------------- error: Rate Exceeded.; Exception: TooManyRequestsException; ShouldRetry: 1 code: 32103 context:query: 0 location: exfunc_client.cpp:102 process: padbmaster [pid=26384] -----------------------------------------------

下面的第二个示例将 Lambda UDF 的 RETRY_TIMEOUT 属性设置为 3000 毫秒。即使第二个查询同时运行Lambda,UDF 也会重试,直至总延迟时间达到 3000 毫秒。因此,两个查询都会成功运行。

CREATE OR REPLACE EXTERNAL FUNCTION exfunc_upper(varchar) RETURNS varchar STABLE LAMBDA 'exfunc_sleep_3' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test' RETRY_TIMEOUT 3000;

将 RETRY_TIMEOUT 设置为 3000 毫秒后,您可以从单独的数据库会话运行以下两个查询来查看相同的结果。

成功运行 Lambda UDF 的第一个 SQL 查询运行。

select exfunc_upper('Varchar'); exfunc_upper -------------- VARCHAR (1 row)

第二个查询同时运行Lambda,UDF 重试,直至总延迟时间达到 3000 毫秒。

select exfunc_upper('Varchar'); exfunc_upper -------------- VARCHAR (1 row)

使用 Python Lambda 函数的标量 Lambda UDF 示例

以下示例创建一个名为 的外部函数exfunc_multiplication,该函数对数字进行乘以并返回一个整数。此示例纳入error_msg响应中的 success 和 Lambda 字段。当乘积结果中存在整数溢出并且error_msg消息 设置为 时,成功字段设置为 Integer multiplication overflowfalse。该exfunc_multiplication函数将三个整数作为输入参数,并将和作为整数输出返回。

调用的 Lambda 函数的名称为 lambda_multiplication。用于此Lambda函数的语言是 Python 3.8。确保指定 IAM 角色。

CREATE EXTERNAL FUNCTION exfunc_multiplication(int, int, int) RETURNS INT STABLE LAMBDA 'lambda_multiplication' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test';

Lambda 函数接收请求负载并迭代每行。单个行中的所有值将相乘以计算该行的结果,结果保存在响应列表中。此示例使用默认设置为 true 的布尔值成功。如果行的乘法结果具有整数溢出,则成功值设置为 false。然后,迭代循环中断。

在创建响应负载时,如果成功值为 false,则以下Lambda函数将 error_msg 字段添加到负载中。它还将错误消息设置为 Integer multiplication overflow。如果成功值为 true,则将结果数据添加到结果字段中。结果数组中的行数(如果有)类似于请求负载中接收的行数。

发送到 Lambda 函数的请求中的参数字段包含数据负载。在批处理请求的情况下,数据负载中可以有多个行。以下Lambda函数循环访问请求数据负载中的所有行,并单独循环访问单个行中的所有值。

import json def lambda_handler(event, context): t1 = event['arguments'] # 'len(t1)' represents the number of rows in the request payload. # The number of results in the response payload should be the same as the number of rows received. resp = [None]*len(t1) # By default success is set to 'True'. success = True # Iterating over all rows in the request payload. for i, x in enumerate(t1): mul = 1 # Iterating over all the values in a single row. for j, y in enumerate(x): mul = mul*y # Check integer overflow. if (mul >= 9223372036854775807 or mul <= -9223372036854775808): success = Falsebreakelse: resp[i] = mul ret = dict() ret['success'] = success if not success: ret['error_msg'] = "Integer multiplication overflow"else: ret['results'] = resp ret_json = json.dumps(ret) return ret_json

以下示例使用文本值调用外部函数 。

SELECT exfunc_multiplication(8, 9, 2); exfunc_multiplication --------------------------- 144 (1 row)

以下示例创建一个名为 t_multi 的表,其中包含整数数据类型的三个列 c1、c2 和 c3。通过传递此表的列名称调用外部函数。数据是以一种导致整数溢出的方式插入的,以显示错误的传播方式。

CREATE TABLE t_multi (c1 int, c2 int, c3 int); INSERT INTO t_multi VALUES (2147483647, 2147483647, 4); SELECT exfunc_multiplication(c1, c2, c3) FROM t_multi; DETAIL: ----------------------------------------------- error: Integer multiplication overflow code: 32004context: context: query: 38 location: exfunc_data.cpp:276 process: query2_16_38 [pid=30494] -----------------------------------------------