Amazon OpenSearch Ingestion 中的管道功能概述 - Amazon OpenSearch Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon OpenSearch Ingestion 中的管道功能概述

Amazon OpenSearch Ingestion 预调配管道由一个来源、一个缓冲区、零个或多个处理器以及一个或多个接收器组成。提取管道由 Data Prepper 作为数据引擎提供支持。有关管道各个组件的概述,请参见 Amazon OpenSearch Ingestion 重要概念

以下章节概述了 Amazon OpenSearch Ingestion 中一些最常用的功能。

注意

该列表不是管道可用功能的详尽列表。有关管道所有可用功能的综合文档,请参阅 Data Prepper 文档。请注意,OpenSearch Ingestion 对您可以使用的插件和选项设置了一些限制。有关更多信息,请参阅 Amazon OpenSearch Ingestion 管道支持的插件和选项

持久缓冲功能

持久缓冲区将您的数据存储在跨多个可用区且基于磁盘的磁盘缓冲区中,以增强数据持久性。对于所有支持的基于推送的源,您可使用持久缓冲区以摄取数据,无需设置独立的缓冲区。这些源包括用于日志、跟踪和指标的 HTTP 和 OpenTelemetry。要启用持久缓冲功能,请在创建或更新管道时选择启用持久缓冲区。有关更多信息,请参阅 创建 Amazon OpenSearch Ingestion 管道

OpenSearch Ingestion 会根据数据来源、流式转换和接收器目标,动态确定用于持久缓冲功能的 OCU 数量。由于部分 OCU 会被分配用于缓冲,因此您可能需要增加最小和最大 OCU 数以维持相同的摄取吞吐量。管道在缓冲区中保留数据的时间最长可达 72 小时。

如果为管道启用持久缓冲功能,则默认最大请求有效载荷大小如下所示:

  • HTTP 源:10 MB

  • OpenTelemetry 源:4 MB

对于 HTTP 源,您可以将最大有效载荷大小增加到 20 MB。请求有效载荷大小包括整个 HTTP 请求,通常包含多个事件。每个事件不得超过 3.5 MB。

具有持久缓冲功能的管道将配置的管道单位在计算单位和缓冲区单位之间进行拆分。如果管道使用 CPU 密集型处理器(例如 grok、键值或拆分字符串),则按 1:1 的缓冲区与计算资源比例分配单位。否则,将按 3:1 的比例进行分配,始终优先考虑计算单位。

例如:

  • 具有 grok 和 2 个最大单位的管道:1 个计算单位和 1 个缓冲区单位

  • 具有 grok 和 5 个最大单位的管道:3 个计算单位和 2 个缓冲区单位

  • 无处理器但具有 2 个最大单位的管道:1 个计算单位和 1 个缓冲区单位

  • 无处理器但具有 4 个最大单位的管道:1 个计算单位和 3 个缓冲区单位

  • 具有 grok 和 5 个最大单位的管道:2 个计算单位和 3 个缓冲区单位

默认情况下,管道使用 Amazon 拥有的密钥 加密缓冲区数据。这些管道不需要任何额外的管道角色权限。

您也可以指定客户自主管理型密钥,并将以下 IAM 权限添加到管道角色:

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/ASIAIOSFODNN7EXAMPLE" } ] }

有关更多信息,请参阅《Amazon Key Management Service 开发人员指南》中的客户托管密钥

注意

如果禁用永久缓冲功能,则管道将开始完全基于内存缓冲运行。

拆分

您可以配置 OpenSearch Ingestion 管道,将传入事件拆分到子管道,从而可对同一个传入事件执行不同类型的处理。

以下示例管道将传入事件拆分到两个子管道。每个子管道都使用自己的处理器来扩充和操作数据,然后将数据发送到不同的 OpenSearch 索引。

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

链接

您可以将多个子管道链接在一起,以便分块执行数据处理和扩充。也就是说,您可以在一个子管道中使用某些处理功能来扩充传入事件,然后将其发送到另一个子管道,以使用不同的处理器进行进一步扩充,最后将其发送到 OpenSearch 接收器。

在以下示例中,log_pipeline 子管道使用一组处理器扩充传入的日志事件,然后将该事件发送到名为 enriched_logs 的 OpenSearch 索引。管道向 log_advanced_pipeline 子管道发送相同的事件,子管道对事件进行处理并将其发送到名为 enriched_advanced_logs 的其他 OpenSearch 索引。

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

死信队列

死信队列 (DLQ) 是管道未能写入接收器的事件的目的地。在 OpenSearch Ingestion 中,您必须指定一个具有相应写入权限的 Amazon S3 存储桶,才能将其用作 DLQ。您可以向管道中的每个接收器添加 DLQ 配置。当管道遇到写入错误时,它会在配置的 S3 存储桶中创建 DLQ 对象。DLQ 对象作为一组失败事件存在于 JSON 文件中。

满足以下任意条件时,管道会向 DLQ 写入事件:

  • OpenSearch 接收器的最大重试次数已用尽。对于此设置,OpenSearch Ingestion 至少需要 16 个。

  • 由于出现错误条件,事件遭接收器拒绝。

配置

要为子管道配置死信队列,请在配置接收器目标时选择启用 S3 DLQ。随后为队列指定所需设置。有关更多信息,请参阅 Data Prepper DLQ 文档中的配置

写入此 S3 DLQ 的文件采用以下命名模式:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

有关手动配置管道角色以允许访问 DLQ 写入到 S3 存储桶的说明,请参阅 写入 Amazon S3 或死信队列的权限

示例

考虑以下示例 DLQ 文件:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

以下是未能写入接收器并发送到 DLQ S3 存储桶进行进一步分析的数据示例:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

