从 Amazon S3 批处理操作调用 Lambda 函数 - Amazon Simple Storage Service
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

从 Amazon S3 批处理操作调用 Lambda 函数

S3 批处理操作可以调用 AWS Lambda 函数来对清单中列出的对象执行自定义操作。本部分介绍如何创建 Lambda 函数来与 S3 批处理操作结合使用,以及如何创建作业来调用该函数。S3 批处理操作作业使用 LambdaInvoke 操作对清单中列出的各个对象运行 Lambda 函数。

您可以使用 AWS 管理控制台、AWS 命令行界面 (AWS CLI)、AWS 开发工具包或 REST API 处理适用于 Lambda 的 S3 批处理操作。有关使用 Lambda 的更多信息,请参阅《AWS Lambda 开发人员指南》中的 AWS Lambda 入门

以下部分说明如何开始将 S3 批处理操作与 Lambda 结合使用。

将 Lambda 与 Amazon S3 批处理操作结合使用

将 S3 批处理操作与 AWS Lambda 结合使用时,您必须创建专门用于 S3 批处理操作的新 Lambda 函数。您无法在 S3 批处理操作中重复使用基于事件的现有 Amazon S3 函数。事件函数只能接收消息;不返回消息。与 S3 批处理操作结合使用的 Lambda 函数必须接受并返回消息。有关将 Lambda 与 Amazon S3 事件结合使用的更多信息,请参阅《AWS Lambda 开发人员指南》中的将 AWS Lambda 与 Amazon S3 搭配使用

创建调用 Lambda 函数的 S3 批处理操作作业。此作业在清单中列出的所有对象上运行相同的 Lambda 函数。在处理清单中的对象时,您可以控制要使用的 Lambda 函数版本。S3 批处理操作支持非限定的 Amazon 资源名称 (ARN)、别名和特定版本。有关更多信息,请参阅《AWS Lambda 开发人员指南》中的 AWS Lambda 版本控制简介

如果您提供的 S3 批处理操作作业包含使用别名或 $LATEST 限定符的函数 ARN,并更新它们各自所指向的版本,S3 批处理操作开始调用 Lambda 函数的新版本。如果您希望通过大型作业更新此方法的功能部分,这会非常有用。如果不您希望 S3 批处理操作更改所使用的版本,请在创建作业时在 FunctionARN 参数中提供特定版本。

响应和结果代码

S3 批处理操作期望从 Lambda 函数获得两个级别的代码。第一个是整个请求的响应代码,第二个是每个任务的结果代码。下表包含响应代码。

响应代码 描述
Succeeded 任务正常完成。如果您请求了作业完成报告,报告中将包含任务的结果字符串。
TemporaryFailure 任务暂时失败,将在作业完成前重新启动。忽略结果字符串。如果是最后一次重新启动,最终报告将包含错误消息。
PermanentFailure 任务永久失败。如果您请求了作业完成报告,任务将被标记为 Failed 并包含错误消息字符串。忽略失败任务的结果字符串。

创建与 S3 批处理操作一起使用的 Lambda 函数

本部分提供了您必须与 Lambda 函数一起使用的示例 AWS Identity and Access Management (IAM) 权限。它还包含一个与 S3 批处理操作一起使用的示例 Lambda 函数。如果您之前从未创建过 Lambda 函数,请参阅《AWS Lambda 开发人员指南》中的教程:将 AWS Lambda 与 Amazon S3 结合使用

您必须创建专门与 S3 批处理操作一起使用的 Lambda 函数。您无法重复使用基于事件的现有 Amazon S3 函数。这是因为用于 S3 批处理操作的 Lambda 函数必须接受并返回特殊数据字段。

重要

用 Java 编写的 AWS Lambda 函数接受 RequestHandlerRequestStreamHandler 处理程序接口。但是,为了支持 S3 批处理操作请求和响应格式,AWS Lambda 需要 RequestStreamHandler 接口来对请求和响应进行自定义序列化和反序列化。此接口允许 Lambda 将 InputStream 和 OutputStream 传递给 Java handleRequest 方法。

