查询 Amazon CloudTrail 日志 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

查询 Amazon CloudTrail 日志

Amazon CloudTrail 是一个为亚马逊云科技账号记录 Amazon API 调用和事件的服务。

CloudTrail 日志包含有关对您的所有 API 调用的详细信息Amazon Web Services,包括控制台。 CloudTrail 生成加密的日志文件并将其存储在 Amazon S3 中。有关更多信息,请参阅 Amazon CloudTrail 用户指南

注意

如果您想跨账户、区域和日期对 CloudTrail 事件信息执行 SQL 查询,请考虑使用 L CloudTrail ake。 CloudTrail Lake 是创建跟踪的Amazon替代方案,可将来自企业的信息聚合到单个可搜索的事件数据存储中。它不使用 Amazon S3 存储桶存储,而是将事件存储在某个数据湖中,从而支持更丰富、更快的查询。您可以使用它创建 SQL 查询,以便在自定义时间范围内跨组织和区域搜索事件。由于您在 CloudTrail 控制台本身内执行 CloudTrail Lake 查询,因此使用 CloudTrail Lake 不需要 Athena。有关更多信息,请参阅 CloudTrail Lake 文档。

将 Athena CloudTrail 与日志配合使用是增强活动分析的有力方法。Amazon Web Service例如,您可以使用查询来确定趋势,并根据属性 (如源 IP 地址或用户) 进一步隔离活动。

常见的应用是使用 CloudTrail 日志来分析操作活动,以确保安全性和合规性。有关详细示例的信息,请参阅 Amazon 大数据博客文章:使用 Amazon CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动

您可以使用 Athena 通过指定日志文件的 LOCATION 直接从 Amazon S3 查询这些日志文件。您可以通过两种方式之一来执行此操作:

  • 直接从 CloudTrail 控制台为 CloudTrail 日志文件创建表。

  • 在 Athena 控制台中手动创建 CloudTrail 日志文件表。

了解 CloudTrail 日志和 Athena 表

在开始创建表之前,您应该更多地了解它 CloudTrail 以及如何存储数据。无论您是通过 CloudTrail 控制台还是从 Athena 创建表,这都可以帮助您创建所需的表。

CloudTrail 将日志保存为 JSON 文本文件,采用 gzip 压缩格式 (*.json.gzip)。日志文件的位置取决于您如何设置跟踪、您正在记录的一个 Amazon Web Services 区域 或多个区域以及其他因素。

有关日志存储位置、JSON 结构和记录文件内容的更多信息,请参阅《Amazon CloudTrail 用户指南》中的以下主题:

要收集日志并将其保存到 Amazon S3,请 CloudTrail 从中启用Amazon Web Services Management Console。有关更多信息,请参阅《Amazon CloudTrail 用户指南》中的 创建跟踪记录

请记下您在其中保存日志的目标 Amazon S3 存储桶。将该LOCATION子句替换为 CloudTrail 日志位置的路径和要使用的对象集。该示例使用特定账户的日志的 LOCATION 值,但您可以使用最适合您的应用程序的明确度程度。

例如:

  • 要分析多个账户中的数据,您可以通过使用 LOCATION 's3://MyLogFiles/AWSLogs/',回滚 LOCATION 说明符来指示所有 AWSLogs

  • 要分析特定日期、账户和区域中的数据,请使用 LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'.

如果使用对象层次结构中的最高级别,则可以在使用 Athena 查询时获得最高程度的灵活度。

使用 CloudTrail 控制台为日志创建 Athena 表 CloudTrail

您可以创建非分区的 Athena 表,用于直接从控制台查询 CloudTrail 日志。 CloudTrail 要从 CloudTrail 控制台创建 Athena 表,您必须使用具有足够权限才能在 Athena 中创建表的角色登录。

注意