索引管理

Amazon OpenSearch Ingestion 具有许多索引管理功能,包括以下功能。

创建索引

您可以在管道接收器中指定索引名称,OpenSearch Ingestion 会在预调配管道时创建索引。如果索引已经存在,管道会将其用于索引传入事件。如果您停止并重启管道,或者更新其 YAML 配置,如果这些索引尚不存在,则管道会尝试创建新的索引。管道始终不会删除索引。

以下示例为接收器在预调配管道时创建两个索引:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

生成索引名称和模式

您可以使用传入事件字段的变量来生成动态索引名称。在接收器配置中,使用格式 string${} 表示字符串插值,并使用 JSON 指针从事件中提取字段。index_type 的选项是 custommanagement_disabled。由于 OpenSearch 域的 index_type 默认设置为 custom,OpenSearch 无服务器集合的默认设置为 management_disabled,因此可以将其保留为未设置。

例如,以下管道从传入事件中选择 metadataType 字段以生成索引名称。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

以下配置继续每天或每小时生成一个新索引。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

索引名称也可以是以日期-时间模式作为后缀的纯字符串,例如 my-index-%{yyyy.MM.dd}。当接收器向 OpenSearch 发送数据时,它会使用 UTC 时间代替日期-时间模式,并每天创建一个新索引,例如 my-index-2022.01.25。有关更多信息,请参阅 DateTimeFormatter 类。

该索引名称也可以是带有/不带日期-时间样式后缀的格式化字符串,例如 my-${index}-name。当接收器向 OpenSearch 发送数据时,它会使用正在处理的事件中的值代替 "${index}" 部分。如果格式为 "${index1/index2/index3}",则使用事件中的值代替字段 index1/index2/index3

生成文档 ID

管道可以在为文档建立到 OpenSearch 的索引时生成文档 ID。它可以从传入事件中的字段推断出这些文档 ID。

此示例使用传入事件的 uuid 字段生成文档 ID。

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" "document_id": "uuid"

在以下示例中,添加条目处理器合并传入事件的字段 uuidother_field 以生成文档 ID。

create 操作可确保不会覆盖具有相同 ID 的文档。管道会丢弃重复的文档,而不会出现任何重试或 DLQ 事件。由于使用此操作的管道作者的目的在于避免更新现有文档,因而这一预期十分合理。

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id: "my_doc_id"

您可能需要将事件的文档 ID 设置为子对象中的字段。在以下示例中,OpenSearch 接收器插件使用子对象 info/id 生成文档 ID。

sink: - opensearch: ... document_id: info/id

鉴于以下事件,管道将生成一个 _id 字段设置为 json001 的文档:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

生成路由 ID

您可以使用 OpenSearch 接收器插件中的 routing_field 选项,将文档路由属性 (_routing) 的值设置为传入事件中的值。

路由支持 JSON 指针语法,因此嵌套字段也可用,而不仅仅是顶级字段。

sink: - opensearch: ... routing_field: metadata/id document_id: id

鉴于以下事件,插件将生成一个 _routing 字段设置为 abcd 的文档:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

有关创建供管道在创建索引时使用的索引模板的说明,请参阅索引模板

端到端确认

OpenSearch Ingestion 通过使用端到端确认,跟踪数据在无状态管道中从源到接收器的传输,从而确保数据的持久性和可靠性。

注意

目前,只有 S3 源插件支持端到端确认。

通过端到端确认,管道源插件会创建一个确认组合来监控一批事件。当这些事件成功发送到其接收器时,它会收到肯定应答,或者当任何事件无法发送到其接收器时,它会收到否定应答。

如果管道组件出现故障或崩溃,或者源未能收到确认,则源会超时并采取必要的操作,例如重试或记录失败。如果管道配置了多个接收器或多个子管道,则只有在将事件发送到所有子管道中的所有接收器之后,才会发送事件级别确认。如果接收器配置了 DLQ,则端到端确认还会跟踪写入 DLQ 的事件。

要启用端到端确认,请展开 Amazon S3 源配置中的其他选项,并选择启用端到端消息确认

源背压

当管道忙于处理数据,或其接收器暂时关闭或数据摄取速度缓慢时,管道可能会承受反向压力。OpenSearch Ingestion 有不同的方法来处理反向压力,具体取决于管道使用的源插件。

HTTP 源

使用 HTTP 源插件的管道处理反向压力的方式会有所不同,具体取决于哪个管道组件处于拥塞状态:

  • 缓冲区 — 当缓冲区已满时,管道开始将错误代码为 408 的 HTTP 状态 REQUEST_TIMEOUT 返回到源端点。缓冲区被释放后,管道将重新开始处理 HTTP 事件。

  • 源线程 — 当所有 HTTP 源线程都忙于执行请求,并且未处理的请求队列大小已超过允许的最大请求数时,管道开始将错误代码为 429 的 HTTP 状态 TOO_MANY_REQUESTS 返回到源端点。当请求队列降至允许的最大队列大小以下时,管道将重新开始处理请求。

OTel 源

使用 OpenTelemetry 源(OTel 日志OTel 指标OTel 跟踪)的管道的缓冲区已满时,管道开始将错误代码为 408 的 HTTP 状态 REQUEST_TIMEOUT 返回到源端点。缓冲区被释放后,管道将重新开始处理事件。

S3 源

当带有 S3 源的管道的缓冲区已满时,管道将停止处理 SQS 通知。缓冲区被释放后,管道将重新开始处理通知。

如果接收器关闭或无法摄取数据,并且为源启用了端到端确认,则管道将停止处理 SQS 通知,直到收到来自所有接收器的成功确认为止。