将 Lambda 函数与 S3 批处理操作结合使用时,请务必使用 RequestStreamHandler 接口。如果您使用 RequestHandler 接口,则批处理作业会失败,并在完成报告中显示“Lambda 负载中返回无效的 JSON”。

有关更多信息,请参阅《AWS Lambda 用户指南》中的处理程序接口

IAM 权限示例

以下是将 Lambda 函数与 S3 批处理操作结合使用所需的 IAM 权限示例。

例 — S3 批处理操作信任策略

以下是您可用于批处理操作 IAM 角色的信任策略示例。当您创建作业并授予批处理操作代入 IAM 角色的权限时,会指定此 IAM 角色。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

例 — Lambda IAM 策略

以下是提供 S3 批处理操作权限以调用 Lambda 函数和读取输入清单的 IAM 策略示例。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

示例请求和响应

本节内容提供 Lambda 函数的请求和响应示例。

例 请求

以下是 Lambda 函数的请求的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }

例 响应

以下是 Lambda 函数的响应的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

S3 批处理操作的 Lambda 函数示例

下面的示例 Python Lambda 函数循环访问清单,复制和重命名每个对象。

如示例所示,S3 批处理操作中的键采用 URL 编码。要将 Amazon S3 与其他 AWS 服务结合使用,请务必对从 S3 批处理操作传递的键进行 URL 解码。

import boto3 import urllib from botocore.exceptions import ClientError def lambda_handler(event, context): # Instantiate boto client s3Client = boto3.client('s3') # Parse job parameters from S3 Batch Operations jobId = event['job']['id'] invocationId = event['invocationId'] invocationSchemaVersion = event['invocationSchemaVersion'] # Prepare results results = [] # Parse Amazon S3 Key, Key Version, and Bucket ARN taskId = event['tasks'][0]['taskId'] s3Key = urllib.unquote(event['tasks'][0]['s3Key']).decode('utf8') s3VersionId = event['tasks'][0]['s3VersionId'] s3BucketArn = event['tasks'][0]['s3BucketArn'] s3Bucket = s3BucketArn.split(':::')[-1] # Construct CopySource with VersionId copySrc = {'Bucket': s3Bucket, 'Key': s3Key} if s3VersionId is not None: copySrc['VersionId'] = s3VersionId # Copy object to new bucket with new key name try: # Prepare result code and string resultCode = None resultString = None # Construct New Key newKey = rename_key(s3Key) newBucket = 'destination-bucket-name' # Copy Object to New Bucket response = s3Client.copy_object( CopySource = copySrc, Bucket = newBucket, Key = newKey ) # Mark as succeeded resultCode = 'Succeeded' resultString = str(response) except ClientError as e: # If request timed out, mark as a temp failure # and S3 Batch Operations will make the task for retry. If # any other exceptions are received, mark as permanent failure. errorCode = e.response['Error']['Code'] errorMessage = e.response['Error']['Message'] if errorCode == 'RequestTimeout': resultCode = 'TemporaryFailure' resultString = 'Retry request to Amazon S3 due to timeout.' else: resultCode = 'PermanentFailure' resultString = '{}: {}'.format(errorCode, errorMessage) except Exception as e: # Catch all exceptions to permanently fail the task resultCode = 'PermanentFailure' resultString = 'Exception: {}'.format(e.message) finally: results.append({ 'taskId': taskId, 'resultCode': resultCode, 'resultString': resultString }) return { 'invocationSchemaVersion': invocationSchemaVersion, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocationId, 'results': results } def rename_key(s3Key): # Rename the key by adding additional suffix return s3Key + '_new_suffix'

创建调用 Lambda 函数的 S3 批处理操作作业

创建 S3 批处理操作作业以调用 Lambda 函数时,必须提供以下信息:

  • Lambda 函数的 ARN(可能包含函数别名或特定版本号)

  • 具有调用此函数的权限的 IAM 角色

  • 操作参数 LambdaInvokeFunction

