查询 Amazon WAF 日志 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

查询 Amazon WAF 日志

Amazon WAF日志包含您的 Web ACL 所分析的流量相关信息,例如Amazon WAF收到您的请求Amazon资源、有关请求的详细信息以及每个请求所匹配的规则的操作。

您可以启用访问日志记录以Amazon WAF记录并将其保存到 Amazon S3。记下您将这些日志保存到的 Amazon S3 存储桶,您可以为它们创建一个 Athena 表并在 Athena 中查询它们。

有关启用的更多信息Amazon WAF日志和有关日志记录结构的信息,请参阅记录 Web ACL 流量信息中的Amazon WAF开发人员指南.

有关如何聚合Amazon WAF登录到中央数据湖存储库并使用 Athena 进行查询,请参阅Amazon大数据博客帖子分析Amazon WAF日志与亚马逊 ES、Amazon Athena 和亚 Amazon QuickSight.

为 Amazon WAF 日志创建表

创建 Amazon WAF 表

  1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。修改LOCATION用于存储日志的 Amazon S3 存储桶。

    此查询使用 OpenX JSON SerDe。Amazon Glue 爬网程序在分析 Amazon WAF 日志时建议表格式和 SerDe。

    注意

    SerDe 期望 Amazon S3 中的 WAF 日志中的每条 JSON 记录都位于单行文本中,并且不使用行终止字符分隔记录中的字段。如果 WAF 日志 JSON 文本是非常打印的格式,您可能会收到以下错误消息:配置单元图标错误:行不是有效的 JSON 对象当您尝试在创建表后查询表时。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:array< struct< ruleid:string, exclusiontype:string > > > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='action,formatVersion,httpRequest,httpSourceId,httpSourceName,labels,nonTerminatingMatchingRules,rateBasedRuleList,requestHeadersInserted,responseCodeSent,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://athenawaflogs/WebACL/'
  2. 运行CREATE EXTERNAL TABLE语句在 Athena 控制台查询编辑器中。这将注册waf_logs表中的数据,并使其中的数据可用于 Athena 的查询。

创建表Amazon WAF使用分区投影在 Athena 中的日志

由于Amazon WAF日志具有一个已知结构,您可以提前指定其分区方案,那么您可以使用 Athena 分区投影功能减少查询运行时间并自动执行分区管理。分区投影会在添加新数据时自动添加新分区。这样,就不必手动添加分区了,方法是使用ALTER TABLE ADD PARTITION.

以下示例CREATE TABLE语句自动使用Amazon WAF日志从指定日期到现在为单个Amazon区域。在LOCATIONstorage.location.template子句中,将存储桶folder占位符,其中包含标识您的 Amazon S3 存储桶位置的值。Amazon WAF日志。适用于projection.day.range,替换2021 年/01/01,其中包含要使用的开始日期。在您成功运行查询后,您可以查询表。您无需运行ALTER TABLE ADD PARTITION来加载分区。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:array< struct< ruleid:string, exclusiontype:string > > > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > > ) PARTITIONED BY ( day STRING ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/folder/' TBLPROPERTIES ( "projection.enabled" = "true", "projection.day.type" = "date", "projection.day.range" = "2021/01/01,NOW", "projection.day.format" = "yyyy/MM/dd", "projection.day.interval" = "1", "projection.day.interval.unit" = "DAYS", "storage.location.template" = "s3://bucket/folder/${day}" )

有关分区投影的更多信息,请参阅。Amazon Athena 分区投影.

Amazon WAF 日志的示例查询

在以下示例查询中,根据您的要求修改表名、列值和其他变量。若要提高查询的性能并降低成本,请在筛选条件中添加分区列。

统计包含指定术语的参考者的数量

以下查询计算指定日期范围内包含术语 “亚马逊” 的引用者数量。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT COUNT(*) referer_count FROM DATASET WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'

统计过去 10 天内与排除的规则匹配的所有匹配 IP 地址

以下查询计算过去 10 天内 IP 地址与规则组中的排除规则相匹配的次数。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC

使用日期和时间

以人类可读 ISO 8601 格式返回时间戳字段

以下查询使用from_unixtimeto_iso8601函数返回timestamp字段为人类可读 ISO 8601 格式(例如2019-12-13T23:40:12.000ZINSTEAD OF1576280412771)。该查询还返回 HTTP 源名称、源 ID 和请求。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;

过去 24 小时返回记录

以下查询使用WHERE子句返回过去 24 小时内记录的 HTTP 源名称、HTTP 源 ID 和 HTTP 请求字段。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;

返回指定日期范围和 IP 地址的记录

以下查询列出了指定的客户端 IP 地址在指定日期范围内的记录。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND day >= '2021/03/01' AND day < '2021/03/31'

对于指定的日期范围,在五分钟间隔内计算 IP 地址数

对于特定日期范围,以下查询计算在五分钟间隔内的 IP 地址数。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"

有关日期和时间函数的更多信息,请参阅。日期与时间函数和运算符在普雷斯托文档中。

使用阻止的请求和地址

提取被指定规则类型阻止的前 100 个 IP 地址

以下查询提取并统计已被RATE_BASED在指定的日期范围内终止规则。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100

计算来自指定国家/地区的请求被阻止的次数

以下查询针对来自属于爱尔兰 (IE) IP 地址的请求,计算请求到达但被 RATE_BASED 终止规则阻止的次数。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;

计算请求被阻止的次数,按特定属性分组

以下查询计算请求被阻止的次数,并按照 WebACL、RuleId、ClientIP 和 HTTP 请求 URI 对结果分组。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

计算特定终止规则 ID 匹配的次数

以下查询计算特定终止规则 ID 匹配的次数 (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e')。然后,查询按照 WebACL、操作、ClientIP 和 HTTP 请求 URI 对结果分组。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

检索指定日期范围内阻止的前 100 个 IP 地址

以下查询提取在指定日期范围内被阻止的前 100 个 IP 地址。该查询还列出了 IP 地址被阻止的次数。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100