调用 Amazon Lambda 函数 - Amazon Simple Storage Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

调用 Amazon Lambda 函数

调用 Amazon Lambda 函数启动 Amazon Lambda 函数,来对清单中列出的对象执行自定义操作。本部分介绍如何创建 Lambda 函数来与 S3 批量操作结合使用,以及如何创建任务来调用该函数。S3 批量操作任务使用 LambdaInvoke 操作对清单中列出的每个对象运行 Lambda 函数。

您可以通过使用 Amazon Web Services Management Console、Amazon Command Line Interface(Amazon CLI)、Amazon SDK 或 REST API,来使用适用于 Lambda 的 S3 批量操作。有关使用 Lambda 的更多信息,请参阅《Amazon Lambda 开发人员指南》中的入门Amazon Lambda

以下部分说明如何开始将 S3 批量操作与 Lambda 结合使用。

将 Lambda 与 Amazon S3 批量操作结合使用

将 S3 批量操作与 Amazon Lambda 结合使用时,您必须创建专门用于 S3 批量操作的新 Lambda 函数。您无法在 S3 批量操作中重复使用基于事件的现有 Amazon S3 函数。事件函数只能接收消息;不返回消息。与 S3 批量操作结合使用的 Lambda 函数必须接受并返回消息。有关将 Lambda 与 Amazon S3 事件结合使用的更多信息,请参阅《Amazon Lambda 开发人员指南》中的将 Amazon Lambda 与 Amazon S3 结合使用。

创建调用 Lambda 函数的 S3 批量操作任务。此任务在清单中列出的所有对象上运行相同的 Lambda 函数。在处理清单中的对象时,您可以控制要使用的 Lambda 函数版本。S3 批量操作支持非限定的 Amazon Resource Name(ARN)、别名和特定版本。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的Amazon Lambda 版本控制简介

如果您提供的 S3 批量操作任务包含使用别名或 $LATEST 限定符的函数 ARN,并更新它们各自所指向的版本,S3 批量操作开始调用 Lambda 函数的新版本。如果您希望通过大型任务更新此方法的功能部分,这会非常有用。如果不您希望 S3 批量操作更改所使用的版本,请在创建任务时在 FunctionARN 参数中提供特定版本。

将 Lambda 和 Amazon S3 批量操作用于目录存储桶

目录存储桶是一种 Amazon S3 存储桶,专为需要一致的个位数毫秒延迟的工作负载或性能至关重要的应用程序而设计。有关更多信息,请参阅目录存储桶

使用 Amazon S3 批量操作来调用处理目录存储桶的 Lambda 函数有特殊要求。例如,您必须使用更新后的 JSON 架构来构建 Lambda 请求,并在创建任务时指定 InvocationSchemaVersion 2.0。此更新后的架构允许您为 UserArguments 指定可选的键值对,您可以使用这些键值对修改现有 Lambda 函数的某些参数。有关更多信息,请参阅 Amazon 存储博客中的 Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and Amazon Lambda

响应和结果代码

S3 批量操作期望从 Lambda 函数获得两个级别的代码。第一个是整个请求的响应代码,第二个是每个任务的结果代码。下表包含响应代码。

响应代码 描述
Succeeded 任务正常完成。如果您请求了任务完成报告,报告中将包含任务的结果字符串。
TemporaryFailure 任务暂时失败,将在任务完成前重新启动。忽略结果字符串。如果是最后一次重新启动,最终报告将包含错误消息。
PermanentFailure 任务永久失败。如果您请求了任务完成报告,任务将被标记为 Failed 并包含错误消息字符串。忽略失败任务的结果字符串。

创建与 S3 批量操作一起使用的 Lambda 函数

此部分提供使用 Lambda 函数必须具有的 Amazon Identity and Access Management (IAM) 权限示例。它还包含一个与 S3 批量操作一起使用的示例 Lambda 函数。如果您之前从未创建过 Lambda 函数,请参阅《Amazon Lambda 开发人员指南》中的教程:将 Amazon Lambda 与 Amazon S3 结合使用

您必须创建专门与 S3 批量操作一起使用的 Lambda 函数。您无法重复使用基于事件的现有 Amazon S3 Lambda 函数。这是因为用于 S3 批量操作的 Lambda 函数必须接受并返回特殊数据字段。

重要

用 Java 编写的 Amazon Lambda 函数接受 RequestHandlerRequestStreamHandler 处理程序接口。但是,为了支持 S3 批量操作请求和响应格式,Amazon Lambda 需要 RequestStreamHandler 接口来对请求和响应进行自定义序列化和反序列化。此接口允许 Lambda 将 InputStream 和 OutputStream 传递给 Java handleRequest 方法。

将 Lambda 函数与 S3 批量操作结合使用时,请务必使用 RequestStreamHandler 接口。如果您使用 RequestHandler 接口,则批处理任务会失败,并在完成报告中显示“Lambda 负载中返回无效的 JSON”。

有关更多信息,请参阅《Amazon Lambda 用户指南》中的处理程序接口

IAM 权限示例

以下是将 Lambda 函数与 S3 批量操作结合使用所需的 IAM 权限示例。

例 – S3 批量操作信任策略

以下是您可用于批量操作 IAM 角色的信任策略示例。当您创建任务并授予批量操作代入 IAM 角色的权限时,会指定此 IAM 角色。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
例 – Lambda IAM 策略

以下是提供 S3 批量操作权限以调用 Lambda 函数和读取输入清单的 IAM 策略示例。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

示例请求和响应

本节内容提供 Lambda 函数的请求和响应示例。

例 请求

以下是 Lambda 函数的请求的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }
例 响应

以下是 Lambda 函数的响应的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

S3 批量操作的 Lambda 函数示例

以下示例中,Python Lambda 从受版本控制的对象中移除删除标记。

如示例所示,S3 批量操作中的键采用 URL 编码。要将 Amazon S3 与其他 Amazon 服务结合使用,请务必对从 S3 批量操作传递的键进行 URL 解码。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }

创建调用 Lambda 函数的 S3 批量操作任务

创建 S3 批量操作任务以调用 Lambda 函数时,必须提供以下信息:

  • Lambda 函数的 ARN(可能包含函数别名或特定版本号)

  • 具有调用此函数的权限的 IAM 角色

  • 操作参数 LambdaInvokeFunction

有关创建 S3 批量操作任务的更多信息,请参阅 创建 S3 批量操作任务S3 分批操作支持的操作

以下示例创建使用 Amazon CLI 调用 Lambda 函数的 S3 批量操作任务。

aws s3control create-job --account-id <AccountID> --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::awsexamplebucket1","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole --region Region --description “Lambda Function"

在 Lambda 清单中提供任务级信息

当您将 Amazon Lambda 函数与 S3 批量操作结合使用时,对于所操作的各个任务/键可能会需要附带额外的数据。例如,您可能希望同时提供源对象键和新的对象键。然后,您的 Lambda 函数可以将源键以新名称复制到新 S3 存储桶。默认情况下,通过 Amazon S3 批量操作,您可以仅在任务的输入清单中,指定目标存储桶以及源键列表。下文介绍了如何在清单中包含额外的数据,以便运行更复杂的 Lambda 函数。

要在 S3 批量操作清单中指定各个键的参数以在 Lambda 函数代码中使用,请使用以下 URL 编码的 JSON 格式。key 字段采用类似于 Amazon S3 对象键的方式传递到您的 Lambda 函数。但是,Lambda 函数可以解释它以包含其他值或多个键,如下所示。

注意

清单中 key 字段的最大字符数为 1024。

例 – 清单使用 JSON 字符串替换“Amazon S3 键”

必须向 S3 批量操作提供 URL 编码的版本。

my-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} my-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} my-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}
例 – URL 编码的清单

必须向 S3 批量操作提供此 URL 编码的版本。非 URL 编码的版本不起作用。

my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D
例 – 具有清单格式的 Lambda 函数将结果写入任务报告

此 Lambda 函数显示了如何解析编码到 S3 批量操作清单中以竖线分隔的任务。该任务指示应用于指定对象的修订操作。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }

从 S3 批量操作教程中学习

以下教程提供了使用 Lambda 的一些批量操作任务的完整端到端过程。