Amazon Simple Storage Service
开发人员指南 (API 版本 2006-03-01)
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

从 Amazon S3 批量操作调用 Lambda 函数

Amazon S3 批处理操作 可以调用 AWS Lambda 函数来对清单中列出的对象执行自定义操作。本部分介绍如何创建 Lambda 函数来与 Amazon S3 批处理操作 结合使用,以及如何创建作业来调用此函数。Amazon S3 批处理操作 作业使用 LambdaInvoke 操作在清单中列出的各个对象上运行 Lambda 函数。

您可以使用 AWS 管理控制台、AWS Command Line Interface (AWS CLI)、AWS 开发工具包或 REST API 来为 Lambda 使用 Amazon S3 批处理操作。有关使用 Lambda 的更多信息,请参阅 AWS Lambda Developer Guide 中的 AWS Lambda 入门

以下部分说明如何开始将 Lambda 与 Amazon S3 批处理操作 结合使用。

将 Lambda 与 Amazon S3 批量操作结合使用

在将 Amazon S3 批处理操作 与 AWS Lambda 结合使用时,必须专门为与 Amazon S3 批处理操作 结合使用创建新 Lambda 函数。不能重复地将现有的基于事件的 Amazon S3 函数与 Amazon S3 批处理操作 结合使用。事件函数只能接收消息;不返回消息。与 Amazon S3 批处理操作 结合使用的 Lambda 函数必须接受并返回消息。有关将 Lambda 与 Amazon S3 事件结合使用的更多信息,请参阅 AWS Lambda Developer Guide 中的配合使用 AWS Lambda 和 Amazon S3

您创建调用 Lambda 函数的 Amazon S3 批处理操作 作业。此作业在清单中列出的所有对象上运行相同的 Lambda 函数。您可以在处理清单中的对象时控制要使用的 Lambda 函数的版本。Amazon S3 批处理操作 支持非限定的 Amazon 资源名称 (ARN)、别名和特定版本。有关更多信息,请参阅 AWS Lambda Developer Guide 中的 AWS Lambda 版本控制简介

如果您提供包含使用别名的函数 ARN 的 Amazon S3 批处理操作 作业或 $LATEST 限定符,并更新它们各自所指向的版本,Amazon S3 批处理操作 将开始调用 Lambda 函数的新版本。如果您希望通过大型作业更新此方法的功能部分,这会非常有用。如果不您希望 Amazon S3 批处理操作 更改所使用的版本,请在创建作业时在 FunctionARN 参数中提供特定版本。

响应和结果代码

Amazon S3 批处理操作 可以从 Lambda 函数获得两个代码级别。第一个是整个请求的响应代码,第二个是每个任务的结果代码。下表包含响应代码。

响应代码 描述
Succeeded 任务正常完成。如果您请求了作业完成报告,报告中将包含任务的结果字符串。
TemporaryFailure 任务暂时失败,将在作业完成前重新启动。忽略结果字符串。如果是最后一次重新启动,最终报告将包含错误消息。
PermanentFailure 任务永久失败。如果您请求了作业完成报告,任务将被标记为 Failed 并包含错误消息字符串。忽略失败任务的结果字符串。

创建 Lambda 函数来与 Amazon S3 批量操作结合使用

此部分提供使用 Lambda 函数必须具有的 AWS Identity and Access Management (IAM) 权限示例。其中还包含与 Amazon S3 批处理操作 结合使用的示例 Lambda 函数。如果您之前从未创建 Lambda 函数,请参阅 AWS Lambda Developer Guide 中的教程:结合使用 AWS Lambda 和 Amazon S3

您必须专门创建要与 Amazon S3 批处理操作 结合使用的 Lambda 函数。不能重复使用现有的 Amazon S3 基于事件的 Lambda 函数。这是用于 Amazon S3 批处理操作 的 Lambda 函数必须接受并返回专用数据字段。

IAM 权限示例

以下是将 Lambda 函数与 Amazon S3 批处理操作 结合使用所必须具有的 IAM 权限的示例。

例 — Amazon S3 批量操作信任策略

以下是您可用于执行角色的信任策略示例。它提供 Lambda 权限以执行由 Amazon S3 批处理操作 作业调用的函数。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

例 — Lambda IAM 策略

以下是提供 Amazon S3 批处理操作 权限以调用 Lambda 函数和读取输入清单的 IAM 策略示例。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

示例请求和响应

本节内容提供 Lambda 函数的请求和响应示例。

例 请求

以下是 Lambda 函数的请求的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket" } ] }

例 响应

以下是 Lambda 函数的响应的 JSON 示例。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

Amazon S3 批量操作的示例 Lambda 函数

下面的示例 Python Lambda 函数循环访问清单、复制和重命名每个对象。

如示例所示,Amazon S3 批处理操作中的键采用 URL 编码。要将 Amazon S3 与其他 AWS 服务结合使用,请确保对从 Amazon S3 批处理操作传递的键进行 URL 解码。