有关创建 S3 批处理操作作业的更多信息,请参阅创建 S3 批处理操作作业操作

以下示例创建使用 AWS CLI 调用 Lambda 函数的 S3 批处理操作作业。

aws s3control create-job --account-id <AccountID> --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::awsexamplebucket1","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole --region Region --description “Lambda Function"

在 Lambda 清单中提供任务级信息

当您将 AWS Lambda 函数与 S3 批处理操作结合使用时,对于所操作的各个任务/键可能会需要附带额外的数据。例如,您可能希望同时提供源对象键和新的对象键。然后,您的 Lambda 函数可以将源键以新名称复制到新 S3 存储桶。默认情况下,通过 Amazon S3 批处理操作,您可以仅在作业的输入清单中,指定目标存储桶以及源键列表。下文介绍了如何在清单中包含额外的数据,以便运行更复杂的 Lambda 函数。

要在 S3 批处理操作清单中指定各个键的参数以在 Lambda 函数代码中使用,请使用以下 URL 编码的 JSON 格式。key 字段采用类似于 Amazon S3 对象键的方式传递到您的 Lambda 函数。但是,Lambda 函数可以解释它以包含其他值或多个键,如下所示。

注意

清单中 key 字段的最大字符数为 1024。

例 — 清单使用 JSON 字符串替换“Amazon S3 键”

必须向 S3 批处理操作提供 URL 编码的版本。

my-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} my-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} my-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}

例 — URL 编码的清单

必须向 S3 批处理操作提供此 URL 编码的版本。非 URL 编码的版本不起作用。

my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D

例 — 具有清单格式的 Lambda 函数将结果写入作业报告

此 Lambda 函数显示了如何解析编码到 S3 批处理操作清单中的 JSON。

import json from urllib.parse import unquote_plus # This example Lambda function shows how to parse JSON that is encoded into the Amazon S3 batch # operations manifest containing lines like this: # # bucket,encoded-json # bucket,encoded-json # bucket,encoded-json # # For example, if we wanted to send the following JSON to this Lambda function: # # bucket,{"origKey": "object1key", "newKey": "newObject1Key"} # bucket,{"origKey": "object2key", "newKey": "newObject2Key"} # bucket,{"origKey": "object3key", "newKey": "newObject3Key"} # # We would simply URL-encode the JSON like this to create the real manifest to create a batch # operations job with: # # my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D # my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D # my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D # def lambda_handler(event, context): # Parse job parameters from S3 batch operations jobId = event['job']['id'] invocationId = event['invocationId'] invocationSchemaVersion = event['invocationSchemaVersion'] # Prepare results results = [] # S3 batch operations currently only passes a single task at a time in the array of tasks. task = event['tasks'][0] # Extract the task values we might want to use taskId = task['taskId'] s3Key = task['s3Key'] s3VersionId = task['s3VersionId'] s3BucketArn = task['s3BucketArn'] s3BucketName = s3BucketArn.split(':::')[-1] try: # Assume it will succeed for now resultCode = 'Succeeded' resultString = '' # Decode the JSON string that was encoded into the S3 Key value and convert the # resulting string into a JSON structure. s3Key_decoded = unquote_plus(s3Key) keyJson = json.loads(s3Key_decoded) # Extract some values from the JSON that we might want to operate on. In this example # we won't do anything except return the concatenated string as a fake result. newKey = keyJson['newKey'] origKey = keyJson['origKey'] resultString = origKey + " --> " + newKey except Exception as e: # If we run into any exceptions, fail this task so batch operations does not retry it and # return the exception string so we can see the failure message in the final report # created by batch operations. resultCode = 'PermanentFailure' resultString = 'Exception: {}'.format(e) finally: # Send back the results for this task. results.append({ 'taskId': taskId, 'resultCode': resultCode, 'resultString': resultString }) return { 'invocationSchemaVersion': invocationSchemaVersion, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocationId, 'results': results }