使用 Amazon S3 事件通知加速网络爬取 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

使用 Amazon S3 事件通知加速网络爬取

您可以将爬网程序配置为使用 Amazon S3 事件来查找任何更改,而不是列出 Amazon S3 或 Data Catalog 目标中的对象。此功能使用 Amazon S3 事件,通过列出触发事件的子文件夹中的所有文件,而不是列出完整的 Amazon S3 或 Data Catalog 目标,来识别两次网络爬取之间的更改,从而缩短了重新爬网时间。

第一次网络爬取会列出目标中的所有 Amazon S3 对象。第一次成功爬取之后,您可以选择手动重新爬取或按设定计划重新爬取。爬网程序只会列出这些事件中的对象,而不会列出所有对象。

迁移到基于 Amazon S3 事件的爬网程序的优点包括:

  • 不需要列出目标中的所有对象,而是在添加或删除对象的位置列出特定文件夹,从而重新爬网更快。

  • 在添加或删除对象的位置列出特定文件夹,从而降低总体网络爬取成本。

Amazon S3 事件网络爬取基于爬网程序调度,从 SQS 队列中使用 Amazon S3 事件来运行。如果队列中没有事件,则无需支付费用。Amazon S3 事件可以配置为直接进入 SQS 队列,或者在多个用户需要相同事件的情况下,也可以配置为 SNS 和 SQS 的组合。有关更多信息,请参阅为 Amazon S3 事件通知设置账户

在事件模式下创建和配置爬网程序之后,第一次网络爬取将在列表模式下通过执行 Amazon S3 或 Data Catalog 目标的完整列表来运行。第一次成功网络爬取后将使用 Amazon S3 事件,以下日志可确认网络爬取的运行:“网络爬取正在使用 Amazon S3 事件来运行。”

创建 Amazon S3 事件网络爬取并更新可能影响网络爬取的爬网程序属性后,网络爬取将在列表模式下运行,并添加以下日志:“网络爬取未在 S3 事件模式下运行”。

目录目标

当目标为 Data Catalog 时,爬网程序会利用更改(例如,表中的额外分区)更新 Data Catalog 中的现有表。

为 Amazon S3 事件通知设置账户

本节介绍如何为 Amazon S3 事件通知设置账户,并提供使用脚本或 Amazon Glue 控制台执行此操作的说明。

先决条件

