在 Amaz OpenSearch on S3 中使用采集管道 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amaz OpenSearch on S3 中使用采集管道

借助 OpenSearch Ingestion,您可以将 Amazon S3 用作源或目标。当您使用 Amazon S3 作为数据源时,会将数据发送到 OpenSearch 摄取管道。当您使用 Amazon S3 作为目标时,会将数据从 OpenSearch 摄取管道写入到一个或多个 S3 存储桶。

Amazon S3 作为源

您可以通过两种方式使用 Amazon S3 作为源处理数据:S3-SQS 处理计划扫描

如果您需要在文件写入 S3 后近实时扫描文件,请使用 S3-SQS 处理。您可以配置 Amazon S3 存储桶,在存储桶中存储或修改对象时随时触发事件。使用一次性扫描或定期计划扫描批处理 S3 存储桶中的数据。

先决条件

要使用 Amazon S3 作为预定扫描或 S3-SQS 处理的 OpenSearch 摄取管道的来源,请先创建一个 S3 存储桶

注意

如果 OpenSearch Ingestion 管道中用作源的 S3 存储桶位于不同的存储桶中 Amazon Web Services 账户,则还需要对该存储桶启用跨账户读取权限。这样管道将可读取和处理数据。要启用跨账户权限,请参阅 Amazon S3 用户指南中的存储桶拥有者授予跨账户存储桶权限

如果您的 S3 存储桶位于多个账户中,请使用bucket_owners地图。有关示例,请参阅 OpenSearch文档中的跨账户 S3 访问权限

要设置 S3-SQS 处理,还需要执行以下步骤:

  1. 创建 Amazon SQS 队列

  2. 在以 SQS 队列为目标的 S3 存储桶上启用事件通知

步骤 1:配置管道角色

与其他将数据推送到管道的源插件不同,S3 源插件采用基于读取的架构,管道从源中拉取数据。

因此,为使管道能够从 S3 读取,必须在管道的 S3 源配置中指定一个可以同时访问 S3 存储桶和 Amazon SQS 队列的角色。管道将担任此角色,以便从队列中读取数据。

注意

在 S3 源配置中指定的角色必须是管道角色。因此,管道角色必须包含两个单独的权限策略,一个用于写入接收器,另一个用于从 S3 源中拉取。您必须在所有管道组件中使用相同的 sts_role_arn

以下示例策略显示了使用 S3 作为源所需的权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

必须将以下权限附加到在 S3 源插件配置的 sts_role_arn 选项中指定的 IAM 角色:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

步骤 2:创建管道

设置权限后,您可以根据您的 Amazon S3 用例配置 OpenSearch 摄取管道。

S3-SQS 处理

要设置 S3-SQS 处理,请配置您的管道,指定 S3 作为源并设置 Amazon SQS 通知:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: log: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

如果您在 Amazon S3 上处理小文件时发现 CPU 使用率较低,请考虑通过修改该workers选项的值来增加吞吐量。有关更多信息,请参阅 S 3 插件配置选项

计划扫描

要设置计划扫描,请使用适用于所有 S3 存储桶的扫描级别或存储桶级别的计划来配置管道。存储桶级别计划或扫描间隔配置始终覆盖扫描级别配置。

您可以使用一次性扫描(非常适合数据迁移)或定期扫描(非常适合批处理)配置计划扫描。

要将您的管道配置为从亚马逊 S3 读取,请使用名为 AWS-S3 ScanPipelineAWS-S3 ScanSchedulePipeline 的亚马逊 S3 蓝图。您可以编辑管道配置的 scan 部分以满足计划需求。有关更多信息,请参阅 使用蓝图创建管道

一次性扫描

一次性计划扫描运行一次。在 YAML 配置中,您可以使用 start_timeend_time 指定希望何时扫描存储桶中的对象。或者,您也可以使用 range 指定相对于当前时间的时间间隔,以该时间间隔扫描存储桶中的对象。

例如,范围设置为 PT4H 将扫描最近四个小时内创建的所有文件。要配置再次运行一次性扫描,必须先停止管道,然后再重新启动。如果未配置范围,则还必须更新开始时间和结束时间。

以下配置为所有存储桶及这些存储桶中的所有对象设置一次性扫描:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

以下配置设置在指定时段内对所有存储桶运行一次性扫描。这意味着 S3 仅处理创建时间在此时段内的对象。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

以下配置设置扫描级别和存储桶级别一次性扫描。存储桶级别开始时间和结束时间将覆盖扫描级别开始时间和结束时间。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

停止管道会移除管道在停止之前对管道扫描过的对象的任何先前存在的引用。如果单个扫描管道停止,它将在启动后重新扫描所有对象,即使它们已经被扫描。如果您需要停止单个扫描管道,建议您在重新启动管道之前更改时间窗口。

如果您需要按开始时间和结束时间筛选对象,则停止和启动管道是唯一的选择。如果您不需要按开始时间和结束时间进行筛选,则可以按名称筛选对象。按名字过滤不需要你停止并启动你的管道。为此,请使用include_prefixexclude_suffix

定期扫描

定期计划扫描按定期计划时间间隔对您指定的 S3 存储桶运行扫描。只能在扫描级别配置间隔,因为不支持单独执行存储桶级别配置。

在 YAML 配置中,interval 将指定定期扫描频率,范围介于 30 秒到 365 天之间。始终在创建管道时运行首次扫描。count 定义扫描实例总数。

以下配置设置定期扫描,两次扫描之间的延迟为 12 小时:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 作为目标

要将数据从 OpenSearch 摄取管道写入 S3 存储桶,请使用名为 AWS-S3 SinkLogPipeline 的蓝图创建带有 S3 接收器的管道。该管道将选择性数据路由到 OpenSearch 接收器,同时将所有数据发送到 S3 中进行存档。有关更多信息,请参阅 使用蓝图创建管道

创建 S3 接收器时,您可以从各种不同的接收器编解码器指定首选格式。例如,如果要以列式格式写入数据,请选择 Parquet 或 Avro 编解码器。如果您更喜欢基于行的格式,请选择 JSON 或 ND-JSON。要将数据写入指定架构中的 S3,您还可以使用 Avro 格式在接收器编解码器中定义内联架构。

以下示例在 S3 接收器中定义内联架构:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

定义此架构时,请指定管道向接收器发送的不同类型事件中可能存在的所有键的超集。

例如,如果事件可能缺少键,则在架构中添加值为 null 的键。Null 值声明允许架构处理非统一数据(一些事件具有这些键,另一些事件则没有)。当传入事件确实存在这些键时,则将键值写入接收器。

此架构定义充当筛选器,仅允许将定义的键发送到接收器,并从传入事件中删除未定义的键。

您也可以在接收器中使用 include_keysexclude_keys 筛选路由到其他接收器的数据。两个筛选器互斥,因此在架构中一次只能使用一个筛选器。此外,不能在用户定义的架构中使用它们。

要使用此类过滤器创建管道,请使用AWSSinkFilterWithSchemaPipeline蓝图。有关更多信息,请参阅 使用蓝图创建管道

亚马逊 S3 交叉账户作为来源

您可以通过 Amazon S3 授予跨账户访问权限,这样 OpenSearch Ingestion 管道就可以访问另一个账户中的 S3 存储桶作为来源。以下 YAML 配置允许跨账户访问作为来源的 Amazon S3 存储桶:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: user-role-1234567890: 1234567890 # User1 user-role-12345678891: 1234567891 # User2 compression: "gzip"