使用 Amazon OpenSearch Ingestion 实现事件聚合 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon OpenSearch Ingestion 实现事件聚合

您可以使用 Amazon OpenSearch Ingestion 汇总一段时间内不同事件的数据。聚合事件可以帮助减少不必要的日志卷,并处理以独立事件形式出现的多行日志等用例。聚合处理器是有状态处理器,根据一组指定识别密钥值对事件进行分组,并对每个组执行可配置操作。

聚合处理器状态存储在内存中。例如,为将四个事件组合为一个,处理器需要保留前三个事件的片段。一组聚合事件的状态保留一段可配置的时间。聚合可能会持续很长时间,具体取决于您的日志、使用的聚合操作以及处理器配置中的内存选项数量。

除这些示例之外,您也可以将日志聚合与条件路由蓝图一起使用。有关蓝图的更多信息,请参阅 使用蓝图创建管道

基本用法

以下示例管道使用 Grok 处理器提取字段 sourceIpdestinationIpport,然后使用聚合处理器put_all 操作在 30 秒内对这些字段进行聚合。30 秒结束后,聚合日志将发送到 OpenSearch 接收器。

version: "2" aggregate_pipeline: source: http: path: "/${pipelineName}/logs" processor: - grok: match: log: ["%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port:int}"] - aggregate: group_duration: "30s" identification_keys: ["sourceIp", "destinationIp", "port"] action: put_all: sink: - opensearch: ... index: aggregated_logs

例如,考虑以下日志批次:

{ "log": "127.0.0.1 192.168.0.1 80", "status": 200 } { "log": "127.0.0.1 192.168.0.1 80", "bytes": 1000 } { "log": "127.0.0.1 192.168.0.1 80" "http_verb": "GET" }

Grok 处理器将提取 identification_keys,创建以下日志:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200 } { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "bytes": 1000 } { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "http_verb": "GET" }

当组在聚合处理器接收到第一个日志后 30 秒结束时,将以下聚合日志写入接收器:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200, "bytes": 1000, "http_verb": "GET" }

删除重复项

您可以通过从传入事件派生密钥并为聚合处理器指定 remove_duplicates 选项来删除重复条目。此操作会立即处理组的第一个事件,并删除该组中的所有后续事件。

在以下示例中,使用识别密钥 sourceIpdestinationIp 处理第一个事件:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }

然后,管道将删除以下事件,因为其中包含相同的密钥:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }

管道处理此事件并创建新组,因为 sourceIp 有所不同:

{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }

日志聚合和条件路由

您可以使用多个插件组合日志聚合与条件路由。在此示例中,子管道 log-aggregate-pipeline 通过 HTTP 客户端(例如,FluentBit)接收日志,并通过匹配 log 密钥值与通用 Apache 日志模式从日志中提取重要值。

从采用 grok 模式的日志中提取的两个值包括 responseclientip。然后,聚合处理器使用 clientip 值和 remove_duplicates 选项删除包含已在给定 group_duration 中处理过的 clientip 的所有日志。

管道中存在三个路由或条件语句。这些路由将响应值分为 2xx/3xx、4xx 和 5xx 响应。状态为 2xx 和 3xx 的日志发送到 aggregated_2xx_3xx 索引,状态为 4xx 的日志发送到 aggregated_4xx 索引,状态为 5xx 的日志发送到 aggregated_5xx 索引。

version: "2" log-aggregate-pipeline: source: http: # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline. # In this case it would be "/log-aggregate-pipeline/logs". This will be the FluentBit output URI value. path: "/${pipelineName}/logs" processor: - grok: match: log: [ "%{COMMONAPACHELOG_DATATYPED}" ] - aggregate: identification_keys: ["clientip"] action: remove_duplicates: group_duration: "180s" route: - 2xx_status: "/response >= 200 and /response < 300" - 3xx_status: "/response >= 300 and /response < 400" - 4xx_status: "/response >= 400 and /response < 500" - 5xx_status: "/response >= 500 and /response < 600" sink: - opensearch: ... index: "aggregated_2xx_3xx" routes: - 2xx_status - 3xx_status - opensearch: ... index: "aggregated_4xx" routes: - 4xx_status - opensearch: ... index: "aggregated_5xx" routes: - 5xx_status