本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon OpenSearch Ingestion 进行事件聚合
您可以使用 Amazon OpenSearch Ingestion 汇总一段时间内来自不同事件的数据。聚合事件可以帮助减少不必要的日志卷,并处理以独立事件形式出现的多行日志等用例。聚合处理器
聚合处理器状态存储在内存中。例如,为将四个事件组合为一个,处理器需要保留前三个事件的片段。一组聚合事件的状态保留一段可配置的时间。聚合可能会持续很长时间,具体取决于您的日志、使用的聚合操作以及处理器配置中的内存选项数量。
除这些示例之外,您也可以将日志聚合与条件路由蓝图一起使用。有关蓝图的更多信息,请参阅 使用蓝图创建管道。
基本用法
以下示例管道使用 Grok 处理器sourceIp
、destinationIp
和 port
,然后使用聚合处理器put_all
操作在 30 秒内对这些字段进行聚合。30 秒结束后,聚合日志将发送到 OpenSearch 接收器。
version: "2" aggregate_pipeline: source: http: path: "/${pipelineName}/logs" processor: - grok: match: log: ["%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port:int}"] - aggregate: group_duration: "30s" identification_keys: ["sourceIp", "destinationIp", "port"] action: put_all: sink: - opensearch: ... index: aggregated_logs
例如,考虑以下日志批次:
{ "log": "127.0.0.1 192.168.0.1 80", "status": 200 } { "log": "127.0.0.1 192.168.0.1 80", "bytes": 1000 } { "log": "127.0.0.1 192.168.0.1 80" "http_verb": "GET" }
Grok 处理器将提取 identification_keys
,创建以下日志:
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200 } { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "bytes": 1000 } { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "http_verb": "GET" }
当组在聚合处理器接收到第一个日志后 30 秒结束时,将以下聚合日志写入接收器:
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200, "bytes": 1000, "http_verb": "GET" }
删除重复项
您可以通过从传入事件派生密钥并为聚合处理器指定 remove_duplicates
选项来删除重复条目。此操作会立即处理组的第一个事件,并删除该组中的所有后续事件。
在以下示例中,使用识别密钥 sourceIp
和 destinationIp
处理第一个事件:
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
然后,管道将删除以下事件,因为其中包含相同的密钥:
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
管道处理此事件并创建新组,因为 sourceIp
有所不同:
{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }
日志聚合和条件路由
您可以使用多个插件组合日志聚合与条件路由。在此示例中,子管道通过类似 HTTP 客户端log-aggregate-pipeline
接收日志, FluentBit 并通过将log
密钥中的值与常见的 Apache 日志模式进行匹配来从日志中提取重要值。
从采用 grok 模式的日志中提取的两个值包括 response
和 clientip
。然后,聚合处理器使用 clientip
值和 remove_duplicates
选项删除包含已在给定 group_duration
中处理过的 clientip
的所有日志。
管道中存在三个路由或条件语句。这些路由将响应值分为 2xx/3xx、4xx 和 5xx 响应。状态为 2xx 和 3xx 的日志发送到 aggregated_2xx_3xx
索引,状态为 4xx 的日志发送到 aggregated_4xx
索引,状态为 5xx 的日志发送到 aggregated_5xx
索引。
version: "2" log-aggregate-pipeline: source: http: # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline. # In this case it would be "/log-aggregate-pipeline/logs". This will be the FluentBit output URI value. path: "/${pipelineName}/logs" processor: - grok: match: log: [ "%{COMMONAPACHELOG_DATATYPED}" ] - aggregate: identification_keys: ["clientip"] action: remove_duplicates: group_duration: "180s" route: - 2xx_status: "/response >= 200 and /response < 300" - 3xx_status: "/response >= 300 and /response < 400" - 4xx_status: "/response >= 400 and /response < 500" - 5xx_status: "/response >= 500 and /response < 600" sink: - opensearch: ... index: "aggregated_2xx_3xx" routes: - 2xx_status - 3xx_status - opensearch: ... index: "aggregated_4xx" routes: - 4xx_status - opensearch: ... index: "aggregated_5xx" routes: - 5xx_status