查询 AWS WAF 日志 - Amazon Athena
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

查询 AWS WAF 日志

AWS WAF 日志包含您的 Web ACL 所分析的流量相关信息,例如 AWS WAF 从 AWS 资源收到请求的时间,有关请求的详细信息,以及每个请求所匹配的规则的操作。

您可以为 AWS WAF 日志启用访问日志记录并将其保存到 Amazon S3。记下您保存这些日志的 Amazon S3 存储桶,您可以为它们创建 Athena 表并在 中查询它们Athena。

有关启用AWS WAF日志和日志记录结构的更多信息,请参阅 中的记录 Web ACL AWS WAF 开发人员指南流量信息

有关如何将 AWS WAF 日志聚合到中央数据湖存储库并使用 查询它们的示例Athena,请参阅AWS大数据博客文章使用 Amazon Amazon Athena 和 Amazon QuickSight分析 AWS WAF 日志。

为 AWS WAF 日志创建表

创建 AWS WAF 表

  1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。修改用于存储日志的 LOCATION 存储桶的 Amazon S3

    此查询使用 OpenX JSON - SerDe. SerDe 爬网程序AWS Glue在分析AWS WAF日志时建议表格式和 。

    注意

    SerDe 希望 中的 WAF Amazon S3 日志中的每个 JSON 记录都位于一行文本上,并且没有行终止字符用于分隔记录中的字段。如果 WAF 日志 JSON 文本为美观的打印格式,您可能会收到错误消息 HIVE_CURSOR_ERROR: Row is not a valid JSON Object 当您在创建表后尝试查询表时。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:string > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='action,formatVersion,httpRequest,httpSourceId,httpSourceName,nonTerminatingMatchingRules,rateBasedRuleList,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://athenawaflogs/WebACL/'
  2. 在 Athena 控制台中运行查询。查询完成后,Athena 注册 waf_logs 表,使其中的数据可用于查询。

AWS WAF 日志的示例查询

以下查询计算 RATE_BASED 终止规则阻止某个 IP 地址的次数。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' GROUP BY httpRequest.clientIp ORDER BY count LIMIT 100;

以下查询针对来自属于爱尔兰 (IE) IP 地址的请求,计算请求到达但被 RATE_BASED 终止规则阻止的次数。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;

以下查询计算请求被阻止的次数,结果按 WebACL、RuleId、 ClientIP和 HTTP 请求 URI 分组。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

以下查询计算特定终止规则 ID 匹配的次数 (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e'). 然后,查询按 WebACL、Action (操作ClientIP)、 和 HTTP 请求 URI 对结果进行分组。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

以下查询使用 from_unixtimeto_iso8601 函数以人类可读的 ISO 8601 格式返回 timestamp 字段(例如2019-12-13T23:40:12.000Z,而不是 1576280412771)。该查询还返回 HTTP 源名称、源 ID 和 请求。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;

以下查询在 WHERE 子句中使用筛选条件为过去 24 小时的记录返回相同的字段。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;

有关日期和时间函数的更多信息,请参阅 Presto 文档中的日期和时间函数和运算符