查询 Amazon VPC 流日志 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

查询 Amazon VPC 流日志

Amazon Virtual Private Cloud 流日志捕获有关在 VPC 中传入和传出网络接口的 IP 流量的信息。可使用日志来调查网络流量模式,并识别 VPC 网络中的威胁和风险。

若要查询 Amazon VPC 流日志,您有两种选择:

  • 亚马逊 VPC 控制台 — 使用 Amazon VPC 控制台中的 Athena 集成功能生成 Amazon CloudFormation 一个模板,该模板可为您创建带有分区功能的 Athena 数据库、工作组和流日志表。该模板还会创建一组预定义的流日志查询,可用于获取有关流经 VPC 的流量的洞察。

    有关此方法更多信息,请参阅《Amazon VPC 用户指南》中的使用 Amazon Athena 查询流日志

  • Amazon Athena 控制台 – 直接在 Athena 控制台中创建表和查询。有关更多信息,请继续阅读此页面。

创建和查询自定义 VPC 流日志表

当您开始在 Athena 中查询日志之前,启用 VPC 流日志,并将其配置为保存到您的 Amazon S3 存储桶。在您创建日志后,让它们运行几分钟以收集一些数据。这些日志是采用 Athena 允许您直接查询的 GZIP 压缩格式创建的。

创建 VPC 流日志时,当您希望指定在流日志中返回的字段以及这些字段的显示顺序时,请使用自定义格式。有关流日志记录的更多信息,请参阅《Amazon VPC 用户指南》中的流日志记录

常见注意事项

在 Athena 中为 Amazon VPC 流日志创建表时,请记住以下几点:

  • 默认情况下,在 Athena 中,Parquet 将按名称访问列。有关更多信息,请参阅处理架构更新

  • 使用流日志记录中的名称作为 Athena 中列的名称。Athena 架构中列的名称应与 Amazon VPC 流日志中的字段名称完全匹配,但有以下不同之处:

    • 将 Amazon VPC 日志字段名称中的连字符替换为 Athena 列名称中的下划线。在 Athena 中,数据库名称、表名称和列名称仅可接受小写字母、数字和下划线字符。有关更多信息,请参阅数据库、表和列名称

    • 在 Athena 中,使用反引号将保留关键字中的流日志记录名称括起来,将其转义。

  • VPC 流日志是 Amazon Web Services 账户 特定的。当您将日志文件发布到 Amazon S3 时,在 Amazon S3 中创建的 Amazon VPC 的路径包含用于创建流日志的 Amazon Web Services 账户 ID。有关更多信息,请参阅 Amazon VPC 用户指南中的将流日志发布到 Amazon S3

适用于 Amazon VPC 流日志的 CREATE TABLE 语句

以下过程将为 Amazon VPC 流日志创建 Amazon VPC 表。使用自定义格式创建流日志时,可以创建一个表,其字段与您在创建流日志时指定的字段相匹配,且字段顺序与您指定的字段顺序相同。

为 Amazon VPC 流日志创建 Athena 表
  1. 按照 常见注意事项 部分中的准则,在 Athena 控制台查询编辑器中输入类似以下内容的 DDL 语句。此示例语句创建一个表,其中包含 Amazon VPC 流日志版本 2 到 5 的列,如流日志记录中所示。如果使用不同的列集或列顺序,请相应地修改语句。

    CREATE EXTERNAL TABLE IF NOT EXISTS `vpc_flow_logs` ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, region string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (`date` date) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' TBLPROPERTIES ("skip.header.line.count"="1");

    请注意以下几点:

    • 该查询指定但ROW FORMAT DELIMITED省略了指定。 SerDe这意味着查询使用用于 CSV、TSV 和自定义分隔文件的 LazySimpleSerDe。在此查询中,字段由一个空格终止。

    • PARTITIONED BY 子句使用 date 类型。这样,就可以在查询中使用数学运算符来选择比特定日期更早或更新的内容。

      注意

      因为 date 在 DDL 语句中是保留关键字,所以用反引号字符对其进行转义。有关更多信息,请参阅保留关键字

    • 对于具有不同自定义格式的 VPC 流日志,请修改字段以与您创建流日志时指定的字段相匹配。

  2. 修改 LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' 以指向包含您的日志数据的 Amazon S3 存储桶。

  3. 在 Athena 控制台中运行查询。查询完成后,Athena 将注册 vpc_flow_logs 表,使其中的数据可以供您发出查询。

  4. 创建分区以便能够读取数据,如以下示例查询中所示。此示例查询创建指定日期的单个分区。根据需要替换日期和位置的占位符。

    注意

    此查询仅为您指定的日期创建单个分区。若要自动执行此过程,请使用运行此查询并以这种方式为 year/month/day 创建分区的脚本,或使用 CREATE TABLE 语句指定分区投影

    ALTER TABLE vpc_flow_logs ADD PARTITION (`date`='YYYY-MM-dd') LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/YYYY/MM/dd';

