从 Amazon Aurora MySQL 数据库集群中调用 Lambda 函数
您可以使用本机函数 lambda_sync
或 lambda_async
从与 Amazon Aurora MySQL 兼容的数据库集群中调用 Amazon Lambda 函数。从 Aurora MySQL 中调用 Lambda 函数之前,Aurora 数据库集群必须具有 Lambda 的访问权限。有关授予对 Aurora MySQL 的访问权限的详细信息,请参阅 为 Aurora 授予 Lambda 的访问权限。有关 lambda_sync
和 lambda_async
存储函数的信息,请参阅 使用 Aurora MySQL 本机函数调用 Lambda 函数。
您还可以通过使用存储过程调用 Amazon Lambda 函数。然而,使用存储过程已弃用。如果您使用以下 Aurora MySQL 版本之一,强烈建议使用 Aurora MySQL 本机函数:
-
Aurora MySQL 版本 2,适用于与 MySQL 5.7 兼容的集群。
-
Aurora MySQL 版本 3.01 及更高版本,针对与 MySQL 8.0 兼容的集群。此存储过程在 Aurora MySQL 版本 3 中不可用。
为 Aurora 授予 Lambda 的访问权限
从 Aurora MySQL 数据库集群中调用 Lambda 函数之前,请确保先为您的集群授予访问 Lambda 的权限。
为 Lambda 授予 Aurora MySQL 的访问权限
-
创建一个 Amazon Identity and Access Management(IAM)策略,以提供允许 Aurora MySQL 数据库集群调用 Lambda 函数的权限。有关说明,请参阅创建 IAM 策略以访问 Amazon Lambda 资源。
-
创建一个 IAM 角色,并将您在 创建 IAM 策略以访问 Amazon Lambda 资源 中创建的 IAM 策略附加到新的 IAM 角色。有关说明,请参阅创建 IAM 角色以允许 Amazon Aurora 访问Amazon服务。
-
将
aws_default_lambda_role
数据库集群参数设置为新 IAM 角色的 Amazon 资源名称 (ARN)。如果集群是 Aurora 全局数据库的一部分,则为该全局数据库中的每个 Aurora 集群应用相同的设置。
有关数据库集群参数的更多信息,请参阅Amazon Aurora 数据库集群和数据库实例参数。
-
要允许 Aurora MySQL 数据库集群中的数据库用户调用 Lambda 函数,请将您在创建 IAM 角色以允许 Amazon Aurora 访问Amazon服务中创建的角色与该数据库集群关联。有关将 IAM 角色与数据库集群关联的信息,请参阅将 IAM 角色与 Amazon Aurora MySQL 数据库集群关联。
如果集群是 Aurora 全局数据库的一部分,则将此角色与该全局数据库中的每个 Aurora 集群关联。
-
配置 Aurora MySQL 数据库集群以允许建立到 Lambda 的出站连接。有关说明,请参阅启用从 Amazon Aurora MySQL 到其他Amazon服务的网络通信。
如果集群是 Aurora 全局数据库的一部分,则为该全局数据库中的每个 Aurora 集群启用出站连接。
使用 Aurora MySQL 本机函数调用 Lambda 函数
注意
当您使用 Aurora MySQL 2 或 Aurora MySQL 版本 3.01 及更高版本时,您可以调用 lambda_sync
和 lambda_async
原生函数。有关 Aurora MySQL 版本的更多信息,请参阅Amazon Aurora MySQL 的数据库引擎更新。
您可以调用本机函数 lambda_sync
和 lambda_async
,以从 Aurora MySQL 数据库集群中调用 Amazon Lambda 函数。如果要将在 Aurora MySQL 上运行的数据库与其他Amazon服务集成,这种方法可能是非常有用的。例如,每次在数据库的特定表中插入行时,您可能希望使用 Amazon Simple Notification Service (Amazon SNS) 发送通知。
使用本机函数调用 Lambda 函数
lambda_sync
和 lambda_async
函数是内置的本机函数,它们同步或异步地调用 Lambda 函数。如果您必须知道 Lambda 函数的结果才能执行其他操作,请使用同步函数 lambda_sync
。如果您不需要知道 Lambda 函数的结果即可执行其他操作,请使用异步函数 lambda_async
。
在 Aurora MySQL 版本 3 中,必须为调用本机函数的用户授予 AWS_LAMBDA_ACCESS
角色。要为用户授予此角色,请以管理用户身份连接到数据库实例,然后运行以下语句。
GRANT AWS_LAMBDA_ACCESS TO
user
@domain-or-ip-address
您可以运行以下语句以吊销该角色。
REVOKE AWS_LAMBDA_ACCESS FROM
user
@domain-or-ip-address
提示
当您使用 Aurora MySQL 版本 3 中的角色方法时,还可以通过使用 SET ROLE
或 role_name
SET ROLE ALL
语句来激活角色。如果您不熟悉 MySQL 8.0 角色系统,可以在 基于角色的权限模型 中了解详情。您还可以在《MySQL 参考手册》的使用角色
这仅适用于当前的活动会话。当您重新连接时,必须再次运行 SET ROLE
语句以授予特权。有关更多信息,请参阅《MySQL 参考手册》中的 SET ROLE 语句
您也可以使用 activate_all_roles_on_login
数据库集群参数,在用户连接到数据库实例时自动激活所有角色。设置此参数后,您不必显式调用 SET ROLE 语句即可激活角色。有关更多信息,请参阅《MySQL 参考手册》中的 activate_all_roles_on_login
在 Aurora MySQL 版本 2 中,必须为调用原生函数的用户授予 INVOKE LAMBDA
权限。要为用户授予此权限,请以管理用户身份连接到数据库实例,然后运行以下语句。
GRANT INVOKE LAMBDA ON *.* TO
user
@domain-or-ip-address
您可以运行以下语句以吊销该权限。
REVOKE INVOKE LAMBDA ON *.* FROM
user
@domain-or-ip-address
lambda_sync 函数的语法
您可以使用 lambda_sync
调用类型同步地调用 RequestResponse
函数。该函数在 JSON 负载中返回 Lambda 调用的结果。该函数使用以下语法。
lambda_sync (
lambda_function_ARN
,
JSON_payload
)
lambda_sync 函数的参数
lambda_sync
函数具有以下参数。
- lambda_function_ARN
-
要调用的 Lambda 函数的 Amazon Resource Name (ARN)。
- JSON_payload
-
调用的 Lambda 函数的负载,采用 JSON 格式。
注意
Aurora MySQL 版本 3 支持 MySQL 8.0 中的 JSON 解析函数。但是,Aurora MySQL 版本 2 不包含这些函数。在 Lambda 函数返回原子值 (如数字或字符串) 时,不需要进行 JSON 解析。
lambda_sync 函数示例
基于 lambda_sync
的以下查询使用函数 ARN 同步地调用 Lambda 函数 BasicTestLambda
。该函数的负载是 {"operation":
"ping"}
。
SELECT lambda_sync( 'arn:aws:lambda:us-east-1:123456789012:function:BasicTestLambda', '{"operation": "ping"}');
lambda_async 函数的语法
您可以使用 lambda_async
调用类型异步地调用 Event
函数。该函数在 JSON 负载中返回 Lambda 调用的结果。该函数使用以下语法。
lambda_async (
lambda_function_ARN
,
JSON_payload
)
lambda_async 函数的参数
lambda_async
函数具有以下参数。
- lambda_function_ARN
-
要调用的 Lambda 函数的 Amazon Resource Name (ARN)。
- JSON_payload
-
调用的 Lambda 函数的负载,采用 JSON 格式。
注意
Aurora MySQL 版本 3 支持 MySQL 8.0 中的 JSON 解析函数。但是,Aurora MySQL 版本 2 不包含这些函数。在 Lambda 函数返回原子值 (如数字或字符串) 时,不需要进行 JSON 解析。
lambda_async 函数示例
基于 lambda_async
的以下查询使用函数 ARN 异步地调用 Lambda 函数 BasicTestLambda
。该函数的负载是 {"operation": "ping"}
。
SELECT lambda_async( 'arn:aws:lambda:us-east-1:123456789012:function:BasicTestLambda', '{"operation": "ping"}');
在触发器中调用 Lambda 函数
您可以使用触发器对数据修改语句调用 Lambda。以下示例使用 lambda_async
原生函数并将结果存储在变量中。
mysql>
SET @result=0;mysql>
DELIMITER //mysql>
CREATE TRIGGER myFirstTrigger AFTER INSERT ON Test_trigger FOR EACH ROW BEGIN SELECT lambda_async( 'arn:aws:lambda:us-east-1:123456789012:function:BasicTestLambda', '{"operation": "ping"}') INTO @result; END; //mysql>
DELIMITER ;
注意
触发器不是针对每个 SQL 语句运行一次,而是针对每个修改后的行运行一次(一次一行)。当触发器运行时,进程是同步的。数据修改语句仅在触发器完成时返回。
对于高写入流量的表,请谨慎使用其中的触发器来调用 Amazon Lambda 函数。INSERT
、UPDATE
和 DELETE
触发器按行激活。在包含 INSERT
、UPDATE
或 DELETE
触发器的表中,如果出现密集型写入工作负载,会导致大量调用 Amazon Lambda 函数。
使用 Aurora MySQL 存储过程调用 Lambda 函数(已弃用)
您可以调用 mysql.lambda_async
过程以从 Aurora MySQL 数据库集群中调用 Amazon Lambda 函数。如果要将在 Aurora MySQL 上运行的数据库与其他Amazon服务集成,这种方法可能是非常有用的。例如,每次在数据库的特定表中插入行时,您可能希望使用 Amazon Simple Notification Service (Amazon SNS) 发送通知。
Aurora MySQL 版本注意事项
从 Aurora MySQL 版本 2 开始,您可以使用原生函数方法调用 Lambda 函数,而不是使用这些存储过程。有关本机函数的更多信息,请参阅 使用本机函数调用 Lambda 函数。
在 Aurora MySQL 版本 2 中,不再支持存储过程 mysql.lambda_async
。强烈建议您改用原生 Lambda 函数。存储过程在 Aurora MySQL 版本 3 中不可用。
结合使用 mysql.lambda_async 过程来调用 Lambda 函数(已弃用)
mysql.lambda_async
过程是一个异步调用 Lambda 函数的内置存储过程。要使用该过程,您的数据库用户必须对 EXECUTE
存储过程拥有 mysql.lambda_async
权限。
语法
mysql.lambda_async
过程使用以下语法。
CALL mysql.lambda_async (
lambda_function_ARN
,
lambda_function_input
)
参数
mysql.lambda_async
过程具有以下参数。
- lambda_function_ARN
-
要调用的 Lambda 函数的 Amazon Resource Name (ARN)。
- lambda_function_input
-
所调用 Lambda 函数的 JSON 格式的输入字符串。
示例
作为最佳实践,我们建议您将对 mysql.lambda_async
的调用包装在存储过程中,该存储过程可从不同的来源 (例如触发器或客户端代码) 调用。这有助于避免出现阻抗不一致问题,并且可更轻松地调用 Lambda 函数。
注意
对于高写入流量的表,请谨慎使用其中的触发器来调用 Amazon Lambda 函数。INSERT
、UPDATE
和 DELETE
触发器按行激活。在包含 INSERT
、UPDATE
或 DELETE
触发器的表中,如果出现密集型写入工作负载,会导致大量调用 Amazon Lambda 函数。
尽管对 mysql.lambda_async
过程的调用是异步操作,但触发器是同步的。产生大量触发器激活的语句不等待调用 Amazon Lambda 函数完成,但是在返回对客户端的控制之前,它却需要等待触发器完成。
例 示例:调用 Amazon Lambda 函数来发送电子邮件
以下示例创建一个存储过程,您可以在您的数据库代码中调用该过程来使用 Lambda 函数发送电子邮件。
Amazon Lambda 函数
import boto3 ses = boto3.client('ses') def SES_send_email(event, context): return ses.send_email( Source=event['email_from'], Destination={ 'ToAddresses': [ event['email_to'], ] }, Message={ 'Subject': { 'Data': event['email_subject'] }, 'Body': { 'Text': { 'Data': event['email_body'] } } } )
存储过程
DROP PROCEDURE IF EXISTS SES_send_email; DELIMITER ;; CREATE PROCEDURE SES_send_email(IN email_from VARCHAR(255), IN email_to VARCHAR(255), IN subject VARCHAR(255), IN body TEXT) LANGUAGE SQL BEGIN CALL mysql.lambda_async( 'arn:aws:lambda:us-west-2:123456789012:function:SES_send_email', CONCAT('{"email_to" : "', email_to, '", "email_from" : "', email_from, '", "email_subject" : "', subject, '", "email_body" : "', body, '"}') ); END ;; DELIMITER ;
调用存储过程以调用 Amazon Lambda 函数
mysql>
call SES_send_email('example_from@amazon.com', 'example_to@amazon.com', 'Email subject', 'Email content');
例 示例:调用 Amazon Lambda 函数来从触发器发布事件
以下示例创建一个存储过程,该过程使用 Amazon SNS 发布事件。向表中添加行时,该代码会从触发器调用过程。
Amazon Lambda 函数
import boto3 sns = boto3.client('sns') def SNS_publish_message(event, context): return sns.publish( TopicArn='arn:aws:sns:us-west-2:123456789012:Sample_Topic', Message=event['message'], Subject=event['subject'], MessageStructure='string' )
存储过程
DROP PROCEDURE IF EXISTS SNS_Publish_Message; DELIMITER ;; CREATE PROCEDURE SNS_Publish_Message (IN subject VARCHAR(255), IN message TEXT) LANGUAGE SQL BEGIN CALL mysql.lambda_async('arn:aws:lambda:us-west-2:123456789012:function:SNS_publish_message', CONCAT('{ "subject" : "', subject, '", "message" : "', message, '" }') ); END ;; DELIMITER ;
表
CREATE TABLE 'Customer_Feedback' ( 'id' int(11) NOT NULL AUTO_INCREMENT, 'customer_name' varchar(255) NOT NULL, 'customer_feedback' varchar(1024) NOT NULL, PRIMARY KEY ('id') ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
触发器
DELIMITER ;; CREATE TRIGGER TR_Customer_Feedback_AI AFTER INSERT ON Customer_Feedback FOR EACH ROW BEGIN SELECT CONCAT('New customer feedback from ', NEW.customer_name), NEW.customer_feedback INTO @subject, @feedback; CALL SNS_Publish_Message(@subject, @feedback); END ;; DELIMITER ;
将行插入表中触发通知
mysql>
insert into Customer_Feedback (customer_name, customer_feedback) VALUES ('Sample Customer', 'Good job guys!');