Amazon OpenSearch Ingestion 中的管道功能概述 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon OpenSearch Ingestion 中的管道功能概述

Amazon OpenSearch Ingestion 预置管道,这些管道由一个源、一个缓冲区、零个或多个处理器以及一个或多个接收器组成。提取管道由 Data Prepper 作为数据引擎提供支持。有关管道各个组件的概述,请参见 重要概念

以下各节概述了 Amazon OpenSearch Ingestion 中一些最常用的功能。

注意

该列表不是管道可用功能的详尽列表。有关管道所有可用功能的综合文档,请参阅 Data Prepper 文档。请注意, OpenSearch Ingestion 对你可以使用的插件和选项施加了一些限制。有关更多信息,请参阅 Amazon OpenSearch Ingestion 管道支持的插件和选项

持久缓冲功能

永久缓冲区将您的数据存储在跨多个可用区的基于磁盘的缓冲区中,以增加数据的持久性。您可以使用持久缓冲来为所有支持的基于推送的源提取数据,而无需设置独立的缓冲区。这些包括 HTTP 以及日志、跟踪和指标的 OpenTelemetry 来源。

要启用永久缓冲,请在创建或更新管道时选择 “启用永久缓冲区”。有关更多信息,请参阅创建 Amazon OpenSearch Ingestion 管道。 OpenSearch Ingestion 会根据您为管道指定的摄取 OpenSearch 计算单位(摄取 OCU)自动确定所需的缓冲容量。

默认情况下,管道使用加密 Amazon 拥有的密钥 缓冲区数据。这些管道不需要管道角色的任何额外权限。或者,您可以指定客户托管密钥并将以下 IAM 权限添加到管道角色:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

有关更多信息,请参阅《Amazon Key Management Service 开发人员指南》中的客户托管密钥

注意

如果您禁用持久缓冲功能,则您的管道将更新为完全在内存缓冲上运行。

调整最大请求负载大小

如果您为管道启用永久缓冲,则可以选择调整最大请求负载大小。此设置限制了在单个请求中发送到接收器的记录的大小,从而避免发送大量请求。要调整最大有效载荷大小,请在源配置中设置该max_request_length选项。就像永久缓冲一样,此选项仅支持 HTTP 以及日志、跟踪和指标的 OpenTelemetry 来源。

max_request_length选项的唯一有效值是 1mb、1.5mb、2mb、2.5mb、3mb、3.5mb 和 4mb。如果指定不同的值,则会收到错误消息。

以下示例演示了如何在管道配置中配置最大负载大小:

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: "4mb" processor: ...

如果您启用永久缓冲但未指定该max_request_length选项,则该值默认为 1mb。

拆分

您可以将 OpenSearch Ingestion 管道配置为将传入事件分为子管道,从而允许您对同一个传入事件执行不同类型的处理。

以下示例管道将传入事件拆分到两个子管道。每个子管道都使用自己的处理器来丰富和操作数据,然后将数据发送到不同的 OpenSearch 索引。

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

链接

您可以将多个子管道链接在一起,以便分块执行数据处理和扩充。换句话说,你可以在一个子管道中使用一定的处理能力来丰富传入的事件,然后将其发送到另一个子管道,使用不同的处理器进行进一步丰富,最后将其发送到其 OpenSearch 接收器。

在以下示例中,log_pipeline子管道使用一组处理器丰富传入的日志事件,然后将该事件发送到名为的 OpenSearch 索引。enriched_logs管道将相同的事件发送到log_advanced_pipeline子管道,子管道对其进行处理并将其发送到名enriched_advanced_logs为的其他 OpenSearch 索引。

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

死信队列

死信队列 (DLQ) 是管道未能写入接收器的事件的目的地。在 OpenSearch Ingestion 中,您必须指定一个具有相应写入权限的 Amazon S3 存储桶才能用作 DLQ。您可以向管道中的每个接收器添加 DLQ 配置。当管道遇到写入错误时,它会在配置的 S3 存储桶中创建 DLQ 对象。DLQ 对象作为一组失败事件存在于 JSON 文件中。

满足以下任意条件时,管道会向 DLQ 写入事件:

  • max_retries用于 OpenSearch 水槽的东西已经用完了。 OpenSearch 对于此选项,摄取至少需要 16。

  • 由于出现错误条件,事件被接收器拒绝。

配置

要为子管道配置死信队列,请在 opensearch 接收器配置中指定 dlq 选项:

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

写入此 S3 DLQ 的文件将采用以下命名模式:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

有关更多信息,请参阅死信队列 (DLQ)

有关配置 sts_role_arn 角色的说明,请参阅写入死信队列

示例

考虑以下示例 DLQ 文件:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

以下是未能写入接收器并发送到 DLQ S3 存储桶进行进一步分析的数据示例:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

索引管理

Amazon OpenSearch Ingestion 具有许多索引管理功能,包括以下功能。

创建索引

您可以在管道接收器中指定索引名称, OpenSearch Ingestion 在置备管道时会创建索引。如果索引已经存在,管道会将其用于索引传入事件。如果您停止并重启管道,或者更新其 YAML 配置,如果这些索引尚不存在,则管道会尝试创建新的索引。管道始终不会删除索引。

