使用 Amazon S3 事件通知加速网络爬取 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

使用 Amazon S3 事件通知加速网络爬取

您可以将爬网程序配置为使用 Amazon S3 事件来查找任何更改,而不是列出 Amazon S3 目标中的对象。此功能使用 Amazon S3 事件,通过列出触发事件的子文件夹中的所有文件,而不是列出完整的 Amazon S3 目标,来识别两次网络爬取之间的更改,从而缩短了重新爬网时间。

第一次网络爬取会列出目标中的所有 Amazon S3 对象。第一次成功网络爬取后,重新爬网将依赖于事件。

迁移到基于 Amazon S3 事件的爬网程序的优点包括:

  • 不需要列出目标中的所有对象,而是在添加或删除对象的位置列出特定文件夹,从而重新爬网更快。

  • 在添加或删除对象的位置列出特定文件夹,从而降低总体网络爬取成本。

Amazon S3 事件网络爬取基于爬网程序调度,从 SQS 队列中使用 Amazon S3 事件来运行。Amazon S3 事件可以配置为直接进入 SQS 队列,或者在多个用户需要相同事件的情况下,也可以配置为 SNS 和 SQS 的组合。有关更多信息,请参阅 为 Amazon S3 事件通知设置账户

在事件模式下创建和配置爬网程序之后,第一次网络爬取将在列表模式下通过执行 Amazon S3 目标的完整列表来运行。第一次成功网络爬取后将使用 Amazon S3 事件,以下日志可确认网络爬取的运行:“网络爬取正在使用 Amazon S3 事件来运行。”

创建 Amazon S3 事件网络爬取并更新可能影响网络爬取的爬网程序属性后,网络爬取将在列表模式下运行,并添加以下日志:“网络爬取未在 S3 事件模式下运行”。

为 Amazon S3 事件通知设置账户

本节介绍如何为 Amazon S3 事件通知设置账户,并提供使用脚本或 Amazon Glue 控制台执行此操作的说明。

Prerequisites

