查询 Amazon VPC 流日志 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

查询 Amazon VPC 流日志

Amazon Virtual Private Cloud 流日志捕获有关在 VPC 中传入和传出网络接口的 IP 流量的信息。可使用日志来调查网络流量模式,并识别 VPC 网络中的威胁和风险。

若要查询 Amazon VPC 流日志,您有两种选择:

  • Amazon VPC 控制台 – 使用 Amazon VPC 控制台中的 Athena 集成功能生成 Amazon CloudFormation 模板,用于创建 Athena 数据库、工作组和流日志表并为您进行分区。该模板还会创建一组预定义的流日志查询,可用于获取有关流经 VPC 的流量的洞察。

    有关此方法更多信息,请参阅《Amazon VPC 用户指南》中的使用 Amazon Athena 查询流日志

  • Amazon Athena 控制台 – 直接在 Athena 控制台中创建表和查询。有关更多信息,请阅读此页面。

创建和查询 VPC 流日志表

当您开始在 Athena 中查询日志之前,启用 VPC 流日志,并将其配置为保存到您的 Amazon S3 存储桶。在您创建日志后,让它们运行几分钟以收集一些数据。这些日志是采用 Athena 允许您直接查询的 GZIP 压缩格式创建的。

当您创建 VPC 流日志时,可以使用默认格式,也可以指定自定义格式。当您希望指定在流日志中返回的字段以及这些字段的显示顺序时,请使用自定义格式。有关更多信息,请参阅《Amazon VPC 用户指南》中的流日志记录

常见注意事项

在 Athena 中为 Amazon VPC 流日志创建表时,请记住以下几点:

  • 默认情况下,在 Athena 中,Parquet 将按名称访问列。有关更多信息,请参阅 处理架构更新

  • 使用流日志记录中的名称作为 Athena 中列的名称。Athena 架构中列的名称应与 Amazon VPC 流日志中的字段名称完全匹配,但有以下不同之处:

    • 将 Amazon VPC 日志字段名称中的连字符替换为 Athena 列名称中的下划线。在 Athena 中,数据库名称、表名称和列名称仅可接受小写字母、数字和下划线字符。有关更多信息,请参阅 数据库、表和列名称

    • 在 Athena 中,使用反引号将保留关键字中的流日志记录名称括起来,将其转义。

适用于 Amazon VPC 流日志的 CREATE TABLE 语句

以下过程将为 Amazon VPC 流日志创建 Amazon VPC 表。如果您使用自定义格式创建流日志,那么您必须创建一个表,其字段与您在创建流日志时指定的字段相匹配,且字段顺序与您指定的字段顺序相同。

为 Amazon VPC 流日志创建 Athena 表

  1. 按照 常见注意事项 部分中的准则,在 Athena 控制台查询编辑器中输入类似以下内容的 DDL 语句。此示例语句创建一个表,其中包含 Amazon VPC 流日志版本 2 到 5 的列,如流日志记录中所示。如果使用不同的列集或列顺序,请相应地修改语句。

    CREATE EXTERNAL TABLE IF NOT EXISTS `vpc_flow_logs` ( `version` int, `account_id` string, `interface_id` string, `srcaddr` string, `dstaddr` string, `srcport` int, `dstport` int, `protocol` bigint, `packets` bigint, `bytes` bigint, `start` bigint, `end` bigint, `action` string, `log_status` string, `vpc_id` string, `subnet_id` string, `instance_id` string, `tcp_flags` int, `type` string, `pkt_srcaddr` string, `pkt_dstaddr` string, `region` string, `az_id` string, `sublocation_type` string, `sublocation_id` string, `pkt_src_aws_service` string, `pkt_dst_aws_service` string, `flow_direction` string, `traffic_path` int ) PARTITIONED BY (`date` date) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' TBLPROPERTIES ("skip.header.line.count"="1");

    请注意以下几点:

    • 此查询指定 ROW FORMAT DELIMITED 并省略指定 SerDe。这意味着查询使用用于 CSV、TSV 和自定义分隔文件的 LazySimpleSerDe。在此查询中,字段由一个空格终止。

    • PARTITIONED BY 子句使用 date 类型。这样,就可以在查询中使用数学运算符来选择比特定日期更早或更新的内容。

      注意

      因为 date 在 DDL 语句中是保留关键字,所以用反引号字符对其进行转义。有关更多信息,请参阅 保留关键字

    • 对于具有自定义格式的 VPC 流日志,请修改字段以与您创建流日志时指定的字段相匹配。

  2. 修改 LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' 以指向包含您的日志数据的 Amazon S3 存储桶。

  3. 在 Athena 控制台中运行查询。查询完成后,Athena 将注册 vpc_flow_logs 表,使其中的数据可以供您发出查询。

  4. 创建分区以便能够读取数据,如以下示例查询中所示。此示例查询创建指定日期的单个分区。根据需要替换日期和位置的占位符。

    注意

    此查询仅为您指定的日期创建单个分区。若要自动执行此过程,请使用运行此查询并以这种方式为 year/month/day 创建分区的脚本,或使用 CREATE TABLE 语句指定分区投影

    ALTER TABLE vpc_flow_logs ADD PARTITION (`date`='YYYY-MM-dd') LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/YYYY/MM/dd';