import boto3 import urllib from botocore.exceptions import ClientError def lambda_handler(event, context): # Instantiate boto client s3Client = boto3.client('s3') # Parse job parameters from Amazon S3 batch operations jobId = event['job']['id'] invocationId = event['invocationId'] invocationSchemaVersion = event['invocationSchemaVersion'] # Prepare results results = [] # Parse Amazon S3 Key, Key Version, and Bucket ARN taskId = event['tasks'][0]['taskId'] s3Key = urllib.unquote(event['tasks'][0]['s3Key']).decode('utf8') s3VersionId = event['tasks'][0]['s3VersionId'] s3BucketArn = event['tasks'][0]['s3BucketArn'] s3Bucket = s3BucketArn.split(':::')[-1] # Construct CopySource with VersionId copySrc = {'Bucket': s3Bucket, 'Key': s3Key} if s3VersionId is not None: copySrc['VersionId'] = s3VersionId # Copy object to new bucket with new key name try: # Prepare result code and string resultCode = None resultString = None # Construct New Key newKey = rename_key(s3Key) newBucket = 'destination-bucket-name' # Copy Object to New Bucket response = s3Client.copy_object( CopySource = copySrc, Bucket = newBucket, Key = newKey ) # Mark as succeeded resultCode = 'Succeeded' resultString = str(response) except ClientError as e: # If request timed out, mark as a temp failure # and Amason S3 batch operations will make the task for retry. If # any other exceptions are received, mark as permanent failure. errorCode = e.response['Error']['Code'] errorMessage = e.response['Error']['Message'] if errorCode == 'RequestTimeout': resultCode = 'TemporaryFailure' resultString = 'Retry request to Amazon S3 due to timeout.' else: resultCode = 'PermanentFailure' resultString = '{}: {}'.format(errorCode, errorMessage) except Exception as e: # Catch all exceptions to permanently fail the task resultCode = 'PermanentFailure' resultString = 'Exception: {}'.format(e.message) finally: results.append({ 'taskId': taskId, 'resultCode': resultCode, 'resultString': resultString }) return { 'invocationSchemaVersion': invocationSchemaVersion, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocationId, 'results': results } def rename_key(s3Key): # Rename the key by adding additional suffix return s3Key + '_new_suffix'

创建调用 Lambda 函数的 Amazon S3 批量操作作业

创建用于调用 Lambda 函数的 Amazon S3 批处理操作 作业时,必须提供以下信息:

  • Lambda 函数的 ARN(其中可以包含函数别名或特定版本号)

  • 具有调用此函数的权限的 IAM 角色

  • 操作参数 LambdaInvokeFunction

有关创建 Amazon S3 批处理操作 作业的更多信息,请参阅 创建 Amazon S3 批量操作作业操作

以下示例创建使用 AWS CLI 调用 Lambda 函数的 Amazon S3 批处理操作 作业。

aws s3control create-job --account-id <AccountID> --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::awsexamplebucket","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole --region Region --description “Lambda Function"

在 Lambda 清单中提供任务级别信息

当您将 AWS Lambda 函数与 Amazon S3 批量操作结合使用时,对于所操作的各个任务/键可能会需要附带额外的数据。例如,您可能希望同时提供源对象键和新的对象键。然后,您的 Lambda 函数可以将源键以新名称复制到新 S3 存储桶。默认情况下,通过 Amazon S3 批量操作,您可以仅在作业的输入清单中,指定目标存储桶以及源键列表。下文介绍了如何在清单中包含额外的数据,以便运行更复杂的 Lambda 函数。

要在 Amazon S3 批量操作清单中指定各个键的参数以在 Lambda 函数代码中使用,请使用以下 URL 编码的 JSON 格式。key 字段采用类似于 Amazon S3 对象键的方式传递到您的 Lambda 函数。但是,Lambda 函数可以解释它以包含其他值或多个键,如下所示。

注意

清单中 key 字段的最大字符数为 1024。

例 — 清单使用 JSON 字符串替换“Amazon S3 键”

必须向 Amazon S3 批量操作提供 URL 编码的版本。

my-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} my-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} my-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}

例 — 清单 URL 编码

必须向 Amazon S3 批量操作提供此 URL 编码的版本。非 URL 编码的版本不起作用。

my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D

例 — 具有清单格式的 Lambda 函数将结果写入作业报告

import json from urllib.parse import unquote_plus # This example Lambda function shows how to parse JSON that is encoded into the Amazon S3 batch # operations manifest containing lines like this: # # bucket,encoded-json # bucket,encoded-json # bucket,encoded-json # # For example, if we wanted to send the following JSON to this Lambda function: # # bucket,{"origKey": "object1key", "newKey": "newObject1Key"} # bucket,{"origKey": "object2key", "newKey": "newObject2Key"} # bucket,{"origKey": "object3key", "newKey": "newObject3Key"} # # We would simply URL-encode the JSON like this to create the real manifest to create a batch # operations job with: # # my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D # my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D # my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D # def lambda_handler(event, context): # Parse job parameters from Amazon S3 batch operations jobId = event['job']['id'] invocationId = event['invocationId'] invocationSchemaVersion = event['invocationSchemaVersion'] # Prepare results results = [] # S3 batch operations currently only passes a single task at a time in the array of tasks. task = event['tasks'][0] # Extract the task values we might want to use taskId = task['taskId'] s3Key = task['s3Key'] s3VersionId = task['s3VersionId'] s3BucketArn = task['s3BucketArn'] s3BucketName = s3BucketArn.split(':::')[-1] try: # Assume it will succeed for now resultCode = 'Succeeded' resultString = '' # Decode the JSON string that was encoded into the S3 Key value and convert the # resulting string into a JSON structure. s3Key_decoded = unquote_plus(s3Key) keyJson = json.loads(s3Key_decoded) # Extract some values from the JSON that we might want to operate on. In this example # we won't do anything except return the concatenated string as a fake result. newKey = keyJson['newKey'] origKey = keyJson['origKey'] resultString = origKey + " --> " + newKey except Exception as e: # If we run into any exceptions, fail this task so batch operations does retry it and # return the exception string so we can see the failure message in the final report # created by batch operations. resultCode = 'PermanentFailure' resultString = 'Exception: {}'.format(e) finally: # Send back the results for this task. results.append({ 'taskId': taskId, 'resultCode': resultCode, 'resultString': resultString }) return { 'invocationSchemaVersion': invocationSchemaVersion, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocationId, 'results': results }