完成以下设置任务。请注意,括号中的值引用了脚本中的可配置设置。

  1. 创建 Amazon S3 存储桶 (s3_bucket_name)。

  2. 识别爬网程序目标(folder_name,例如“test1”),即识别的存储桶中的路径。

  3. 准备爬网程序名称 (crawler_name

  4. 准备 SNS 主题名称 (sns_topic_name),该名称可能与爬网程序名称相同。

  5. 准备要运行爬网程序且存在 S3 存储桶的 Amazon 区域 (region)。

  6. 如果使用电子邮件获取 Amazon S3 事件,则可以选择准备电子邮件地址 (subscribing_email)。

限制:

  • Amazon S3 事件爬网程序仅支持单个目标。

  • 不支持私有 VPC 上的 SQS。

  • 不支持 Amazon S3 采样。

  • 爬网程序目标应该是文件夹。

  • 不支持“所有”路径通配符:s3://%

要使用基于 Amazon S3 事件的爬网程序,您应该在 S3 存储桶上启用事件通知,并使用从与 S3 目标相同的前缀中筛选的事件,并存储在 SQS 中。您可以按照演练:为存储桶配置通知中的步骤或使用 用于从目标生成 SQS 和配置 Amazon S3 事件的脚本 通过控制台设置 SQS 和事件通知。

SQS 策略

添加以下 SQS 策略,需要附上爬网程序使用的角色。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:DeleteMessageBatch", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "*" } ] }

用于从目标生成 SQS 和配置 Amazon S3 事件的脚本

确保满足先决条件后,可以运行以下 Python 脚本来创建 SQS。将可配置设置替换为根据先决条件准备的名称。

注意

运行脚本后,登录到 SQS 控制台以查找创建的 SQS 的 ARN。

Amazon SQS 会设置可见性超时,即 Amazon SQS 阻止其他用户接收并处理消息的一段时间。将可见性超时设置为大致等于网络爬取运行时。

#!venv/bin/python import boto3 import botocore #---------Start : READ ME FIRST ----------------------# # 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification. # The following are the operations performed by the scripts: # a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events. # b. Create SNS topic for fan out. # c. Create SQS queue for saving events which will be consumed by the crawler. # SQS Event Queue ARN will be used to create the crawler after running the script. # 2. This script does not create the crawler. # 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another # purpose, SNS topic created by the script can be used. # 1. Creation of bucket is an optional step. # To create a bucket set create_bucket variable to true. # 2. The purpose of crawler_name is to easily locate the SQS/SNS. # crawler_name is used to create SQS and SNS with the same name as crawler. # 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name' # #---------End : READ ME FIRST ------------------------# #--------------------------------# # Start : Configurable settings # #--------------------------------# #Create region = 'us-west-2' s3_bucket_name = 's3eventtestuswest2' folder_name = "test" crawler_name = "test33S3Event" sns_topic_name = crawler_name sqs_queue_name = sns_topic_name create_bucket = False #-------------------------------# # End : Configurable settings # #-------------------------------# # Define aws clients dev = boto3.session.Session(profile_name='myprofile') boto3.setup_default_session(profile_name='myprofile') s3 = boto3.resource('s3', region_name=region) sns = boto3.client('sns', region_name=region) sqs = boto3.client('sqs', region_name=region) client = boto3.client("sts") account_id = client.get_caller_identity()["Account"] queue_arn = "" def print_error(e): print(e.message + ' RequestId: ' + e.response['ResponseMetadata']['RequestId']) def create_s3_bucket(bucket_name, client): bucket = client.Bucket(bucket_name) try: if not create_bucket: return True response = bucket.create( ACL='private', CreateBucketConfiguration={ 'LocationConstraint': region }, ) return True except botocore.exceptions.ClientError as e: print_error(e) if 'BucketAlreadyOwnedByYou' in e.message: # we own this bucket so continue print('We own the bucket already. Lets continue...') return True return False def create_s3_bucket_folder(bucket_name, client, directory_name): s3.put_object(Bucket=bucket_name, Key=(directory_name + '/')) def set_s3_notification_sns(bucket_name, client, topic_arn): bucket_notification = client.BucketNotification(bucket_name) try: response = bucket_notification.put( NotificationConfiguration={ 'TopicConfigurations': [ { 'Id' : crawler_name, 'TopicArn': topic_arn, 'Events': [ 's3:ObjectCreated:*', 's3:ObjectRemoved:*', ], 'Filter' : {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}} }, ] } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def create_sns_topic(topic_name, client): try: response = client.create_topic( Name=topic_name ) return response['TopicArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sns_topic_policy(topic_arn, client, bucket_name): try: response = client.set_topic_attributes( TopicArn=topic_arn, AttributeName='Policy', AttributeValue='''{ "Version": "2008-10-17", "Id": "s3-publish-to-sns", "Statement": [{ "Effect": "Allow", "Principal": { "AWS" : "*" }, "Action": [ "SNS:Publish" ], "Resource": "%s", "Condition": { "StringEquals": { "AWS:SourceAccount": "%s" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" } } }] }''' % (topic_arn, account_id, bucket_name) ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint): try: response = client.subscribe( TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint ) return response['SubscriptionArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def create_sqs_queue(queue_name, client): try: response = client.create_queue( QueueName=queue_name, ) return response['QueueUrl'] except botocore.exceptions.ClientError as e: print_error(e) return None def get_sqs_queue_arn(queue_url, client): try: response = client.get_queue_attributes( QueueUrl=queue_url, AttributeNames=[ 'QueueArn', ] ) return response['Attributes']['QueueArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sqs_policy(queue_url, queue_arn, client, topic_arn): try: response = client.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': '''{ "Version": "2012-10-17", "Id": "AllowSNSPublish", "Statement": [ { "Sid": "AllowSNSPublish01", "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": "%s", "Condition": { "ArnEquals": { "aws:SourceArn": "%s" } } } ] }''' % (queue_arn, topic_arn) } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False if __name__ == "__main__": print('Creating S3 bucket %s.' % s3_bucket_name) if create_s3_bucket(s3_bucket_name, s3): print('\nCreating SNS topic %s.' % sns_topic_name) topic_arn = create_sns_topic(sns_topic_name, sns) if topic_arn: print('SNS topic created successfully: %s' % topic_arn) print('Creating SQS queue %s' % sqs_queue_name) queue_url = create_sqs_queue(sqs_queue_name, sqs) if queue_url is not None: print('Subscribing sqs queue with sns.') queue_arn = get_sqs_queue_arn(queue_url, sqs) if queue_arn is not None: if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn): print('Successfully configured queue policy.') subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn) if subscription_arn is not None: if 'pending confirmation' in subscription_arn: print('Please confirm SNS subscription by visiting the subscribe URL.') else: print('Successfully subscribed SQS queue: ' + queue_arn) else: print('Failed to subscribe SNS') else: print('Failed to set queue policy.') else: print("Failed to get queue arn for %s" % queue_url) # ------------ End subscriptions to SNS topic ----------------- print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name) if set_sns_topic_policy(topic_arn, sns, s3_bucket_name): print('SNS topic policy added successfully.') if set_s3_notification_sns(s3_bucket_name, s3, topic_arn): print('Successfully configured event for S3 bucket %s' % s3_bucket_name) print('Create S3 Event Crawler using SQS ARN %s' % queue_arn) else: print('Failed to configure S3 bucket notification.') else: print('Failed to add SNS topic policy.') else: print('Failed to create SNS topic.')

使用控制台为 Amazon S3 事件通知设置爬网程序

要使用 Amazon Glue 控制台为 Amazon S3 事件通知设置爬网程序,请执行以下操作:

  1. Data stores(数据存储)选择为 Crawler source type(爬网程序源类型)。

  2. Crawl changes identified by Amazon S3 events(网络爬取 Amazon S3 事件识别的更改)选择为 Repeat crawls of S3 data stores(重复对 S3 数据存储进行网络爬取)。

  3. 单击 Next(下一步)。

  4. (可选)为数据存储指定 Connection(连接)。

  5. 指定对文件夹和文件进行网络爬取的 Include path(包含路径)。

  6. Include SQS ARN(包含 SQS ARN)中指定数据存储参数,包括有效的 SQS ARN。

    例如:arn:aws:sqs:region:account:sqs

  7. Include dead-letter SQS ARN(包含死信 SQS ARN)中指定有效的 Amazon 死信 SQS ARN。

    例如:arn:aws:sqs:region:account:deadLetterQueue