调用 AWS Lambda 函数 - Amazon Simple Storage Service
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

调用 AWS Lambda 函数

S3 批处理操作可以调用 AWS Lambda 函数来对清单中列出的对象执行自定义操作。本部分介绍如何创建 Lambda 函数来与 S3 批处理操作结合使用,以及如何创建作业来调用该函数。S3 批处理操作作业使用 LambdaInvoke 操作对清单中列出的每个对象运行 Lambda 函数。

您可以使用 AWS 管理控制台、AWS 命令行界面 (AWS CLI)、AWS 开发工具包或 REST API 处理适用于 Lambda 的 S3 批处理操作。有关使用 Lambda 的更多信息,请参阅《AWS Lambda 开发人员指南》中的 AWS Lambda 入门

以下部分说明如何开始将 S3 批处理操作与 Lambda 结合使用。

将 Lambda 与 Amazon S3 批处理操作结合使用

将 S3 批处理操作与 AWS Lambda 结合使用时,您必须创建专门用于 S3 批处理操作的新 Lambda 函数。您无法在 S3 批处理操作中重复使用基于事件的现有 Amazon S3 函数。事件函数只能接收消息;不返回消息。与 S3 批处理操作结合使用的 Lambda 函数必须接受并返回消息。有关将 Lambda 与 Amazon S3 事件结合使用的更多信息,请参阅《AWS Lambda 开发人员指南》中的将 AWS Lambda 与 Amazon S3 搭配使用

创建调用 Lambda 函数的 S3 批处理操作作业。此作业在清单中列出的所有对象上运行相同的 Lambda 函数。在处理清单中的对象时,您可以控制要使用的 Lambda 函数版本。S3 批处理操作支持非限定的 Amazon 资源名称 (ARN)、别名和特定版本。有关更多信息,请参阅《AWS Lambda 开发人员指南》中的 AWS Lambda 版本控制简介

如果您提供的 S3 批处理操作作业包含使用别名或 $LATEST 限定符的函数 ARN,并更新它们各自所指向的版本,S3 批处理操作开始调用 Lambda 函数的新版本。如果您希望通过大型作业更新此方法的功能部分,这会非常有用。如果不您希望 S3 批处理操作更改所使用的版本,请在创建作业时在 FunctionARN 参数中提供特定版本。

响应和结果代码

S3 批处理操作期望从 Lambda 函数获得两个级别的代码。第一个是整个请求的响应代码,第二个是每个任务的结果代码。下表包含响应代码。

响应代码 描述
Succeeded 任务正常完成。如果您请求了作业完成报告,报告中将包含任务的结果字符串。
TemporaryFailure 任务暂时失败,将在作业完成前重新启动。忽略结果字符串。如果是最后一次重新启动,最终报告将包含错误消息。
PermanentFailure 任务永久失败。如果您请求了作业完成报告,任务将被标记为 Failed 并包含错误消息字符串。忽略失败任务的结果字符串。

创建与 S3 批处理操作一起使用的 Lambda 函数

本部分提供了您必须与 Lambda 函数一起使用的示例 AWS Identity and Access Management (IAM) 权限。它还包含一个与 S3 批处理操作一起使用的示例 Lambda 函数。如果您之前从未创建过 Lambda 函数,请参阅《AWS Lambda 开发人员指南》中的教程:将 AWS Lambda 与 Amazon S3 结合使用

您必须创建专门与 S3 批处理操作一起使用的 Lambda 函数。您无法重复使用基于事件的现有 Amazon S3 函数。这是因为用于 S3 批处理操作的 Lambda 函数必须接受并返回特殊数据字段。

重要

用 Java 编写的 AWS Lambda 函数接受 RequestHandlerRequestStreamHandler 处理程序接口。但是,为了支持 S3 批处理操作请求和响应格式,AWS Lambda 需要 RequestStreamHandler 接口来对请求和响应进行自定义序列化和反序列化。此接口允许 Lambda 将 InputStream 和 OutputStream 传递给 Java handleRequest 方法。

将 Lambda 函数与 S3 批处理操作结合使用时,请务必使用 RequestStreamHandler 接口。如果您使用 RequestHandler 接口,则批处理作业会失败,并在完成报告中显示“Lambda 负载中返回无效的 JSON”。

有关更多信息,请参阅《AWS Lambda 用户指南》中的处理程序接口

IAM 权限示例

以下是将 Lambda 函数与 S3 批处理操作结合使用所需的 IAM 权限示例。

例 — S3 批处理操作信任策略

以下是您可用于批处理操作 IAM 角色的信任策略示例。当您创建作业并授予批处理操作代入 IAM 角色的权限时,会指定此 IAM 角色。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

例 — Lambda IAM 策略

以下是提供 S3 批处理操作权限以调用 Lambda 函数和读取输入清单的 IAM 策略示例。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

示例请求和响应

本节内容提供 Lambda 函数的请求和响应示例。

例 请求

以下是 Lambda 函数的请求的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }

例 响应

以下是 Lambda 函数的响应的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

S3 批处理操作的 Lambda 函数示例

以下示例中,Python Lambda 从受版本控制的对象中移除删除标记。