以下示例为接收器在预调配管道时创建两个索引:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

生成索引名称和模式

您可以使用传入事件字段的变量来生成动态索引名称。在接收器配置中,使用格式 string${} 表示字符串插值,并使用 JSON 指针从事件中提取字段。index_type 的选项是 custommanagement_disabled。由于 OpenSearch 域名和 OpenSearch 无服务器集合management_disabledindex_type默认值为,因此可以将其保留为未设置。custom

例如,以下管道从传入事件中选择 metadataType 字段以生成索引名称。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

以下配置继续每天或每小时生成一个新索引。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

索引名称也可以是以日期-时间模式作为后缀的纯字符串,例如 my-index-%{yyyy.MM.dd}。当接收器向发送数据时 OpenSearch,它会将日期时间模式替换为 UTC 时间,并为每天创建一个新索引,例如。my-index-2022.01.25有关更多信息,请参阅DateTimeFormatter课程。

该索引名称也可以是带有/不带日期-时间样式后缀的格式化字符串,例如 my-${index}-name。当接收器向发送数据时 OpenSearch,它会将该"${index}"部分替换为正在处理的事件中的值。如果格式为 "${index1/index2/index3}",则使用事件中的值代替字段 index1/index2/index3

生成文档 ID

管道可以在为文档编制索引时生成文档 ID OpenSearch。它可以从传入事件中的字段推断出这些文档 ID。

此示例使用传入事件的 uuid 字段生成文档 ID。

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

在以下示例中,添加条目处理器合并传入事件的字段 uuidother_field 以生成文档 ID。

create 操作可确保不会覆盖具有相同 ID 的文档。管道会丢弃重复的文档,而不会出现任何重试或 DLQ 事件。由于使用此操作的管道作者的目的在于避免更新现有文档,因而这一预期十分合理。

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

您可能需要将事件的文档 ID 设置为子对象中的字段。在以下示例中,s OpenSearch ink 插件使用子对象info/id生成文档 ID。

sink: - opensearch: ... document_id_field: info/id

鉴于以下事件,管道将生成一个 _id 字段设置为 json001 的文档:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

生成路由 ID

你可以使用 sin OpenSearch k 插件中的routing_field选项将文档路由属性 (_routing) 的值设置为来自传入事件的值。

路由支持 JSON 指针语法,因此嵌套字段也可用,而不仅仅是顶级字段。

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

鉴于以下事件,插件将生成一个 _routing 字段设置为 abcd 的文档:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

有关创建供管道在创建索引时使用的索引模板的说明,请参阅索引模板

E nd-to-end 致谢

OpenSearch Ingestion 使用end-to-end确认功能跟踪无状态管道中从源到接收器的传输,从而确保数据的持久性和可靠性。目前,只有 S 3 源插件支持 end-to-end 确认。

通过 end-to-end 确认,管道源插件会创建一个确认集来监视一批事件。当这些事件成功发送到其接收器时,它会收到肯定应答,或者当任何事件无法发送到其接收器时,它会收到否定应答。

如果管道组件出现故障或崩溃,或者源未能收到确认,则源会超时并采取必要的操作,例如重试或记录失败。如果管道配置了多个接收器或多个子管道,则只有在将事件发送到所有子管道中的所有接收器之后,才会发送事件级别确认。如果接收器配置了 DLQ,则 end-to-end 确认还会跟踪写入 DLQ 的事件。

要启用 end-to-end 确认,请在源配置中包含以下acknowledgments选项:

s3-pipeline: source: s3: acknowledgments: true ...

源背压

当管道忙于处理数据,或者其接收器暂时关闭或数据采集速度缓慢时,管道可能会承受背压。 OpenSearch 根据管道使用的源插件,Ingestion 有不同的处理背压的方法。

HTTP 源

使用 HTTP 源插件的管道处理反向压力的方式会有所不同,具体取决于哪个管道组件处于拥塞状态:

  • 缓冲区 — 当缓冲区已满时,管道开始将错误代码为 408 的 HTTP 状态 REQUEST_TIMEOUT 返回到源端点。缓冲区被释放后,管道将重新开始处理 HTTP 事件。

  • 源线程 — 当所有 HTTP 源线程都忙于执行请求,并且未处理的请求队列大小已超过允许的最大请求数时,管道开始将错误代码为 429 的 HTTP 状态 TOO_MANY_REQUESTS 返回到源端点。当请求队列降至允许的最大队列大小以下时,管道将重新开始处理请求。

OTel 源

当使用 OpenTelemetry 源(oTel 日志、oTel 指标和 oTel 跟踪)的管道的缓冲区已满时,管道开始向源端点返回错误代码为 408 的 HTTP 状态REQUEST_TIMEOUT。缓冲区被释放后,管道将重新开始处理事件。

S3 源

当带有 S3 源的管道的缓冲区已满时,管道将停止处理 SQS 通知。缓冲区被释放后,管道将重新开始处理通知。

如果接收器关闭或无法采集数据,并且为源启用了 end-to-end 确认功能,则管道将停止处理 SQS 通知,直到收到来自所有接收器的成功确认为止。