将 OpenSearch Ingestion 管道与 Amazon S3 结合使用 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 OpenSearch Ingestion 管道与 Amazon S3 结合使用

使用 OpenSearch Ingestion,您可以将 Amazon S3 用作源或目标。使用 Amazon S3 作为源时,将向 OpenSearch 摄取管道发送数据。使用 Amazon S3 作为目标时,将数据从 OpenSearch 摄取管道写入一个或多个 S3 存储桶。

Amazon S3 作为源

您可以通过两种方式使用 Amazon S3 作为源处理数据:S3-SQS 处理计划扫描

如果您需要在文件写入 S3 后近实时扫描文件,请使用 S3-SQS 处理。您可以配置 Amazon S3 存储桶,在存储桶中存储或修改对象时随时触发事件。使用一次性扫描或定期计划扫描批处理 S3 存储桶中的数据。

先决条件

要使用 Amazon S3 作为 OpenSearch 摄取管道源来执行计划扫描或 S3-SQS 处理,请先创建 S3 存储桶

注意

如果 OpenSearch Ingestion 管道中作为源的 S3 存储桶位于其他 Amazon Web Services 账户,则还需要对存储桶启用跨账户读取权限。这样管道将可读取和处理数据。要启用跨账户权限,请参阅 Amazon S3 用户指南中的存储桶拥有者授予跨账户存储桶权限

如果您的 S3 存储桶位于多个账户中,请使用 bucket_owners 映射。有关示例,请参阅 OpenSearch文档中的 Cro ss-account S3 访问

要设置 S3-SQS 处理,还需要执行以下步骤:

  1. 创建 Amazon SQS 队列

  2. 在以 SQS 队列为目标的 S3 存储桶上启用事件通知

步骤 1:配置管道角色

与其他将数据推送到管道的源插件不同,S3 源插件采用基于读取的架构,管道从源中拉取数据。

因此,为使管道能够从 S3 读取,必须在管道的 S3 源配置中指定一个可以同时访问 S3 存储桶和 Amazon SQS 队列的角色。管道将担任此角色,以便从队列中读取数据。

注意

在 S3 源配置中指定的角色必须是管道角色。因此,管道角色必须包含两个单独的权限策略,一个用于写入接收器,另一个用于从 S3 源中拉取。您必须在所有管道组件中使用相同的 sts_role_arn

以下示例策略显示了使用 S3 作为源所需的权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::bucket-name/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:account-id:MyS3EventSqsQueue" } ] }

必须将以下权限附加到在 S3 源插件配置的 sts_role_arn 选项中指定的 IAM 角色:

version: "2" source: s3: ... aws: ... processor: ... sink: - opensearch: ...

步骤 2:创建管道

设置权限后,您可以根据 Amazon S3 用例配置 OpenSearch 摄取管道。

S3-SQS 处理

要设置 S3-SQS 处理,请配置您的管道,指定 S3 作为源并设置 Amazon SQS 通知:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1amazonaws.com/account-id/ingestion-queue" compression: "none" aws: region: "region" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"] index: "index-name" aws: region: "region"

如果您在处理 Amazon S3 上的小文件时发现 CPU 利用率较低,可考虑通过修改 workers 选项的值来提高吞吐量。有关更多信息,请参阅 S3 plugin configuration options

计划扫描

要设置计划扫描,请使用适用于所有 S3 存储桶的扫描级别或存储桶级别的计划来配置管道。存储桶级别计划或扫描间隔配置始终覆盖扫描级别配置。

您可以使用一次性扫描(非常适合数据迁移)或定期扫描(非常适合批处理)配置计划扫描。

要将管道配置为从 Amazon S3 读取,请使用预先配置的 Amazon S3 蓝图。您可以编辑管道配置的 scan 部分以满足计划需求。有关更多信息,请参阅 处理蓝图

一次性扫描

一次性计划扫描运行一次。在管道配置中,您可以使用 end_time and 指定希望何时扫描存储桶中的对象。start_time或者,您也可以使用 range 指定相对于当前时间的时间间隔,以该时间间隔扫描存储桶中的对象。