如示例所示,S3 批处理操作中的键采用 URL 编码。要将 Amazon S3 与其他 AWS 服务结合使用,请务必对从 S3 批处理操作传递的键进行 URL 解码。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel('INFO') s3 = boto3.client('s3') def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event['invocationId'] invocation_schema_version = event['invocationSchemaVersion'] results = [] result_code = None result_string = None task = event['tasks'][0] task_id = task['taskId'] try: obj_key = parse.unquote(task['s3Key'], encoding='utf-8') obj_version_id = task['s3VersionId'] bucket_name = task['s3BucketArn'].split(':')[-1] logger.info("Got task: remove delete marker %s from object %s.", obj_version_id, obj_key) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id) result_code = 'PermanentFailure' result_string = f"Object {obj_key}, ID {obj_version_id} is not " \ f"a delete marker." logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response['ResponseMetadata']['HTTPHeaders'] \ .get('x-amz-delete-marker', 'false') if delete_marker == 'true': logger.info("Object %s, version %s is a delete marker.", obj_key, obj_version_id) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id) result_code = 'Succeeded' result_string = f"Successfully removed delete marker " \ f"{obj_version_id} from object {obj_key}." logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response['Error']['Code'] == 'RequestTimeout': result_code = 'TemporaryFailure' result_string = f"Attempt to remove delete marker from " \ f"object {obj_key} timed out." logger.info(result_string) else: raise else: raise ValueError(f"The x-amz-delete-marker header is either not " f"present or is not 'true'.") except Exception as error: # Mark all other exceptions as permanent failures. result_code = 'PermanentFailure' result_string = str(error) logger.exception(error) finally: results.append({ 'taskId': task_id, 'resultCode': result_code, 'resultString': result_string }) return { 'invocationSchemaVersion': invocation_schema_version, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocation_id, 'results': results }

创建调用 Lambda 函数的 S3 批处理操作作业

创建 S3 批处理操作作业以调用 Lambda 函数时,必须提供以下信息:

  • Lambda 函数的 ARN(可能包含函数别名或特定版本号)

  • 具有调用此函数的权限的 IAM 角色

  • 操作参数 LambdaInvokeFunction

有关创建 S3 批处理操作作业的更多信息,请参阅创建 S3 批处理操作作业操作

以下示例创建使用 AWS CLI 调用 Lambda 函数的 S3 批处理操作作业。

aws s3control create-job --account-id <AccountID> --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::awsexamplebucket1","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole --region Region --description “Lambda Function"

在 Lambda 清单中提供任务级信息

当您将 AWS Lambda 函数与 S3 批处理操作结合使用时,对于所操作的各个任务/键可能会需要附带额外的数据。例如,您可能希望同时提供源对象键和新的对象键。然后,您的 Lambda 函数可以将源键以新名称复制到新 S3 存储桶。默认情况下,通过 Amazon S3 批处理操作,您可以仅在作业的输入清单中,指定目标存储桶以及源键列表。下文介绍了如何在清单中包含额外的数据,以便运行更复杂的 Lambda 函数。

要在 S3 批处理操作清单中指定各个键的参数以在 Lambda 函数代码中使用,请使用以下 URL 编码的 JSON 格式。key 字段采用类似于 Amazon S3 对象键的方式传递到您的 Lambda 函数。但是,Lambda 函数可以解释它以包含其他值或多个键,如下所示。

注意

清单中 key 字段的最大字符数为 1024。

例 — 清单使用 JSON 字符串替换“Amazon S3 键”

必须向 S3 批处理操作提供 URL 编码的版本。

my-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} my-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} my-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}

例 — URL 编码的清单

必须向 S3 批处理操作提供此 URL 编码的版本。非 URL 编码的版本不起作用。

my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D

例 — 具有清单格式的 Lambda 函数将结果写入作业报告

此 Lambda 函数显示了如何解析编码到 S3 批处理操作清单中以竖线分隔的任务。该任务指示应用于指定对象的修订操作。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel('INFO') s3 = boto3.resource('s3') def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event['invocationId'] invocation_schema_version = event['invocationSchemaVersion'] results = [] result_code = None result_string = None task = event['tasks'][0] task_id = task['taskId'] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = \ parse.unquote(task['s3Key'], encoding='utf-8').split('|') bucket_name = task['s3BucketArn'].split(':')[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()['Body'].read().decode('utf-8') if revision == 'lower': stanza = stanza.lower() elif revision == 'upper': stanza = stanza.upper() elif revision == 'reverse': stanza = stanza[::-1] elif revision == 'delete': pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == 'delete': stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, 'utf-8')) result_string = f"Applied revision type '{revision}' to " \ f"stanza {stanza_obj.key}." logger.info(result_string) result_code = 'Succeeded' except ClientError as error: if error.response['Error']['Code'] == 'NoSuchKey': result_code = 'Succeeded' result_string = f"Stanza {obj_key} not found, assuming it was deleted " \ f"in an earlier revision." logger.info(result_string) else: result_code = 'PermanentFailure' result_string = f"Got exception when applying revision type '{revision}' " \ f"to {obj_key}: {error}." logger.exception(result_string) finally: results.append({ 'taskId': task_id, 'resultCode': result_code, 'resultString': result_string }) return { 'invocationSchemaVersion': invocation_schema_version, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocation_id, 'results': results }