您无法使用 CloudTrail 控制台为组织跟踪日志创建 Athena 表。相反,请使用 Athena 控制台手动创建表,以便指定正确的存储位置。有关组织跟踪的信息,请参阅《Amazon CloudTrail 用户指南》中的为组织创建跟踪记录

使用控制台为跟踪创建 Athen CloudTrail a 表 CloudTrail
  1. 打开 CloudTrail 控制台,网址为 https://console.aws.amazon.com/cloudtrail/

  2. 在导航窗格中,选择事件历史记录

  3. 选择 Create Athena table(创建 Athena 表)。

    
                        选择 Create Athena table(创建 Athena 表)
  4. 对于 Storage location(存储位置),使用向下箭头选择其中存储了供跟踪查询的日志文件的 Amazon S3 存储桶。

    注意

    要查找与跟踪关联的存储桶的名称,请在 CloudTrail 导航窗格中选择 Trails,然后查看该跟踪的 S3 存储桶列。要查看 Amazon S3 中存储桶的位置,请在 S3 bucket(S3 存储桶)列中选择存储桶的链接。这将打开 CloudTrail 存储桶位置的 Amazon S3 控制台。

  5. 选择创建表。将使用包含 Amazon S3 存储桶名称的默认名称创建表。

使用手动分区在 Athena 中创建 CloudTrail 日志表

您可以在 Athena 控制台中手动创建 CloudTrail 日志文件表,然后在 Athena 中运行查询。

使用 Athena 控制台为轨迹创建 Athen CloudTrail a 表
  1. 将以下 DDL 语句复制并粘贴到 Athena 控制台查询编辑器中。

    CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< arn:STRING, accountid:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING, eventCategory STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/';
    注意

    我们建议使用示例中org.apache.hive.hcatalog.data.JsonSerDe显示的。尽管com.amazon.emr.hive.serde.CloudTrailSerde存在 a,但它目前无法处理一些较新的 CloudTrail 字段。

  2. (可选)删除表格中所有非必填字段。如果您只需要读取一组特定的列,则您的表定义可以排除其他列。

  3. 修改 s3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/ 以指向包含您的日志数据的 Amazon S3 存储桶。

  4. 验证字段是否正确列出。有关 CloudTrail 记录中字段完整列表的更多信息,请参阅CloudTrail 记录内容

    下面的示例使用了 Hive JSON SerDe。在此示例中,字段 requestparametersresponseelementsadditionaleventdata 作为查询中的 STRING 类型列出,但在 JSON 中使用 STRUCT 数据类型。因此,要将数据移出这些字段,请使用 JSON_EXTRACT 函数。有关更多信息,请参阅 从 JSON 中提取数据。为了提高性能,此示例按 Amazon Web Services 区域、年份、月份和日期对数据进行分区。

  5. 在 Athena 控制台中运行 CREATE TABLE 语句。

  6. 使用 ALTER TABLE ADD PARTITION 命令加载分区,以便您可以查询它们,如以下示例所示。

    ALTER TABLE table_name ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://cloudtrail_bucket_name/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'

使用手动分区为整个组织范围的跟踪创建表

要在 Athena 中为组织范围的 CloudTrail 日志文件创建表,请按照使用手动分区在 Athena 中创建 CloudTrail 日志表中的步骤操作,但要按照以下步骤进行修改。