vpc_flow_logs 表的查询示例

使用 Athena 控制台中的查询编辑器在创建的表上运行 SQL 语句。您可以保存查询、查看之前的查询或下载 CSV 格式的查询结果。在以下示例中,将 vpc_flow_logs 替换为表名称。根据您的要求修改列值和其他变量。

以下示例查询列出了指定日期的最多 100 个流日志。

SELECT * FROM vpc_flow_logs WHERE date = DATE('2020-05-04') LIMIT 100;

以下查询列出所有被拒绝的 TCP 连接并使用新创建的日期分区列 date 来从中提取这些事件发生的星期几。

SELECT day_of_week(date) AS day, date, interface_id, srcaddr, action, protocol FROM vpc_flow_logs WHERE action = 'REJECT' AND protocol = 6 LIMIT 100;

若要查看哪个服务器收到最大数量的 HTTPS 请求,请使用以下查询。它计算在 HTTPS 端口 443 上接收的数据包数,按目标 IP 地址对它们进行分组,并返回上一周的前 10 个。

SELECT SUM(packets) AS packetcount, dstaddr FROM vpc_flow_logs WHERE dstport = 443 AND date > current_date - interval '7' day GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10;

以 Apache Parquet 格式为流日志创建表

以下过程将以 Apache Parquet 格式为 Amazon VPC 流日志创建 Amazon VPC 表。