例如,范围设置为 PT4H 将扫描最近四个小时内创建的所有文件。要配置再次运行一次性扫描,必须先停止管道,然后再重新启动。如果未配置范围,则还必须更新开始时间和结束时间。

以下配置为所有存储桶及这些存储桶中的所有对象设置一次性扫描:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "region" acknowledgments: true scan: buckets: - bucket: name: my-bucket filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"] index: "index-name" aws: region: "region" dlq: s3: bucket: "dlq-bucket" region: "us-east-1"

以下配置设置在指定时段内对所有存储桶运行一次性扫描。这意味着 S3 仅处理创建时间在此时段内的对象。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

以下配置设置扫描级别和存储桶级别一次性扫描。存储桶级别开始时间和结束时间将覆盖扫描级别开始时间和结束时间。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

停止管道会移除对管道在停止之前所扫描对象的任何先前引用。如果停止单个扫描管道,则将在启动后重新扫描所有对象,即使这些已被扫描。如果您需要停止单个扫描管道,建议在重新启动管道之前更改时间段。

如果您需要按开始时间和结束时间筛选对象,则停止和启动管道是唯一的选择。如果您不需要按开始时间和结束时间进行筛选,则可以按名称筛选对象。按名称筛选对象不需要停止和启动管道。要执行此操作,请使用 include_prefixexclude_suffix

定期扫描

定期计划扫描按定期计划时间间隔对您指定的 S3 存储桶运行扫描。只能在扫描级别配置间隔,因为不支持单独执行存储桶级别配置。

在管道配置中,将interval指定定期扫描频率,范围介于 30 秒到 365 天之间。始终在创建管道时运行首次扫描。count 定义扫描实例总数。

以下配置设置定期扫描,两次扫描之间的延迟为 12 小时:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 作为目标

要将数据从 OpenSearch Ingestion 管道写入 S3 存储桶,请使用预先配置的 S3 蓝图创建具有一个 S3 接收器的管道。此管道将选择性数据路由到 OpenSearch 接收器,同时将所有数据发送到 S3 中进行存档。有关更多信息,请参阅 处理蓝图

创建 S3 接收器时,您可以从各种不同的接收器编解码器指定首选格式。例如,如果要以列式格式写入数据,请选择 Parquet 或 Avro 编解码器。如果您更喜欢基于行的格式,请选择 JSON 或 NDJSON。要将数据写入指定架构中的 S3,您还可以使用 Avro 格式在接收器编解码器中定义内联架构。

以下示例在 S3 接收器中定义内联架构:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

定义此架构时,请指定管道向接收器发送的不同类型事件中可能存在的所有键的超集。

例如,如果事件可能缺少键,则在架构中添加值为 null 的键。Null 值声明允许架构处理非统一数据(一些事件具有这些键,另一些事件则没有)。当传入事件确实存在这些键时,则将键值写入接收器。

此架构定义充当筛选器,仅允许将定义的键发送到接收器,并从传入事件中删除未定义的键。

您也可以在接收器中使用 include_keysexclude_keys 筛选路由到其他接收器的数据。两个筛选器互斥,因此在架构中一次只能使用一个筛选器。此外,不能在用户定义的架构中使用它们。

要使用上述筛选条件创建管道,请使用预先配置的接收器筛选条件蓝图。有关更多信息,请参阅 处理蓝图

将 Amazon S3 跨账户作为源

您可以向 Amazon S3 账户授予跨账户访问权限,从而让 OpenSearch Ingestion 管道能够对用作源的其他账户中的 S3 存储桶进行访问。要启用跨账户访问,请参阅《Amazon S3 用户指南》中的 Bucket owner granting cross-account bucket permissions。授予访问权限后,请确保您的管道角色具有所需的权限。

然后,您可以使用创建管道,从而bucket_owners对用作源的 Amazon S3 存储桶启用跨账户访问:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"