查询 Amazon WAF 日志 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

查询 Amazon WAF 日志

Amazon WAF 是一个 Web 应用程序防火墙,可以监视和控制受保护的 Web 应用程序从客户端收到的 HTTP 和 HTTPS 请求。您可以通过在 Amazon WAF Web 访问控制列表(ACL)中配置规则来定义如何处理 Web 请求。然后,您可以通过将 Web ACL 关联到 Web 应用程序来保护该应用程序。您可以使用 Amazon WAF 保护的 Web 应用程序资源的示例包括 Amazon CloudFront 分配、Amazon API Gateway REST API 和应用程序负载均衡器。有关 Amazon WAF 的更多信息,请参阅《Amazon WAF Developer Guide》中的 Amazon WAF

Amazon WAF 日志包含您的 Web ACL 所分析的流量相关信息,例如 Amazon WAF 从 Amazon 资源收到请求的时间,有关请求的详细信息,以及每个请求所匹配的规则的操作。

您可以配置 Amazon WAF Web ACL 将日志发布到多个目标中的一个目标,即您可以查询和查看这些日志的地方。有关配置 Web ACL 日志记录和 Amazon WAF 日志内容的更多信息,请参阅《Amazon WAF Developer Guide》中的 Logging Amazon WAF web ACL traffic

有关如何将 Amazon WAF 日志聚合到中央数据湖存储库并使用 Athena 进行查询的示例,请参阅 Amazon 大数据博客文章使用 OpenSearch 服务、Amazon Athena 和 Amazon QuickSight 分析 Amazon WAF 日志

本主题提供两个示例 CREATE TABLE 语句:一个使用分区,另一个不使用分区。

注意

本主题中的 CREATE TABLE 语句可以用于 v1 和 v2 Amazon WAF 日志。在 v1 中,webaclid 字段包含一个 ID。在 v2 中,webaclid 字段包含完整的 ARN。这里的 CREATE TABLE 语句通过使用 string 数据类型未知地处理此内容。

使用分区投影为 Athena 中的 Amazon WAF S3 日志创建表

由于 Amazon WAF 日志具有您可以预先指定其分区方案的已知结构,因此您可以使用 Athena 分区投影功能减少查询运行时间并自动管理分区。当添加新数据时,分区投影会自动添加新分区。这样就不必使用 ALTER TABLE ADD PARTITION 手动添加分区了。

以下示例 CREATE TABLE 语句会自动在 Amazon WAF 日志上从指定日期开始到当前日期为止,为四个不同 Amazon 区域使用分区投影。本示例中的 PARTITION BY 子句按区域和日期进行分区,但您可以根据自己的要求修改此子句。根据需要修改字段以匹配您的日志输出。在 LOCATIONstorage.location.template 子句中,将 bucketaccountID 占位符替换为值,该值标识 Amazon WAF 日志在 Amazon S3 存储桶中的位置。对于 projection.day.range,将 2021/01/01 替换为要使用的开始日期。成功运行查询后,您可以查询表。您无需运行 ALTER TABLE ADD PARTITION 来加载分区。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string, `oversizefields` string, `requestbodysize` int, `requestbodysizeinspectedbywaf` int ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
注意

示例中 LOCATION 子句中的路径格式是标准格式,但可能因所实施的 Amazon WAF 配置而异。例如,以下示例 Amazon WAF 日志路径适用于 CloudFront 分配:

s3://DOC-EXAMPLE-BUCKET/AWSLogs/12345678910/WAFLogs/cloudfront/cloudfronyt/2022/08/08/17/55/

如果您在创建或查询 Amazon WAF 日志表时遇到问题,请确认日志数据位置或联系 Amazon Web Services Support

更多有关分区投影的信息,请参阅 使用 Amazon Athena 分区投影

创建不进行分区的 Amazon WAF 日志表

本节介绍如何创建不进行分区或分区投影的 Amazon WAF 日志表。

注意

出于性能和成本原因,不建议使用非分区架构进行查询。有关更多信息,请参阅 Amazon 大数据博客中的 Top 10 Performance Tuning Tips for Amazon Athena(Amazon Athena 的十大性能优化技巧)。

创建 Amazon WAF 表

  1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。根据需要修改字段以匹配您的日志输出。修改 Amazon S3 存储桶的 LOCATION 以对应用于存储日志的存储桶。

    此查询使用 OpenX JSON SerDe

    注意

    SerDe 期望每个 JSON 文档都位于单行文本中,并且不使用行终止字符分隔记录中的字段。如果 JSON 文本采用美观的打印格式,当您在创建表后尝试对其进行查询时,可能会收到类似以下内容的错误消息:HIVE_CURSOR_ERROR: Row is not a valid JSON Object(HIVE_CURSOR_ERROR:行不是有效的 JSON 对象)或 HIVE_CURSOR_ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT(HIVE_CURSOR_ERROR:JsonParseException:意外的输入结束:对象的预期关闭标记)。有关更多信息,请参阅 GitHub 上 OpenX SerDe 文档中的 JSON 数据文件

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string, `oversizefields` string, `requestbodysize` int, `requestbodysizeinspectedbywaf` int ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
  2. 在 Athena 控制台查询编辑器中运行 CREATE EXTERNAL TABLE 语句。这将注册 waf_logs 表,并使其中的数据可用于来自 Athena 的查询。

Amazon WAF 日志的示例查询

以下许多示例查询使用之前在此文档中创建的分区投影表。根据您的要求修改示例中的表名称、列值和其他变量。若要提高查询的性能并降低成本,请在筛选条件中添加分区列。

 

 

 

– 统计包含指定术语的引用站点数量

以下查询计算指定日期范围内包含“amazon”一词的引用者数量。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT COUNT(*) referer_count FROM test_dataset WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'
– 统计过去 10 天内与排除规则匹配的所有匹配 IP 地址

以下查询计算过去 10 天内 IP 地址与规则组中排除规则匹配的次数。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
- 按匹配次数对所有已计数的托管规则进行分组

如果您在 2022 年 10 月 27 日之前在 Web ACL 配置中将规则组规则操作设置为“计数”,Amazon WAF 在 Web ACL JSON 中将覆盖内容保存为 excludedRules。现在,用于将规则替换为“计数”的 JSON 设置位于 ruleActionOverrides 设置中。有关更多信息,请参阅《Amazon WAF 开发人员指南》中的规则组中的操作覆盖。要从新的日志结构中提取计数模式下的托管规则,请在 ruleGroupList 部分而不是 excludedRules 字段中查询 nonTerminatingMatchingRules,如下例所示。

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.rulegroupid, t.nonTerminatingMatchingRules FROM "waf_logs" CROSS JOIN UNNEST(rulegrouplist) AS t(t) WHERE action <> 'BLOCK' AND cardinality(t.nonTerminatingMatchingRules) > 0 GROUP BY t.nonTerminatingMatchingRules, action, httpsourceid, httprequest.clientip, t.rulegroupid ORDER BY "count" DESC Limit 50
- 按匹配次数对所有已计数的自定义规则进行分组

以下查询按匹配次数对所有已计数的自定义规则进行分组。

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50

有关自定义规则和托管规则组的日志位置的信息,请参阅《Amazon WAF 开发人员指南》中的监控和调整

使用日期和时间

– 以人类可读 ISO 8601 格式返回时间戳字段

以下查询使用 from_unixtimeto_iso8601 函数以人类可读的 ISO 8601 格式返回 timestamp 字段(例如 2019-12-13T23:40:12.000Z 而不是 1576280412771)。该查询还返回 HTTP 源名称、源 ID 和请求。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;
– 返回过去 24 小时的记录

以下查询使用 WHERE 子句中的筛选条件返回过去 24 小时内记录的 HTTP 源名称、HTTP 源 ID 和 HTTP 请求字段。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;
– 返回指定日期范围和 IP 地址的记录

以下查询列出了指定的客户端 IP 地址在指定日期范围内的记录。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND "date" >= '2021/03/01' AND "date" < '2021/03/31'
– 对于指定的日期范围,计算在五分钟间隔内的 IP 地址数

对于特定日期范围,以下查询计算在五分钟间隔内的 IP 地址数。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"
– 计算过去 10 天内 X-Forwarded-For IP 的数量

以下查询将筛选请求标头,并统计过去 10 天内 X-Forwarded-For IP 的数量。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST (httprequest.headers) AS t(header) WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) SELECT header.value AS ip, count(*) AS COUNT FROM test_dataset WHERE header.name='X-Forwarded-For' GROUP BY header.value ORDER BY COUNT DESC

有关日期和时间函数的更多信息,请参阅 Trino 文档中的 日期与时间函数和运算符

使用阻止的请求和地址

– 提取被指定规则类型阻止的前 100 个 IP 地址

下面的查询将提取并统计在指定的日期范围内被 RATE_BASED 终止规则阻止的前 100 个 IP 地址。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100
– 计算来自指定国家/地区的请求被阻止的次数

以下查询针对来自属于爱尔兰 (IE) IP 地址的请求,计算请求到达但被 RATE_BASED 终止规则阻止的次数。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;
– 计算请求被阻止的次数,按特定属性分组

以下查询计算请求被阻止的次数,并按照 WebACL、RuleId、ClientIP 和 HTTP 请求 URI 对结果分组。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
– 计算特定终止规则 ID 匹配的次数

以下查询计算特定终止规则 ID 匹配的次数 (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e')。然后,查询按照 WebACL、操作、ClientIP 和 HTTP 请求 URI 对结果分组。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
– 检索指定日期范围内被阻止的前 100 个 IP 地址

以下查询将提取在指定日期范围内被阻止的前 100 个 IP 地址。该查询还列出了 IP 地址被阻止的次数。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100