以 Parquet 格式为 Amazon VPC 流日志创建 Athena 表
  1. 按照 常见注意事项 部分中的准则,在 Athena 控制台查询编辑器中输入类似以下内容的 DDL 语句。以下示例语句创建一个表,其中包含 Amazon VPC 流日志版本 2 到 5 的列,如 Parquet 格式的流日志记录中所示,Hive 按小时分区。如果您没有按小时分区,请删除 PARTITIONED BY 子句中的 hour

    CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_parquet ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, region string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY ( `aws-account-id` string, `aws-service` string, `aws-region` string, `year` string, `month` string, `day` string, `hour` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/' TBLPROPERTIES ( 'EXTERNAL'='true', 'skip.header.line.count'='1' )
  2. 修改示例 LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/' 以指向包含您的日志数据的 Amazon S3 路径。

  3. 在 Athena 控制台中运行查询。

  4. 如果数据采用 Hive 兼容格式,请在 Athena 控制台中运行以下命令,更新并加载元存储中的 Hive 分区。查询完成后,可以查询 vpc_flow_logs_parquet 表中的数据。

    MSCK REPAIR TABLE vpc_flow_logs_parquet

    如果您使用的不是 Hive 兼容数据,请运行 ALTER TABLE ADD PARTITION 以加载分区。

有关使用 Athena 查询 Parquet 格式 Amazon VPC 流日志的更多信息,请参阅 Amazon 大数据博客中的文章 使用 Apache Pparquet 格式的 VPC 流日志优化性能并降低网络分析成本

使用分区投影创建和查询 Amazon VPC 流日志表

使用如下所示的 CREATE TABLE 语句创建表、对表进行分区并使用分区投影自动填充分区。将示例中的表名称 test_table_vpclogs 替换为您的表名称。编辑 LOCATION 子句以指定包含 Amazon VPC 日志数据的 Amazon S3 存储桶。

以下 CREATE TABLE 语句适用于以非 Hive 样式的分区格式传送的 VPC 流日志。该示例支持多账户聚合。要将来自多个账户的 VPC 流日志集中到一个 Amazon S3 存储桶中,必须在 Amazon S3 路径中输入账户 ID。

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (accid string, region string, day string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION '$LOCATION_OF_LOGS' TBLPROPERTIES ( "skip.header.line.count"="1", "projection.enabled" = "true", "projection.accid.type" = "enum", "projection.accid.values" = "$ACCID_1,$ACCID_2", "projection.region.type" = "enum", "projection.region.values" = "$REGION_1,$REGION_2,$REGION_3", "projection.day.type" = "date", "projection.day.range" = "$START_RANGE,NOW", "projection.day.format" = "yyyy/MM/dd", "storage.location.template" = "s3://$LOCATION_OF_LOGS/AWSLogs/${accid}/vpcflowlogs/${region}/${day}" )

test_table_vpclogs 表的查询示例

以下示例查询将查询之前 CREATE TABLE 语句创建的 test_table_vpclogs。将查询中的 test_table_vpclogs 替换为您自己的表名称。根据您的要求修改列值和其他变量。

若要在指定时间段内按时间顺序返回前 100 个访问日志条目,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE day >= '2021/02/01' AND day < '2021/02/28' ORDER BY day ASC LIMIT 100

若要查看哪个服务器在指定时间段内接收前十个 HTTP 数据包,请运行如下所示的查询。该查询计算在 HTTPS 端口 443 上接收的数据包数,按目标 IP 地址对其进行分组,并返回上一周的前 10 个条目。

SELECT SUM(packets) AS packetcount, dstaddr FROM test_table_vpclogs WHERE dstport = 443 AND day >= '2021/03/01' AND day < '2021/03/31' GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10

若要返回在指定时间段内创建的日志,请运行如下所示的查询。

SELECT interface_id, srcaddr, action, protocol, to_iso8601(from_unixtime(start)) AS start_time, to_iso8601(from_unixtime("end")) AS end_time FROM test_table_vpclogs WHERE DAY >= '2021/04/01' AND DAY < '2021/04/30'

若要返回指定时间段内源 IP 地址的访问日志,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE srcaddr = '10.117.1.22' AND day >= '2021/02/01' AND day < '2021/02/28'

若要列出已被拒绝的 TCP 连接,请运行如下所示的查询。

SELECT day, interface_id, srcaddr, action, protocol FROM test_table_vpclogs WHERE action = 'REJECT' AND protocol = 6 AND day >= '2021/02/01' AND day < '2021/02/28' LIMIT 10

若要返回以 10.117 开头的 IP 地址范围的访问日志,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE split_part(srcaddr,'.', 1)='10' AND split_part(srcaddr,'.', 2) ='117'

若要返回某个时间范围内目标 IP 地址的访问日志,请运行如下所示的查询。

SELECT * FROM test_table_vpclogs WHERE dstaddr = '10.0.1.14' AND day >= '2021/01/01' AND day < '2021/01/31'

使用分区投影并以 Apache Parquet 格式为流日志创建表

以下用于 VPC 流日志的分区投影 CREATE TABLE 语句采用 Apache Parquet 格式,不兼容 Hive,按小时和日期而不是按天分区。将示例中的表名称 test_table_vpclogs_parquet 替换为您的表名称。编辑 LOCATION 子句以指定包含 Amazon VPC 日志数据的 Amazon S3 存储桶。

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs_parquet ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (region string, date string, hour string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/' TBLPROPERTIES ( "EXTERNAL"="true", "skip.header.line.count" = "1", "projection.enabled" = "true", "projection.region.type" = "enum", "projection.region.values" = "us-east-1,us-west-2,ap-south-1,eu-west-1", "projection.date.type" = "date", "projection.date.range" = "2021/01/01,NOW", "projection.date.format" = "yyyy/MM/dd", "projection.hour.type" = "integer", "projection.hour.range" = "00,23", "projection.hour.digits" = "2", "storage.location.template" = "s3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/${account_id}/vpcflowlogs/${region}/${date}/${hour}" )

其他 资源

有关使用 Athena 分析 VPC 流日志的更多信息,请参阅以下 Amazon 大数据博客文章: