CREATE EXTERNAL FUNCTION - Amazon Redshift
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

CREATE EXTERNAL FUNCTION

根据 Amazon Redshift 的 Amazon Lambda 创建标量用户定义函数 (UDF)。有关 Lambda 用户定义函数的更多信息,请参阅创建标量 Lambda UDF

所需的权限

以下是 CREATE EXTERNAL FUNCTION 所需的权限:

  • Superuser

  • 具有 CREATE [ OR REPLACE ] EXTERNAL FUNCTION 权限的用户

语法

CREATE [ OR REPLACE ] EXTERNAL FUNCTION external_fn_name ( [data_type] [, ...] ) RETURNS data_type { VOLATILE | STABLE } LAMBDA 'lambda_fn_name' IAM_ROLE { default | ‘arn:aws:iam::<Amazon Web Services 账户-id>:role/<role-name>’ RETRY_TIMEOUT milliseconds MAX_BATCH_ROWS count MAX_BATCH_SIZE size [ KB | MB ];

以下是 Amazon Redshift 上的机器学习语法。有关特定于模型的参数的信息,请参阅参数

CREATE [ OR REPLACE ] EXTERNAL FUNCTION external_fn_name ( [data_type] [, ...] ) RETURNS data_type { VOLATILE | STABLE } SAGEMAKER'endpoint_name' IAM_ROLE { default | ‘arn:aws:iam::<Amazon Web Services 账户-id>:role/<role-name>’ };

参数

OR REPLACE

一个子句,指定如果某个函数与已存在的此函数具有相同的名称和输入参数数据类型或签名,则替换现有的函数。您只能将某个函数替换为定义一组相同数据类型的新函数。您必须是超级用户才能替换函数。

如果您定义的函数与现有函数具有相同的名称,但签名不同,则创建新的函数。换而言之,函数名称将会重载。有关更多信息,请参阅 重载函数名称

external_fn_name

外部函数的名称。如果您指定 schema 名称(例如 myschema.myfunction),则使用指定的 schema 创建函数。否则,将在当前 schema 中创建该函数。有关有效名称的更多信息,请参阅名称和标识符

我们建议您将所有 UDF 名称添加前缀 f_。Amazon Redshift 保留 f_ 前缀,用于 UDF 名称。通过使用 f_ 前缀,您可以帮助确保您的 UDF 名称现在或将来不会与 Amazon Redshift 的任何内置 SQL 函数名称发生冲突。有关更多信息,请参阅 对 UDF 命名

data_type

输入参数的数据类型。有关更多信息,请参阅 数据类型

RETURNS data_type

函数返回的值的数据类型。RETURNS 数据类型可以是任何标准的 Amazon Redshift 数据类型。有关更多信息,请参阅 Python UDF 数据类型

VOLATILE | STABLE

通知查询优化程序有关函数的不稳定性。

要获得最大程度的优化,将函数标记为其有效的最严格稳定性类别。按照严格性顺序,从最不严格的开始,稳定性类别如下所示:

  • VOLATILE

  • STABLE

VOLATILE

对于相同的参数,函数会对连续的调用返回不同的结果,甚至对于单个语句中的行也是如此。查询优化器无法对不稳定函数的行为做出假设。使用不稳定函数的查询必须对每个输入重新计算该函数。

STABLE

对于相同的参数,可保证函数对在单个语句内处理的所有后续调用返回相同的结果。在不同的语句中调用时,函数可能会返回不同的结果。通过这一类别,优化器可以减少在单个语句中调用函数的次数。

请注意,如果所选的严格性对函数无效,则存在优化器可能会基于这种严格性跳过某些调用的风险。这可能会导致结果集不正确。

Lambda UDF 目前不支持 IMMUTABLE 子句。

LAMBDA 'lambda_fn_name'

Amazon Redshift 调用的函数的名称。

有关创建 Amazon Lambda 函数的步骤,请参阅《Amazon Lambda 开发人员指南》中的使用控制台创建 Lambda 函数

有关 Lambda 函数所需权限的信息,请参阅《Amazon Lambda 开发人员指南》中的 Amazon Lambda 权限

IAM_ROLE { default | ‘arn:aws:iam::<Amazon Web Services 账户-id>:role/<role-name>

使用默认关键字让 Amazon Redshift 使用 IAM 角色,该角色设置为默认值并在 CREATE EXTERNAL FUNCTION 命令运行时与集群关联。

使用 IAM 角色的 Amazon 资源名称(ARN),您的集群使用该角色进行身份验证和授权。CREATE EXTERNAL FUNCTION 命令被授权通过此 IAM 角色调用 Lambda 函数。如果您的集群具有有权调用所附加 Lambda 函数的现有 IAM 角色,您可以替换您的角色的 ARN。有关更多信息,请参阅 配置 Lambda UDF 的授权参数

以下显示 IAM_ROLE 参数的语法。

IAM_ROLE 'arn:aws:iam::aws-account-id:role/role-name'
RETRY_TIMEOUT 毫秒

Amazon Redshift 用于重试退避延迟的总时间(以毫秒为单位)。

Amazon Redshift 不会立即重试任何失败的查询,而是执行退避并等待一定时间间隔再重试。然后,Amazon Redshift 将重试请求,以重新运行失败的查询,直到所有延迟的总和等于或超过您指定的 RETRY_TIMEOUT 值。默认值为 20000 毫秒。

当调用 Lambda 函数时,Amazon Redshift 会重试接收 TooManyRequestsExceptionEC2ThrottledExceptionServiceException 等错误的查询。

您可以将 RETRY_TIMEOUT 参数设置为 0 毫秒,以防止对 Lambda UDF 进行任何重试。

MAX_BATCH_ROWS count

Amazon Redshift 在单个批处理请求中为单个 lambda 调用发送的最大行数。

此参数的最小值为 1。最大值为 INT_MAX 或 2,147,483,647。

此参数为可选的。默认值为 INT_MAX 或 2,147,483,647。

MAX_BATCH_SIZE size [ KB | MB ]

Amazon Redshift 在单个批处理请求中为单个 lambda 调用发送的数据负载的最大大小。

此参数的最小值为 1 KB。最大值为 5 MB。

此参数的默认值为 5 MB。

KB 和 MB 是可选的。如果您未设置计量单位,则 Amazon Redshift 默认使用 KB。

使用说明

创建 Lambda UDF 时请考虑以下几点:

  • Lambda 函数调用输入参数的顺序不是固定的或有保证的。这可能因正在运行的查询实例而异,具体取决于集群配置。

  • 不能保证函数对每个输入参数应用一次,且只应用一次。Amazon Redshift 和 Amazon Lambda 之间的交互可能会导致使用相同输入的重复调用。

示例

以下是使用标量 Lambda 用户定义函数 (UDF) 的示例。

使用 Node.js Lambda 函数的标量 Lambda UDF 示例

以下示例创建一个名为 exfunc_sum 的外部函数,它将两个整数作为输入参数。此函数以整数输出形式返回总和。要调用的 Lambda 函数的名称为 lambda_sum。用于此 Lambda 函数的语言是 Node.js 12.x。确保指定 IAM 角色。此示例使用 'arn:aws:iam::123456789012:user/johndoe' 作为 IAM 角色。

CREATE EXTERNAL FUNCTION exfunc_sum(INT,INT) RETURNS INT VOLATILE LAMBDA 'lambda_sum' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test';

Lambda 函数接受请求负载并对每一行进行迭代。将单行中的所有值相加以计算该行的总和,并将其保存在响应数组中。结果数组中的行数与请求负载中接收的行数相似。

JSON 响应负载必须在“results”字段中包含结果数据,才能被外部函数识别。发送到 Lambda 函数的请求中的参数字段包含数据负载。在批处理请求的情况下,数据负载中可能有多行。以下 Lambda 函数对请求数据负载中的所有行进行迭代。它还分别对一行中的所有值进行迭代。

exports.handler = async (event) => { // The 'arguments' field in the request sent to the Lambda function contains the data payload. var t1 = event['arguments']; // 'len(t1)' represents the number of rows in the request payload. // The number of results in the response payload should be the same as the number of rows received. const resp = new Array(t1.length); // Iterating over all the rows in the request payload. for (const [i, x] of t1.entries()) { var sum = 0; // Iterating over all the values in a single row. for (const y of x) { sum = sum + y; } resp[i] = sum; } // The 'results' field should contain the results of the lambda call. const response = { results: resp }; return JSON.stringify(response); };

以下示例使用文本值调用外部函数。

select exfunc_sum(1,2); exfunc_sum ------------ 3 (1 row)

以下示例创建一个名为 t_sum 的表(其中包含整数数据类型的两个列 c1 和 c2)并插入两行数据。然后通过传递此表的列名称来调用外部函数。这两个表行作为单个 Lambda 调用在请求负载中的批处理请求中发送。

CREATE TABLE t_sum(c1 int, c2 int); INSERT INTO t_sum VALUES (4,5), (6,7); SELECT exfunc_sum(c1,c2) FROM t_sum; exfunc_sum --------------- 9 13 (2 rows)

使用 RETRY_TIMEOUT 属性的标量 Lambda UDF 示例

在以下部分中,您可以找到如何在 Lambda UDF 中使用 RETRY_TIMEOUT 属性的示例。

Amazon Lambda 函数具有并发限制,您可以为每个函数设置这些限制。有关并发限制的更多信息,请参阅《Amazon Lambda 开发人员指南》中的为 Lambda 函数管理并发和 Amazon 计算博客上的博客文章 Managing Amazon Lambda Function Concurrency

当 Lambda UDF 服务的请求数超过并发限制时,新请求将接收 TooManyRequestsException 错误。Lambda UDF 会重试此错误,直到发送到 Lambda 函数的请求之间的所有延迟总和等于或超过您设置的 RETRY_TIMEOUT 值。默认的 RETRY_TIMEOUT 值为 20000 毫秒。

下面的示例使用一个名为 exfunc_sleep_3 的 Lambda 函数。此函数接受请求负载,对每一行进行迭代,并将输入转换为大写。然后它会睡眠 3 秒钟并返回结果。此 Lambda 函数使用的语言是 Python 3.8。

结果数组中的行数与请求负载中接收的行数相似。JSON 响应负载必须在 results 字段中包含结果数据,才能被外部函数识别。发送到 Lambda 函数的请求中的 arguments 字段包含数据负载。在批处理请求的情况下,数据负载中可能有多行。

在保留并发中,此函数的并发限制被专门设置为 1,以演示 RETRY_TIMEOUT 属性的使用情况。当属性设置为 1 时,Lambda 函数一次只能处理一个请求。

import json import time def lambda_handler(event, context): t1 = event['arguments'] # 'len(t1)' represents the number of rows in the request payload. # The number of results in the response payload should be the same as the number of rows received. resp = [None]*len(t1) # Iterating over all rows in the request payload. for i, x in enumerate(t1): # Iterating over all the values in a single row. for j, y in enumerate(x): resp[i] = y.upper() time.sleep(3) ret = dict() ret['results'] = resp ret_json = json.dumps(ret) return ret_json

以下两个附加示例对 RETRY_TIMEOUT 属性进行了说明。它们各自调用了一个 Lambda UDF。在调用 Lambda UDF 时,每个示例都运行相同的 SQL 查询,以便同时从两个并发数据库会话调用 Lambda UDF。当第一个调用 Lambda UDF 的查询由 UDF 提供时,第二个查询会收到 TooManyRequestsException 错误。出现此结果是因为您将 UDF 中的保留并发专门设置为 1。有关如何为 Lambda 函数设置保留并发的信息,请参阅配置预留并发

下面的第一个示例将 Lambda UDF 的 RETRY_TIMEOUT 属性设置为 0 毫秒。如果 Lambda 请求收到来自 Lambda 函数的任何异常,则 Amazon Redshift 不会进行任何重试。出现此结果的原因是 RETRY_TIMEOUT 属性被设置为 0。

CREATE OR REPLACE EXTERNAL FUNCTION exfunc_upper(varchar) RETURNS varchar VOLATILE LAMBDA 'exfunc_sleep_3' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test' RETRY_TIMEOUT 0;

将 RETRY_TIMEOUT 设置为 0 时,您可以从单独的数据库会话运行以下两个查询以查看不同的结果。

使用 Lambda UDF 的第一个 SQL 查询成功运行。

select exfunc_upper('Varchar'); exfunc_upper -------------- VARCHAR (1 row)

同时从单独的数据库会话运行的第二个查询将收到 TooManyRequestsException 错误。

select exfunc_upper('Varchar'); ERROR: Rate Exceeded.; Exception: TooManyRequestsException; ShouldRetry: 1 DETAIL: ----------------------------------------------- error: Rate Exceeded.; Exception: TooManyRequestsException; ShouldRetry: 1 code: 32103 context:query: 0 location: exfunc_client.cpp:102 process: padbmaster [pid=26384] -----------------------------------------------

下面的第二个示例将 Lambda UDF 的 RETRY_TIMEOUT 属性设置为 3000 毫秒。即使同时运行第二个查询,Lambda UDF 也会重试,直到总延迟为 3000 毫秒。因此,两个查询都成功运行。

CREATE OR REPLACE EXTERNAL FUNCTION exfunc_upper(varchar) RETURNS varchar VOLATILE LAMBDA 'exfunc_sleep_3' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test' RETRY_TIMEOUT 3000;

将 RETRY_TIMEOUT 被设置为 3000 时,您可以从单独的数据库会话运行以下两个查询以查看相同的结果。

运行 Lambda UDF 的第一个 SQL 查询成功运行。

select exfunc_upper('Varchar'); exfunc_upper -------------- VARCHAR (1 row)

第二个查询同时运行,且 Lambda UDF 重试,直到总延迟为 3000 毫秒。

select exfunc_upper('Varchar'); exfunc_upper -------------- VARCHAR (1 row)

使用 Python Lambda 函数的标量 Lambda UDF 示例

以下示例创建名为 exfunc_multiplication 的外部函数,然后将数字相乘并返回整数。此示例合并了 Lambda 响应中的 success 和 error_msg 字段。当乘法结果中存在整数溢出时,success 字段设置为 false,且 error_msg 消息设置为 Integer multiplication overflowexfunc_multiplication 函数将三个整数作为输入参数,并将总和作为整数输出返回。

被调用的 Lambda 函数的名称为 lambda_multiplication。此 Lambda 函数使用的语言是 Python 3.8。确保指定 IAM 角色。

CREATE EXTERNAL FUNCTION exfunc_multiplication(int, int, int) RETURNS INT VOLATILE LAMBDA 'lambda_multiplication' IAM_ROLE 'arn:aws:iam::123456789012:role/Redshift-Exfunc-Test';

Lambda 函数接受请求负载并对每一行进行迭代。将单行中的所有值相乘以计算该行的结果,并将其保存在响应列表中。此示例使用默认设置为 true 的 Boolean success 值。如果行的乘法结果具有整数溢出,则 success 值设置为 false。然后,迭代循环中断。

创建响应负载时,如果 success 值为 false,则以下 Lambda 函数将 error_msg 字段添加到负载中。它还将错误消息设置为 Integer multiplication overflow。如果 success 值为 true,则结果数据将添加到结果字段中。结果数组(如果有)中的行数与请求负载中接收的行数相似。

发送到 Lambda 函数的请求中的参数字段包含数据负载。在批处理请求的情况下,数据负载中可能有多行。以下 Lambda 函数对请求数据负载中的所有行进行迭代,并分别对一行中的所有值进行迭代。

import json def lambda_handler(event, context): t1 = event['arguments'] # 'len(t1)' represents the number of rows in the request payload. # The number of results in the response payload should be the same as the number of rows received. resp = [None]*len(t1) # By default success is set to 'True'. success = True # Iterating over all rows in the request payload. for i, x in enumerate(t1): mul = 1 # Iterating over all the values in a single row. for j, y in enumerate(x): mul = mul*y # Check integer overflow. if (mul >= 9223372036854775807 or mul <= -9223372036854775808): success = False break else: resp[i] = mul ret = dict() ret['success'] = success if not success: ret['error_msg'] = "Integer multiplication overflow" else: ret['results'] = resp ret_json = json.dumps(ret) return ret_json

以下示例使用文本值调用外部函数。

SELECT exfunc_multiplication(8, 9, 2); exfunc_multiplication --------------------------- 144 (1 row)

以下示例创建一个名为 t_multi 的表,其中包含整数数据类型的三个列 c1、c2 和 c3。通过传递此表的列名称来调用外部函数。以导致整数溢出的方式插入数据,从而显示错误的传播方式。

CREATE TABLE t_multi (c1 int, c2 int, c3 int); INSERT INTO t_multi VALUES (2147483647, 2147483647, 4); SELECT exfunc_multiplication(c1, c2, c3) FROM t_multi; DETAIL: ----------------------------------------------- error: Integer multiplication overflow code: 32004context: context: query: 38 location: exfunc_data.cpp:276 process: query2_16_38 [pid=30494] -----------------------------------------------