vpc_flow_logs 表的查询示例

使用 Athena 控制台中的查询编辑器在创建的表上运行 SQL 语句。您可以保存查询、查看之前的查询或下载 CSV 格式的查询结果。在以下示例中,将 vpc_flow_logs 替换为表名称。根据您的要求修改列值和其他变量。

以下示例查询列出了指定日期的最多 100 个流日志。

SELECT * FROM vpc_flow_logs WHERE date = DATE('2020-05-04') LIMIT 100;

以下查询列出所有被拒绝的 TCP 连接并使用新创建的日期分区列 date 来从中提取这些事件发生的星期几。

SELECT day_of_week(date) AS day, date, interface_id, srcaddr, action, protocol FROM vpc_flow_logs WHERE action = 'REJECT' AND protocol = 6 LIMIT 100;

若要查看哪个服务器收到最大数量的 HTTPS 请求,请使用以下查询。它计算在 HTTPS 端口 443 上接收的数据包数,按目标 IP 地址对它们进行分组,并返回上一周的前 10 个。

SELECT SUM(numpackets) AS packetcount, dstaddr FROM vpc_flow_logs WHERE dstport = 443 AND date > current_date - interval '7' day GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10;

以 Parquet 格式为流日志创建表

以下过程将以 Parquet 格式为 Amazon VPC 流日志创建 Amazon VPC 表。

