调用 Amazon Lambda 函数
可以使用 Amazon S3 批量操作对 Amazon S3 对象执行大规模批量操作。批量操作调用 Amazon Lambda 函数操作将启动 Amazon Lambda 函数,来对清单中列出的对象执行自定义操作。本部分介绍如何创建 Lambda 函数来与 S3 批量操作结合使用,以及如何创建任务来调用该函数。S3 批量操作任务使用 LambdaInvoke
操作对清单中列出的每个对象运行 Lambda 函数。
可以使用 Amazon S3 控制台、Amazon Command Line Interface(Amazon CLI)、Amazon SDK 或 Amazon S3 REST API 处理 S3 批量操作。有关使用 Lambda 的更多信息,请参阅《Amazon Lambda 开发人员指南》中的入门Amazon Lambda。
以下部分说明如何开始将 S3 批量操作与 Lambda 结合使用。
将 Lambda 与批量操作结合使用
将 S3 批量操作与 Amazon Lambda 结合使用时,您必须创建专门用于 S3 批量操作的新 Lambda 函数。您无法在 S3 批量操作中重复使用基于事件的现有 Amazon S3 函数。事件函数只能接收消息;不返回消息。与 S3 批量操作结合使用的 Lambda 函数必须接受并返回消息。有关将 Lambda 与 Amazon S3 事件结合使用的更多信息,请参阅《Amazon Lambda 开发人员指南》中的将 Amazon Lambda 与 Amazon S3 结合使用。
创建调用 Lambda 函数的 S3 批量操作任务。此任务在清单中列出的所有对象上运行相同的 Lambda 函数。在处理清单中的对象时,您可以控制要使用的 Lambda 函数版本。S3 批量操作支持非限定的 Amazon Resource Name(ARN)、别名和特定版本。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的Amazon Lambda 版本控制简介。
如果您提供的 S3 批量操作任务包含使用别名或 $LATEST
限定符的函数 ARN,并更新它们各自所指向的版本,S3 批量操作开始调用 Lambda 函数的新版本。如果您希望通过大型任务更新此方法的功能部分,这会非常有用。如果您不希望 S3 批量操作更改所使用的版本,请在创建任务时在 FunctionARN
参数中提供特定版本。
将 Lambda 和批量操作用于目录存储桶
目录存储桶是一种 Amazon S3 存储桶类型,专为需要一致的个位数毫秒延迟的工作负载或性能至关重要的应用程序而设计。有关更多信息,请参阅目录存储桶。
使用批量操作来调用处理目录存储桶的 Lambda 函数有特殊要求。例如,必须使用更新后的 JSON 架构来构建 Lambda 请求,并在创建任务时指定 InvocationSchemaVersion 2.0(而不是 1.0)。此更新后的架构允许您为 UserArguments 指定可选的键值对,您可以使用这些键值对修改现有 Lambda 函数的某些参数。有关更多信息,请参阅 Amazon 存储博客中的 Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and Amazon Lambda
响应和结果代码
S3 批量操作使用一个或多个键调用 Lambda 函数,每个键都有一个与之关联的 TaskID
。S3 批量操作需要 Lambda 函数提供每个键的结果代码。对于在请求中发送的任何任务 ID,如果没有为它们返回每个键的结果代码,则将从 treatMissingKeysAs
字段中提供结果代码。treatMissingKeysAs
是可选的请求字段,默认为 TemporaryFailure
。下表包含 treatMissingKeysAs
字段的其它可能的结果代码和值。
响应代码 | 描述 |
---|---|
Succeeded |
任务正常完成。如果您请求了任务完成报告,报告中将包含任务的结果字符串。 |
TemporaryFailure |
任务暂时失败,将在任务完成前重新启动。忽略结果字符串。如果是最后一次重新启动,最终报告将包含错误消息。 |
PermanentFailure |
任务永久失败。如果您请求了任务完成报告,任务将被标记为 Failed 并包含错误消息字符串。忽略失败任务的结果字符串。 |
创建与 S3 批量操作一起使用的 Lambda 函数
此部分提供使用 Lambda 函数必须具有的 Amazon Identity and Access Management (IAM) 权限示例。它还包含一个与 S3 批量操作一起使用的示例 Lambda 函数。如果您之前从未创建过 Lambda 函数,请参阅《Amazon Lambda 开发人员指南》中的教程:将 Amazon Lambda 与 Amazon S3 结合使用。
您必须创建专门与 S3 批量操作一起使用的 Lambda 函数。您无法重用基于 Amazon S3 事件的现有 Lambda 函数,因为用于 S3 批量操作的 Lambda 函数必须接受并返回特殊数据字段。
重要
用 Java 编写的 Amazon Lambda 函数接受 RequestHandlerRequestStreamHandler
接口来对请求和响应进行自定义序列化和反序列化。此接口允许 Lambda 将 InputStream 和 OutputStream 传递给 Java handleRequest
方法。
将 Lambda 函数与 S3 批量操作结合使用时,请务必使用 RequestStreamHandler
接口。如果您使用 RequestHandler
接口,则批处理任务会失败,并在完成报告中显示“Lambda 负载中返回无效的 JSON”。
有关更多信息,请参阅《Amazon Lambda 用户指南》中的处理程序接口。
IAM 权限示例
以下是将 Lambda 函数与 S3 批量操作结合使用所需的 IAM 权限示例。
例 – S3 批量操作信任策略
以下是您可用于批量操作 IAM 角色的信任策略示例。当您创建任务并授予批量操作代入 IAM 角色的权限时,会指定此 IAM 角色。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
例 – Lambda IAM 策略
以下是提供 S3 批量操作权限以调用 Lambda 函数和读取输入清单的 IAM 策略示例。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }
示例请求和响应
本节内容提供 Lambda 函数的请求和响应示例。
例 请求
以下是 Lambda 函数的请求的 JSON 示例。
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:amzn-s3-demo-bucket1" } ] }
例 响应
以下是 Lambda 函数的响应的 JSON 示例。
{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }
S3 批量操作的 Lambda 函数示例
以下示例中,Python Lambda 从受版本控制的对象中移除删除标记。
如示例所示,S3 批量操作中的键采用 URL 编码。要将 Amazon S3 与其他 Amazon 服务结合使用,请务必对从 S3 批量操作传递的键进行 URL 解码。
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
创建调用 Lambda 函数的 S3 批量操作任务
创建 S3 批量操作任务以调用 Lambda 函数时,必须提供以下信息:
-
Lambda 函数的 ARN(可能包含函数别名或特定版本号)
-
具有调用此函数的权限的 IAM 角色
-
操作参数
LambdaInvokeFunction
有关创建 S3 批量操作任务的更多信息,请参阅 创建 S3 批量操作任务 和 S3 分批操作支持的操作。
以下示例使用 Amazon CLI 创建用于调用 Lambda 函数的 S3 批量操作任务。要使用此示例,请将
替换为您自己的信息。user input
placeholders
aws s3control create-job --account-id
account-id
--operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:region
:account-id
:function:LambdaFunctionName
" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::amzn-s3-demo-manifest-bucket
","ETag":"ManifestETag
"}}' --report '{"Bucket":"arn:aws:s3:::amzn-s3-demo-bucket
","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix
","ReportScope":"AllTasks"}' --priority2
--role-arn arn:aws:iam::account-id
:role/BatchOperationsRole
--regionregion
--description "Lambda Function
"
在 Lambda 清单中提供任务级信息
当您将 Amazon Lambda 函数与 S3 批量操作结合使用时,对于所运行的各个任务或键,可能会需要附带额外的数据。例如,您可能希望同时提供源对象键和新的对象键。然后,您的 Lambda 函数可以将源键以新名称复制到新 S3 存储桶。默认情况下,通过批量操作,可以仅在任务的输入清单中指定目标存储桶以及源键的列表。下面的示例介绍了如何在清单中包含额外的数据,以便运行更复杂的 Lambda 函数。
要在 S3 批量操作清单中指定各个键的参数以在 Lambda 函数代码中使用,请使用以下 URL 编码的 JSON 格式。key
字段采用类似于 Amazon S3 对象键的方式传递到您的 Lambda 函数。但是,Lambda 函数可以解释此字段来包含其它值或多个键,如下示例所示。
注意
清单中 key
字段的最大字符数为 1024。
例 – 清单将“Amazon S3 密钥”替换为 JSON 字符串
必须向 S3 批量操作提供 URL 编码的版本。
amzn-s3-demo-bucket
,{"origKey": "object1key
", "newKey": "newObject1Key
"}amzn-s3-demo-bucket
,{"origKey": "object2key
", "newKey": "newObject2Key
"}amzn-s3-demo-bucket
,{"origKey": "object3key
", "newKey": "newObject3Key
"}
例 – URL 编码的清单
必须向 S3 批量操作提供此 URL 编码的版本。非 URL 编码的版本不起作用。
amzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object1key
%22%2C%20%22newKey%22%3A%20%22newObject1Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object2key
%22%2C%20%22newKey%22%3A%20%22newObject2Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object3key
%22%2C%20%22newKey%22%3A%20%22newObject3Key
%22%7D
例 – 具有清单格式的 Lambda 函数将结果写入任务报告
这个 URL 编码的清单示例包含以竖线分隔的对象键,供以下 Lambda 函数进行解析。
amzn-s3-demo-bucket
,object1key
%7Cloweramzn-s3-demo-bucket
,object2key
%7Cupperamzn-s3-demo-bucket
,object3key
%7Creverseamzn-s3-demo-bucket
,object4key
%7Cdelete
此 Lambda 函数显示了如何解析编码到 S3 批量操作清单中的以竖线分隔的任务。该任务指示应用于指定对象的修订操作。
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
S3 分批操作教程
以下教程提供了使用 Lambda 的一些批量操作任务的完整端到端过程。在本教程中,您将了解如何将批量操作设置为调用 Lambda 函数,来对存储在 S3 源存储桶中的视频进行批量转码。Lambda 函数调用 AWS Elemental MediaConvert 来对视频进行转码。