本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon OpenSearch Ingestion 中的管道功能概述
Amazon OpenSearch Ingestion 预调配管道,包括一个来源、一个缓冲区、零个或多个处理器以及一个或多个接收器。提取管道由 Data Prepper 作为数据引擎提供支持。有关管道各个组件的概述,请参见 Amazon OpenSearch Ingestion 中的关键概念。
以下章节概述了 Amazon OpenSearch Ingestion 中一些最常用的功能。
注意
该列表不是管道可用功能的详尽列表。有关管道所有可用功能的综合文档,请参阅 Data Prepper 文档
持久缓冲功能
永久缓冲区将您的数据存储在跨多个可用区的磁盘缓冲区中。对于来自所有支持的基于推送的源,您可以使用永久缓冲区来摄取数据,无需设置独立的缓冲区。这些来源包括 HTTP 以及 OpenTelemetry 用于日志、跟踪和指标的源。要启用永久缓冲区,请在创建或更新管道时选择启用永久缓冲区。有关更多信息,请参阅 创建 Amazon OpenSearch Ingestion 管道。
OpenSearch Ingestion 会动态确定 OCUs 用于永久缓冲的数量,同时考虑数据源、流式传输转换和接收目标。由于它会 OCUs 为缓冲分配一些资源,因此您可能需要增加最小值和最大值 OCUs 以保持相同的摄取吞吐量。管道在缓冲区中保留数据长达 72 小时。
如果您为管道启用永久缓冲区,则默认最大请求有效载荷大小如下:
-
HTTP 来源 — 10 MB
-
OpenTelemetry 来源 — 4 MB
对于 HTTP 源,您可以将最大有效载荷大小增加到 20 MB。请求有效载荷大小包括整个 HTTP 请求,通常包含多个事件。每个事件均不得超过 3.5 MB。
具有永久缓冲区的管道将配置的管道单位分成计算单位和缓冲区单位。如果管道使用 CPU 密集型处理器(例如 grok、键值或拆分字符串),则它将以 1:1 的比例分配单位。 buffer-to-compute否则,它以 3:1 的比例分配它们,始终偏向于计算单元。
例如:
-
具有 grok 和 2 个最大单位的管道:1 个计算单位和 1 个缓冲区单位
-
具有 grok 和 5 个最大单位的管道:3 个计算单位和 2 个缓冲区单位
-
无处理器但具有 2 个最大单位的管道:1 个计算单位和 1 个缓冲区单位
-
无处理器但具有 4 个最大单位的管道:1 个计算单位和 3 个缓冲区单位
-
具有 grok 和 5 个最大单位的管道:2 个计算单位和 3 个缓冲区单位
默认情况下,管道使用加密 Amazon 拥有的密钥 缓冲区数据。这些管道不需要任何额外的管道角色权限。
您也可以指定客户自主管理型密钥,并将以下 IAM 权限添加到管道角色:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:
{region}
:{aws-account-id}
:key/1234abcd-12ab-34cd-56ef-1234567890ab
" } ] }
有关更多信息,请参阅《Amazon Key Management Service 开发人员指南》中的客户托管密钥。
注意
如果您禁用永久缓冲区,则您的管道将完全在内存缓冲上运行。
拆分
您可以配置 OpenSearch Ingestion 管道,将传入事件拆分到子管道,从而可对同一个传入事件执行不同类型的处理。
以下示例管道将传入事件拆分到两个子管道。每个子管道都使用自己的处理器来扩充和操作数据,然后将数据发送到不同的 OpenSearch 索引。
version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"
链接
您可以将多个子管道链接在一起,以便分块执行数据处理和扩充。也就是说,您可以在一个子管道中使用某些处理功能来扩充传入事件,然后将其发送到另一个子管道,以使用不同的处理器进行进一步扩充,最后将其发送到接收器。 OpenSearch
在以下示例中,log_pipeline
子管道使用一组处理器扩充传入的日志事件,然后将该事件发送到名为的 OpenSearch 索引。enriched_logs
管道向子管道发送相同的事件,log_advanced_pipeline
子管道对事件进行处理并将其发送到名enriched_advanced_logs
为的不同 OpenSearch 索引。
version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"
死信队列
死信队列 (DLQs) 是管道未能写入接收器的事件的目的地。在 OpenSearch Ingestion 中,您必须指定一个具有相应写入权限的 Amazon S3 存储桶,才能将其用作 DLQ。您可以向管道中的每个接收器添加 DLQ 配置。当管道遇到写入错误时,它会在配置的 S3 存储桶中创建 DLQ 对象。DLQ 对象作为一组失败事件存在于 JSON 文件中。
满足以下任意条件时,管道会向 DLQ 写入事件:
-
OpenSearch 接收器的最大重试次数已用完。 OpenSearch 对于此设置,摄取至少需要 16 个。
-
由于出现错误条件,接收器拒绝事件。
配置
要为子管道配置死信队列,请在配置接收器目标时选择启用 S3 DLQ。然后,为队列指定所需的设置。有关更多信息,请参阅 Data Prepper DLQ 文档中的配置
写入此 S3 DLQ 的文件具有以下命名模式:
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
有关手动配置管道角色以允许访问 DLQ 写入的 S3 存储桶的说明,请参阅。写入Amazon S3 或死信队列
示例
考虑以下示例 DLQ 文件:
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
以下是未能写入接收器并发送到 DLQ S3 存储桶进行进一步分析的数据示例:
Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"
索引管理
Amazon OpenSearch Ingestion 具有许多索引管理功能,包括以下功能。
创建索引
您可以在管道接收器中指定索引名称, OpenSearch Ingestion 会在预调配管道时创建索引。如果索引已经存在,管道会将其用于索引传入事件。如果您停止并重启管道,或者更新其 YAML 配置,如果这些索引尚不存在,则管道会尝试创建新的索引。管道始终不会删除索引。
以下示例为接收器在预调配管道时创建两个索引:
sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs
生成索引名称和模式
您可以使用传入事件字段的变量来生成动态索引名称。在接收器配置中,使用格式 string${}
表示字符串插值,并使用 JSON 指针从事件中提取字段。index_type
的选项是 custom
或 management_disabled
。由于 OpenSearch 域名和 OpenSearch 无服务器集合management_disabled
的index_type
默认值为,因此可以将其保留为未设置。custom
例如,以下管道从传入事件中选择 metadataType
字段以生成索引名称。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}"
以下配置继续每天或每小时生成一个新索引。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
索引名称也可以是以日期-时间模式作为后缀的纯字符串,例如 my-index-%{yyyy.MM.dd}
。当接收器向发送数据时 OpenSearch,它会使用 UTC 时间代替日期-时间模式,并每天创建一个新索引,例如。my-index-2022.01.25
有关更多信息,请参阅DateTimeFormatter
该索引名称也可以是带有/不带日期-时间样式后缀的格式化字符串,例如 my-${index}-name
。当接收器向发送数据时 OpenSearch,它会使用正在处理的事件中的值代替该"${index}"
部分。如果格式为 "${index1/index2/index3}"
,则使用事件中的值代替字段 index1/index2/index3
。
正在生成文档 IDs
管道可以在为文档建立文档的索引时生成文档 ID OpenSearch。它可以 IDs 从传入事件中的字段推断出这些文档。
此示例使用传入事件的 uuid
字段生成文档 ID。
pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" "document_id": "uuid"
在以下示例中,添加条目uuid
和 other_field
以生成文档 ID。
该create
操作可确保 IDs 不会覆盖具有相同内容的文档。管道会丢弃重复的文档,而不会出现任何重试或 DLQ 事件。由于使用此操作的管道作者的目的在于避免更新现有文档,因而这一预期十分合理。
pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id: "my_doc_id"
您可能需要将事件的文档 ID 设置为子对象中的字段。在以下示例中, OpenSearch 接收器插件使用子对象info/id
生成文档 ID。
sink: - opensearch: ... document_id: info/id
鉴于以下事件,管道将生成一个 _id
字段设置为 json001
的文档:
{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }
生成路由 ID IDs
您可以使用 OpenSearch 接收器插件中的routing_field
选项,将文档路由属性 (_routing
) 的值设置为传入事件中的值。
路由支持 JSON 指针语法,因此嵌套字段也可用,而不仅仅是顶级字段。
sink: - opensearch: ... routing_field: metadata/id document_id: id
鉴于以下事件,插件将生成一个 _routing
字段设置为 abcd
的文档:
{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }
有关创建供管道在创建索引时使用的索引模板的说明,请参阅索引模板
End-to-end 确认
OpenSearch Ingestion 通过使用end-to-end确认,跟踪数据在无状态管道中从源到接收器的传输,从而确保数据的持久性和可靠性。
注意
目前,只有 S 3 源
通过 end-to-end确认,管道源插件会创建一个确认组合来监控一批事件。当这些事件成功发送到其接收器时,它会收到肯定应答,或者当任何事件无法发送到其接收器时,它会收到否定应答。
如果管道组件出现故障或崩溃,或者源未能收到确认,则源会超时并采取必要的操作,例如重试或记录失败。如果管道配置了多个接收器或多个子管道,则只有在将事件发送到所有子管道中的所有接收器之后,才会发送事件级别确认。如果接收器配置了 DLQ,则 end-to-end确认还会跟踪写入 DLQ 的事件。
要启用 end-to-end确认,请展开 Amazon S3 源配置中的其他选项,然后选择启用 end-to-end消息确认。
源背压
当管道忙于处理数据,或其接收器暂时关闭或数据摄取速度缓慢时,管道可能会承受反向压力。 OpenSearch Ingestion 有不同的方法来处理反向压力,具体取决于管道使用的源插件。
HTTP 源
使用 HTTP 源
-
缓冲区 — 当缓冲区已满时,管道开始将错误代码为 408 的 HTTP 状态
REQUEST_TIMEOUT
返回到源端点。缓冲区被释放后,管道将重新开始处理 HTTP 事件。 -
源线程 — 当所有 HTTP 源线程都忙于执行请求,并且未处理的请求队列大小已超过允许的最大请求数时,管道开始将错误代码为 429 的 HTTP 状态
TOO_MANY_REQUESTS
返回到源端点。当请求队列降至允许的最大队列大小以下时,管道将重新开始处理请求。
OTel 来源
当使用 OpenTelemetry 源(OTel 日志REQUEST_TIMEOUT
。缓冲区被释放后,管道将重新开始处理事件。
S3 源
当带有 S3
如果接收器关闭或无法摄取数据,并且启用了源的 end-to-end确认,则管道将停止处理 SQS 通知,直到收到来自所有接收器的成功确认为止。