以 Parquet 格式为 Amazon VPC 流日志创建 Athena 表

  1. 按照 常见注意事项 部分中的准则,在 Athena 控制台查询编辑器中输入类似以下内容的 DDL 语句。以下示例语句创建一个表,其中包含 VPC 流日志版本 2 到 5 的列,如 Parquet 格式的流日志记录中所示,Hive 按小时分区。如果您没有按小时分区,请删除 PARTITIONED BY 子句中的 hour

    CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs ( `version` int, `account_id` string, `interface_id` string, `srcaddr` string, `dstaddr` string, `srcport` int, `dstport` int, `protocol` bigint, `packets` bigint, `bytes` bigint, `start` bigint, `end` bigint, `action` string, `log_status` string, `vpc_id` string, `subnet_id` string, `instance_id` string, `tcp_flags` int, `type` string, `pkt_srcaddr` string, `pkt_dstaddr` string, `region` string, `az_id` string, `sublocation_type` string, `sublocation_id` string, `pkt_src_aws_service` string, `pkt_dst_aws_service` string, `flow_direction` string, `traffic_path` int ) PARTITIONED BY ( `aws-account-id` string, `aws-service` string, `aws-region` string, `year` string, `month` string, `day` string, `hour` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' TBLPROPERTIES ( 'EXTERNAL'='true', 'skip.header.line.count'='1' )
  2. 修改 LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' 以指向包含您的日志数据的 Amazon S3 存储桶。

  3. 在 Athena 控制台中运行查询。

  4. 通过在 Athena 控制台中运行以下命令,更新元数据仓中的 Hive 分区。查询完成后,可以查询 vpc_flow_logs 表中的数据。

    MSCK REPAIR TABLE vpc_flow_logs

使用分区投影创建和查询 Amazon VPC 流日志表

使用如下所示的 CREATE TABLE 语句创建表、对表进行分区并使用分区投影自动填充分区。将示例中的表名称 test_table_vpclogs 替换为您的表名称。编辑 LOCATION 子句以指定包含 Amazon VPC 日志数据的 Amazon S3 存储桶。

以下 CREATE TABLE 语句适用于以非 Hive 样式的分区格式传送的 VPC 流日志。

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs ( `version` int, `account_id` string, `interface_id` string, `srcaddr` string, `dstaddr` string, `srcport` int, `dstport` int, `protocol` bigint, `packets` bigint, `bytes` bigint, `start` bigint, `end` bigint, `action` string, `log_status` string, `vpc_id` string, `subnet_id` string, `instance_id` string, `tcp_flags` int, `type` string, `pkt_srcaddr` string, `pkt_dstaddr` string, `az_id` string, `sublocation_type` string, `sublocation_id` string, `pkt_src_aws_service` string, `pkt_dst_aws_service` string, `flow_direction` string, `traffic_path` int ) PARTITIONED BY (region string, day string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/' TBLPROPERTIES ( "skip.header.line.count"="1", "projection.enabled" = "true", "projection.region.type" = "enum", "projection.region.values" = "us-east-1,us-west-2,ap-south-1,eu-west-1", "projection.day.type" = "date", "projection.day.range" = "2021/01/01,NOW", "projection.day.format" = "yyyy/MM/dd", "storage.location.template" = "s3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/${region}/${day}" )

test_table_vpclogs 表的查询示例

以下示例查询将查询之前 CREATE TABLE 语句创建的 test_table_vpclogs。将查询中的 test_table_vpclogs 替换为您自己的表名称。根据您的要求修改列值和其他变量。

若要在指定时间段内按时间顺序返回前 100 个访问日志条目,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE day >= '2021/02/01' AND day < '2021/02/28' ORDER BY day ASC LIMIT 100

若要查看哪个服务器在指定时间段内接收前十个 HTTP 数据包,请运行如下所示的查询。该查询计算在 HTTPS 端口 443 上接收的数据包数,按目标 IP 地址对其进行分组,并返回上一周的前 10 个条目。

SELECT SUM(packets) AS packetcount, dstaddr FROM test_table_vpclogs WHERE dstport = 443 AND day >= '2021/03/01' AND day < '2021/03/31' GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10

若要返回在指定时间段内创建的日志,请运行如下所示的查询。

SELECT interface_id, srcaddr, action, protocol, to_iso8601(from_unixtime(start)) AS start_time, to_iso8601(from_unixtime(end)) AS end_time FROM test_table_vpclogs WHERE DAY >= '2021/04/01' AND DAY < '2021/04/30'

若要返回指定时间段内源 IP 地址的访问日志,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE srcaddr = '10.117.1.22' AND day >= '2021/02/01' AND day < '2021/02/28'

若要列出已被拒绝的 TCP 连接,请运行如下所示的查询。

SELECT day, interface_id, srcaddr, action, protocol FROM test_table_vpclogs WHERE action = 'REJECT' AND protocol = 6 AND day >= '2021/02/01' AND day < '2021/02/28' LIMIT 10

若要返回以 10.117 开头的 IP 地址范围的访问日志,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE split_part(srcaddr,'.', 1)='10' AND split_part(srcaddr,'.', 2) ='117'

若要返回某个时间范围内目标 IP 地址的访问日志,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE dstaddr = '10.0.1.14' AND day >= '2021/01/01' AND day < '2021/01/31'

其他资源

有关更多信息,请参阅 Amazon 大数据博客文章使用 Amazon Kinesis Firehose、Athena 和 Amazon QuickSight 分析 VPC 流日志。博客文章使用的是 VPC 流日志的版本 2。