为组织范围的日志创建 Athena 表 CloudTrail
  1. CREATE TABLE 语句中,修改 LOCATION 子句以包含组织 ID,如下例所示:

    LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/'
  2. PARTITIONED BY 子句中,为账户 ID 添加一个字符串条目,如下例所示:

    PARTITIONED BY (account string, region string, year string, month string, day string)

    以下示例显示的是综合结果:

    ... PARTITIONED BY (account string, region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/'
  3. ALTER TABLE 语句的 ADD PARTITION 子句包含账户 ID,如下例所示:

    ALTER TABLE table_name ADD PARTITION (account='111122223333', region='us-east-1', year='2022', month='08', day='08')
  4. ALTER TABLE 语句的 LOCATION 子句包含组织 ID、账户 ID 以及您要添加的分区,如下例所示:

    LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/us-east-1/2022/08/08/'

    以下示例 ALTER TABLE 语句显示的是综合结果:

    ALTER TABLE table_name ADD PARTITION (account='111122223333', region='us-east-1', year='2022', month='08', day='08') LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/111122223333/CloudTrail/us-east-1/2022/08/08/'

使用分区投影为 Athena 中的 CloudTrail 日志创建表

由于 CloudTrail 日志具有已知结构,您可以提前指定其分区方案,因此您可以使用 Athena 分区投影功能缩短查询运行时间并自动管理分区。当添加新数据时,分区投影会自动添加新分区。这样就不必使用 ALTER TABLE ADD PARTITION 手动添加分区了。

以下示例CREATE TABLE语句自动对单个 CloudTrail 日志使用从指定日期到现在的分区投影Amazon Web Services 区域。在 LOCATIONstorage.location.template 子句中,将存储桶account-idaws-region 占位符替换为对应的相同值。对于 projection.timestamp.range,将 2020/01/01 替换为要使用的开始日期。成功运行查询后,您可以查询表。您无需运行 ALTER TABLE ADD PARTITION 来加载分区。

CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcendpointid STRING, eventCategory STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/account-id/CloudTrail/aws-region' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020/01/01,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')

更多有关分区投影的信息,请参阅 使用 Amazon Athena 分区投影

查询嵌套字段

由于 userIdentityresources 字段是嵌套的数据类型,查询这些内容需要特殊处理。

userIdentity 对象由嵌套 STRUCT 类型组成。可以使用点分隔字段以分隔待查询的字段,如下例所示:

SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10

resources 字段是一个 STRUCT 对象数组。对于这些数组,请使用 CROSS JOIN UNNEST 来取消嵌套数组,以便您可以查询其对象。

下面的示例将返回资源 ARN 以 example/datafile.txt 结尾的所有行。为了便于读取,replace 函数将从 ARN 中删除初始 arn:aws:s3::: 子字符串。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime

以下是 DeleteBucket 事件的示例查询。查询将从 resources 对象中提取存储桶的名称以及存储桶所属的账户 ID。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime

有关取消嵌套的更多信息,请参阅 筛选数组

示例查询

以下示例显示了从为 CloudTrail 事件日志创建的表中返回所有匿名(未签名)请求的查询的一部分。此查询选择 useridentity.accountid 匿名并且 useridentity.arn 未指定的那些请求:

SELECT * FROM cloudtrail_logs WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid = 'anonymous' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';

有关更多信息,请参阅 Amazon 大数据博客文章:使用 Amazon CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动

查询 CloudTrail 日志的提示

要浏览 CloudTrail 日志数据,请使用以下提示:

  • 在查询日志之前,请验证您的日志表是否看似与使用手动分区在 Athena 中创建 CloudTrail 日志表中的表一样。如果不是第一个表,请使用以下命令删除现有表:DROP TABLE cloudtrail_logs

  • 删除现有表后,重新创建它。有关更多信息,请参阅 使用手动分区在 Athena 中创建 CloudTrail 日志表

    确认正确列出了 Athena 查询中的字段。有关 CloudTrail 记录中字段的完整列表的信息,请参阅CloudTrail 记录内容

    如果您的查询包含 JSON 格式的字段,例如 STRUCT,请从 JSON 中提取数据。有关更多信息,请参阅 从 JSON 中提取数据

    针对您的 CloudTrail 表格发出查询的一些建议:

  • 首先查看哪些 用户调用了哪些 API 操作以及来自哪些源 IP 地址。

  • 将以下基本 SQL 查询用作您的模板。将查询粘贴到 Athena 控制台并运行它。

    SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
  • 修改查询以进一步探索您的数据。

  • 为了提高性能,请包含 LIMIT 子句以返回指定的行子集。