完成以下设置任务。请注意,括号中的值引用了脚本中的可配置设置。

  1. 创建 Amazon S3 存储桶 (s3_bucket_name)。

  2. 识别爬网程序目标(folder_name,例如“test1”),即识别的存储桶中的路径。

  3. 准备爬网程序名称 (crawler_name

  4. 准备 SNS 主题名称 (sns_topic_name),该名称可能与爬网程序名称相同。

  5. 准备要运行爬网程序且存在 S3 存储桶的 Amazon 区域 (region)。

  6. 如果使用电子邮件获取 Amazon S3 事件,则可以选择准备电子邮件地址 (subscribing_email)。

限制:

  • 无论是 Amazon S3 还是 Data Catalog 目标,爬网程序仅支持单个目标。

  • 不支持私有 VPC 上的 SQS。

  • 不支持 Amazon S3 采样。

  • 爬网程序目标应为 Amazon S3 目标的文件夹,或者 Data Catalog 目标的一个或多个 Amazon Glue Data Catalog 表。

  • 不支持“所有”路径通配符:s3://%

  • 对于 Data Catalog 目标,所有目录表都应指向 Amazon S3 事件模式的同一 Amazon S3 存储桶。

  • 对于 Data Catalog 目标,目录表不应指向 Delta Lake 格式的 Amazon S3 位置(包含 _symlink 文件夹或检查目录表的 InputFormat)。

要使用基于 Amazon S3 事件的爬网程序,您应该在 S3 存储桶上启用事件通知,并使用从与 S3 目标相同的前缀中筛选的事件,并存储在 SQS 中。您可以按照演练:为存储桶配置通知中的步骤或使用 用于从目标生成 SQS 和配置 Amazon S3 事件的脚本 通过控制台设置 SQS 和事件通知。

SQS 策略

添加以下 SQS 策略,需要附上爬网程序使用的角色。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:DeleteMessageBatch", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "*" } ] }

用于从目标生成 SQS 和配置 Amazon S3 事件的脚本

确保满足先决条件后,可以运行以下 Python 脚本来创建 SQS。将可配置设置替换为根据先决条件准备的名称。

注意

运行脚本后,登录到 SQS 控制台以查找创建的 SQS 的 ARN。

Amazon SQS 会设置可见性超时,即 Amazon SQS 阻止其他用户接收并处理消息的一段时间。将可见性超时设置为大致等于网络爬取运行时。

#!venv/bin/python import boto3 import botocore #---------Start : READ ME FIRST ----------------------# # 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification. # The following are the operations performed by the scripts: # a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events. # b. Create SNS topic for fan out. # c. Create SQS queue for saving events which will be consumed by the crawler. # SQS Event Queue ARN will be used to create the crawler after running the script. # 2. This script does not create the crawler. # 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another # purpose, SNS topic created by the script can be used. # 1. Creation of bucket is an optional step. # To create a bucket set create_bucket variable to true. # 2. The purpose of crawler_name is to easily locate the SQS/SNS. # crawler_name is used to create SQS and SNS with the same name as crawler. # 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name' # #---------End : READ ME FIRST ------------------------# #--------------------------------# # Start : Configurable settings # #--------------------------------# #Create region = 'us-west-2' s3_bucket_name = 's3eventtestuswest2' folder_name = "test" crawler_name = "test33S3Event" sns_topic_name = crawler_name sqs_queue_name = sns_topic_name create_bucket = False #-------------------------------# # End : Configurable settings # #-------------------------------# # Define aws clients dev = boto3.session.Session(profile_name='myprofile') boto3.setup_default_session(profile_name='myprofile') s3 = boto3.resource('s3', region_name=region) sns = boto3.client('sns', region_name=region) sqs = boto3.client('sqs', region_name=region) client = boto3.client("sts") account_id = client.get_caller_identity()["Account"] queue_arn = "" def print_error(e): print(e.message + ' RequestId: ' + e.response['ResponseMetadata']['RequestId']) def create_s3_bucket(bucket_name, client): bucket = client.Bucket(bucket_name) try: if not create_bucket: return True response = bucket.create( ACL='private', CreateBucketConfiguration={ 'LocationConstraint': region }, ) return True except botocore.exceptions.ClientError as e: print_error(e) if 'BucketAlreadyOwnedByYou' in e.message: # we own this bucket so continue print('We own the bucket already. Lets continue...') return True return False def create_s3_bucket_folder(bucket_name, client, directory_name): s3.put_object(Bucket=bucket_name, Key=(directory_name + '/')) def set_s3_notification_sns(bucket_name, client, topic_arn): bucket_notification = client.BucketNotification(bucket_name) try: response = bucket_notification.put( NotificationConfiguration={ 'TopicConfigurations': [ { 'Id' : crawler_name, 'TopicArn': topic_arn, 'Events': [ 's3:ObjectCreated:*', 's3:ObjectRemoved:*', ], 'Filter' : {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}} }, ] } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def create_sns_topic(topic_name, client): try: response = client.create_topic( Name=topic_name ) return response['TopicArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sns_topic_policy(topic_arn, client, bucket_name): try: response = client.set_topic_attributes( TopicArn=topic_arn, AttributeName='Policy', AttributeValue='''{ "Version": "2008-10-17", "Id": "s3-publish-to-sns", "Statement": [{ "Effect": "Allow", "Principal": { "AWS" : "*" }, "Action": [ "SNS:Publish" ], "Resource": "%s", "Condition": { "StringEquals": { "AWS:SourceAccount": "%s" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" } } }] }''' % (topic_arn, account_id, bucket_name) ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint): try: response = client.subscribe( TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint ) return response['SubscriptionArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def create_sqs_queue(queue_name, client): try: response = client.create_queue( QueueName=queue_name, ) return response['QueueUrl'] except botocore.exceptions.ClientError as e: print_error(e) return None def get_sqs_queue_arn(queue_url, client): try: response = client.get_queue_attributes( QueueUrl=queue_url, AttributeNames=[ 'QueueArn', ] ) return response['Attributes']['QueueArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sqs_policy(queue_url, queue_arn, client, topic_arn): try: response = client.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': '''{ "Version": "2012-10-17", "Id": "AllowSNSPublish", "Statement": [ { "Sid": "AllowSNSPublish01", "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": "%s", "Condition": { "ArnEquals": { "aws:SourceArn": "%s" } } } ] }''' % (queue_arn, topic_arn) } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False if __name__ == "__main__": print('Creating S3 bucket %s.' % s3_bucket_name) if create_s3_bucket(s3_bucket_name, s3): print('\nCreating SNS topic %s.' % sns_topic_name) topic_arn = create_sns_topic(sns_topic_name, sns) if topic_arn: print('SNS topic created successfully: %s' % topic_arn) print('Creating SQS queue %s' % sqs_queue_name) queue_url = create_sqs_queue(sqs_queue_name, sqs) if queue_url is not None: print('Subscribing sqs queue with sns.') queue_arn = get_sqs_queue_arn(queue_url, sqs) if queue_arn is not None: if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn): print('Successfully configured queue policy.') subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn) if subscription_arn is not None: if 'pending confirmation' in subscription_arn: print('Please confirm SNS subscription by visiting the subscribe URL.') else: print('Successfully subscribed SQS queue: ' + queue_arn) else: print('Failed to subscribe SNS') else: print('Failed to set queue policy.') else: print("Failed to get queue arn for %s" % queue_url) # ------------ End subscriptions to SNS topic ----------------- print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name) if set_sns_topic_policy(topic_arn, sns, s3_bucket_name): print('SNS topic policy added successfully.') if set_s3_notification_sns(s3_bucket_name, s3, topic_arn): print('Successfully configured event for S3 bucket %s' % s3_bucket_name) print('Create S3 Event Crawler using SQS ARN %s' % queue_arn) else: print('Failed to configure S3 bucket notification.') else: print('Failed to add SNS topic policy.') else: print('Failed to create SNS topic.')

使用控制台为 Amazon S3 事件通知设置爬网程序(Amazon S3 目标)

要针对 Amazon S3 目标使用 Amazon Glue 控制台为 Amazon S3 事件通知设置爬网程序,请执行以下操作:

  1. 设置爬网程序属性。有关更多信息,请参阅在 Amazon Glue 控制台上设置爬网程序配置选项

  2. Data source configuration(数据来源配置)部分中,系统将询问您 Is your data already mapped to Amazon Glue tables?(您的数据是否已映射到 Amazon Glue 表?)

    默认情况下已选择 Not yet(尚未)。请将其保留为默认值,这是因为您使用的是 Amazon S3 数据来源,而该数据尚未映射到 Amazon Glue 表。

  3. Data sources(数据来源)部分中,选择 Add a data source(添加数据来源)。

  4. Add data source(添加数据来源)模态中,配置 Amazon S3 数据来源:

    • Data source(数据来源):默认选择 Amazon S3。

    • Network connection(网络连接)(可选):选择 Add new connection(添加新连接)。

    • Location of Amazon S3 data(Amazon S3 数据位置):默认选择 In this account(此账户中)。

    • Amazon S3 path(Amazon S3 路径):指定在其中爬取文件夹和文件的 Amazon S3 路径。

    • Subsequent crawler runs(后续爬网程序运行):选择 Crawl based on events(基于事件爬取)以对爬网程序使用 Amazon S3 事件通知。

    • Include SQS ARN(包含 SQS ARN):指定数据存储参数,包括有效的 SQS ARN。(例如,arn:aws:sqs:region:account:sqs)。

    • Include dead-letter SQS ARN(包含死信 SQS ARN)(可选):指定有效的 Amazon 死信 SQS ARN。(例如,arn:aws:sqs:region:account:deadLetterQueue)。

    • 选择 Add an Amazon S3 data source(添加 Amazon S3 数据来源)。

使用控制台为 Amazon S3 事件通知设置爬网程序(Data Catalog 目标)

当您有目录目标时,请使用 Amazon Glue 控制台为 Amazon S3 事件通知设置爬网程序:

  1. 设置爬网程序属性。有关更多信息,请参阅在 Amazon Glue 控制台上设置爬网程序配置选项

  2. Data source configuration(数据来源配置)部分中,系统将询问您 Is your data already mapped to Amazon Glue tables?(您的数据是否已映射到 Amazon Glue 表?)

    选择 Yes(是),从 Data Catalog 中选择现有表作为数据来源。

  3. Glue tables(Glue 表)部分中,选择 Add tables(添加表)。

  4. Add table(添加表)模式中,配置数据库和表:

    • Network connection(网络连接)(可选):选择 Add new connection(添加新连接)。

    • Database(数据库):在 Data Catalog 中选择数据库。

    • Tables(表):在 Data Catalog 中选择该数据库中的一个或多个表。

    • Subsequent crawler runs(后续爬网程序运行):选择 Crawl based on events(基于事件爬取)以对爬网程序使用 Amazon S3 事件通知。

    • Include SQS ARN(包含 SQS ARN):指定数据存储参数,包括有效的 SQS ARN。(例如,arn:aws:sqs:region:account:sqs)。

    • Include dead-letter SQS ARN(包含死信 SQS ARN)(可选):指定有效的 Amazon 死信 SQS ARN。(例如,arn:aws:sqs:region:account:deadLetterQueue)。

    • 选择 Confirm(确认)。