本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon OpenSearch Ingestion 中的管道功能概述
Amazon OpenSearch Ingestion 预置管道,包括一个源、一个缓冲区、零个或多个处理器以及一个或多个接收器。提取管道由 Data Prepper 作为数据引擎提供支持。有关管道各个组件的概述,请参见 重要概念。
以下各节概述了 Amazon OpenSearch Ingestion 中一些最常用的功能。
注意
该列表不是管道可用功能的详尽列表。有关管道所有可用功能的综合文档,请参阅 Data Prepper 文档
持久缓冲功能
永久缓冲区将您的数据存储在跨多个可用区的磁盘缓冲区中。对于所有支持的基于推送的源,您可以使用永久缓冲区来摄取数据,无需设置独立的缓冲区。其中包括HTTP日志、跟踪和指标的 OpenTelemetry 来源。
要启用永久缓冲区,请在创建或更新管道时选择启用永久缓冲区。有关更多信息,请参阅创建 Amazon OpenSearch Ingestion 管道。 OpenSearch Ingestion 会根据您为管道指定的摄取 OpenSearch 计算单位(摄取OCUs)自动确定所需的缓冲容量。
如果您为管道启用永久缓冲,则源的默认最大请求负载大小为 10 MB,HTTP源的最大请求负载大小为 OpenTelemetry 4 MB。对于HTTP源,您可以将最大请求负载大小增加到 20 MB。请求负载大小是整个HTTP请求的大小,通常包含多个事件。任何给定事件均不得超过 3.5 MB。如果您不启用永久缓冲区,则无法将最大有效载荷大小修改为 20 MB。
对于启用了永久缓冲区的管道,配置的管道单位数将在计算单位和缓冲区单位之间进行分配。如果管道使用的是CPU密集型处理器(grok、键值和/或拆分字符串),则指定的单元将按照 1:1 的缓冲区与计算的比例进行拆分,否则它们将以 3:1 的比例进行拆分。对于其中的每个比率,人们倾向于预置更多的计算单位。
例如:
-
具有 grok 和 2 个最大单位的管道:1 个计算单位和 1 个缓冲区单位
-
具有 grok 和 5 个最大单位的管道:3 个计算单位和 2 个缓冲区单位
-
无处理器但具有 2 个最大单位的管道:1 个计算单位和 1 个缓冲区单位
-
无处理器但具有 4 个最大单位的管道:1 个计算单位和 3 个缓冲区单位
-
具有 grok 和 5 个最大单位的管道:2 个计算单位和 3 个缓冲区单位
默认情况下,管道使用加密 Amazon 拥有的密钥 缓冲区数据。这些管道不需要任何额外的管道角色权限。或者,您可以指定客户托管密钥并向管道角色添加以下IAM权限:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:
{region}
:{aws-account-id}
:key/1234abcd-12ab-34cd-56ef-1234567890ab
" } ] }
有关更多信息,请参阅《Amazon Key Management Service 开发人员指南》中的客户托管密钥。
注意
如果您禁用持久缓冲功能,则您的管道将更新为完全在内存缓冲上运行。
配置永久缓冲
Amazon OpenSearch Ingestion 会跟踪写入接收器的数据,如果接收器中断或其他导致数据无法成功写入的问题,Amazon Ingestion 会自动从上次成功检查点恢复写入。除了为管道设置的最小和最大 OpenSearch 计算单位 (OCU) 外,永久缓冲区不需要其他服务或组件。开启持久缓冲后,每个 Ingest OCU ion 现在都能够提供持久缓冲以及其现有的摄取、转换和路由数据的功能。启用永久缓冲后,数据将在缓冲区中保留 72 小时。Amazon OpenSearch Ingestion 从您为管道定义的最小和最大分配中动态分配缓冲区。OCUs
OCUs用于持久缓冲的 Ingestion 数量是根据数据源、流数据的转换以及数据写入的接收器动态计算的。由于摄取的一部分OCUs现在适用于持久缓冲,因此为了使您的管道保持相同的摄取吞吐量,您需要在开启持久缓冲时增加最小和最大摄取OCUs量。持久缓冲所需的缓冲量取决于您从中提取数据的来源,也取决于您对数据执行的处理类型。OCUs下表显示了使用不同源和处理器进行持续缓冲所需的数量。OCUs
拆分
您可以将 OpenSearch Ingestion 管道配置为将传入事件拆分为子管道,从而允许您对同一个传入事件执行不同类型的处理。
以下示例管道将传入事件拆分到两个子管道。每个子管道都使用自己的处理器来丰富和操作数据,然后将数据发送到不同的 OpenSearch 索引。
version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"
链接
您可以将多个子管道链接在一起,以便分块执行数据处理和扩充。换句话说,你可以在一个子管道中使用一定的处理能力来丰富传入的事件,然后将其发送到另一个子管道,使用不同的处理器进行进一步丰富,最后将其发送到其 OpenSearch 接收器。
在以下示例中,log_pipeline
子管道使用一组处理器丰富传入的日志事件,然后将该事件发送到名为的 OpenSearch 索引。enriched_logs
管道向子管道发送相同的事件,log_advanced_pipeline
子管道对其进行处理并将其发送到名enriched_advanced_logs
为的其他 OpenSearch 索引。
version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"
死信队列
死信队列 (DLQs) 是管道未能写入接收器的事件的目的地。在 OpenSearch Ingestion 中,您必须指定具有相应写入权限的 Amazon S3 存储桶才能用作。DLQ您可以为管道中的每个接收器添加DLQ配置。当管道遇到写入错误时,它会在配置的 S3 存储桶中创建DLQ对象。DLQ对象以失败事件数组的形式存在于JSON文件中。
当满足以下任一条件DLQ时,管道会将事件写入到:
-
max_retries
用于 OpenSearch 水槽的东西已经用完了。 OpenSearch 对于此选项,摄取至少需要 16。 -
由于出现错误条件,事件被接收器拒绝。
配置
要为子管道配置死信队列,请在 opensearch
接收器配置中指定 dlq
选项:
apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
写入此 S3 的文件DLQ将采用以下命名模式:
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
有关更多信息,请参阅死信队列 () DLQ
有关配置 sts_role_arn
角色的说明,请参阅写入死信队列。
示例
考虑以下示例DLQ文件:
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
以下是未能写入接收器并发送到 DLQ S3 存储桶进行进一步分析的数据示例:
Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"
索引管理
Amazon OpenSearch Ingestion 具有许多索引管理功能,包括以下功能。
创建索引
您可以在管道接收器中指定索引名称, OpenSearch Ingestion 在置备管道时会创建索引。如果索引已经存在,管道会将其用于索引传入事件。如果您停止并重新启动管道,或者更新其YAML配置,则管道会尝试创建新的索引(如果这些索引尚不存在)。管道始终不会删除索引。
以下示例为接收器在预调配管道时创建两个索引:
sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs
生成索引名称和模式
您可以使用传入事件字段的变量来生成动态索引名称。在接收器配置中,使用格式string${}
表示字符串插值,并使用JSON指针从事件中提取字段。index_type
的选项是 custom
或 management_disabled
。由于 OpenSearch 域名和 OpenSearch 无服务器集合management_disabled
的index_type
默认值为,因此可以将其保留为未设置。custom
例如,以下管道从传入事件中选择 metadataType
字段以生成索引名称。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}"
以下配置继续每天或每小时生成一个新索引。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
索引名称也可以是以日期-时间模式作为后缀的纯字符串,例如 my-index-%{yyyy.MM.dd}
。当接收器向发送数据时 OpenSearch,它会将日期时间模式替换为UTC时间,并为每天创建一个新索引,例如。my-index-2022.01.25
有关更多信息,请参阅DateTimeFormatter
该索引名称也可以是带有/不带日期-时间样式后缀的格式化字符串,例如 my-${index}-name
。当接收器向发送数据时 OpenSearch,它会将该"${index}"
部分替换为正在处理的事件中的值。如果格式为 "${index1/index2/index3}"
,则使用事件中的值代替字段 index1/index2/index3
。
正在生成文档 IDs
管道可以在为文档编制索引时生成文档 ID OpenSearch。它可以IDs从传入事件中的字段中推断出这些文档。
此示例使用传入事件的 uuid
字段生成文档 ID。
pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"
在以下示例中,添加条目uuid
和 other_field
以生成文档 ID。
该create
操作可确保具有相同内容的文档IDs不会被覆盖。该管道会丢弃重复的文档,而不会发生任何重试或DLQ事件。由于使用此操作的管道作者的目的在于避免更新现有文档,因而这一预期十分合理。
pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"
您可能需要将事件的文档 ID 设置为子对象中的字段。在以下示例中,s OpenSearch ink 插件使用子对象info/id
生成文档 ID。
sink: - opensearch: ... document_id_field: info/id
鉴于以下事件,管道将生成一个 _id
字段设置为 json001
的文档:
{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }
正在生成路由 IDs
你可以使用 sin OpenSearch k 插件中的routing_field
选项将文档路由属性 (_routing
) 的值设置为来自传入事件的值。
路由支持JSON指针语法,因此嵌套字段也可用,而不仅仅是顶级字段。
sink: - opensearch: ... routing_field: metadata/id document_id_field: id
鉴于以下事件,插件将生成一个 _routing
字段设置为 abcd
的文档:
{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }
有关创建供管道在创建索引时使用的索引模板的说明,请参阅索引模板
End-to-end 确认
OpenSearch Ingestion 使用end-to-end确认功能跟踪无状态管道中从源到接收器的传输,从而确保数据的持久性和可靠性。目前,只有 S 3 源
通过 end-to-end确认,管道源插件会创建一个确认集来监视一批事件。当这些事件成功发送到其接收器时,它会收到肯定应答,或者当任何事件无法发送到其接收器时,它会收到否定应答。
如果管道组件出现故障或崩溃,或者源未能收到确认,则源会超时并采取必要的操作,例如重试或记录失败。如果管道配置了多个接收器或多个子管道,则只有在将事件发送到所有子管道中的所有接收器之后,才会发送事件级别确认。如果接收器已DLQ配置,则 end-to-end确认还会跟踪写入的事件。DLQ
要启用 end-to-end确认,请在源配置中包含以下acknowledgments
选项:
s3-pipeline: source: s3: acknowledgments: true ...
源背压
当管道忙于处理数据,或者其接收器暂时关闭或数据采集速度缓慢时,管道可能会遇到背压。 OpenSearch 根据管道使用的源插件,Ingestion 有不同的处理背压的方法。
HTTP来源
使用HTTP源
-
缓冲区 — 当缓冲区已满时,管道开始将错误代码为 408
REQUEST_TIMEOUT
的HTTP状态返回到源端点。缓冲区被释放后,管道将重新开始处理HTTP事件。 -
源线程 — 当所有HTTP源线程都忙于执行请求并且未处理的请求队列大小已超过允许的最大请求数时,管道开始将错误代码为 429 的HTTP状态
TOO_MANY_REQUESTS
返回到源端点。当请求队列降至允许的最大队列大小以下时,管道将重新开始处理请求。
OTel来源
当使用 OpenTelemetry 源(OTel日志REQUEST_TIMEOUT
。缓冲区被释放后,管道将重新开始处理事件。
S3 源
当带有 S3
如果接收器关闭或无法采集数据,并且已为源启用 end-to-end确认功能,则管道将停止处理SQS通知,直到收到来自所有接收器的成功确认为止。